mirror of
https://github.com/bitcoin/bips.git
synced 2026-04-20 16:28:39 +00:00
Merge pull request #2046 from macgyver13/bip375-reference-testvectors-pr
BIP375: Add test vectors + validator
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
Assigned: 2025-01-08
|
||||
License: BSD-2-Clause
|
||||
Discussion: https://groups.google.com/g/bitcoindev/c/5G5wzqUXyk4
|
||||
Version: 0.1.1
|
||||
Requires: 352, 370, 374
|
||||
</pre>
|
||||
|
||||
@@ -177,7 +178,7 @@ All rules must be followed from PSBTv2 for this role. If there are any outputs w
|
||||
If any input is spending an output with script using Segwit version > 1, the Signer must fail.
|
||||
|
||||
For each output with PSBT_OUT_SP_V0_INFO set, the Signer should:
|
||||
* Compute and set an ECDH share and DLEQ proof for each input it has the private key for, or set a global ECDH share and DLEQ proof if it has private keys for all eligible inputs.
|
||||
* Compute and set an ECDH share and DLEQ proof for each eligible input it has the private key for, or set a global ECDH share and DLEQ proof if it has private keys for all eligible inputs.
|
||||
* Verify the DLEQ proofs for all inputs it does not have the private keys for, or the global DLEQ proof if it is set.
|
||||
* If all eligible inputs have an ECDH share or the global ECDH share is set, compute and set the PSBT_OUT_SCRIPT.
|
||||
|
||||
@@ -235,7 +236,7 @@ Using [https://github.com/bitcoin/bips/blob/master/bip-0374.mediawiki#dleq-proof
|
||||
|
||||
====Computing the Output Scripts====
|
||||
|
||||
Compute the PSBT_OUT_SCRIPT using the procedure in [https://github.com/bitcoin/bips/blob/master/bip-0352.mediawiki#user-content-Creating_outputs BIP352] but substituting ''a·B<sub>scan</sub>'' with the PSBT_GLOBAL_SP_ECDH_SHARE for that scan key if available, or the sum of all PSBT_IN_SP_ECDH_SHAREs for that scan key.
|
||||
Compute the PSBT_OUT_SCRIPT using the procedure in [https://github.com/bitcoin/bips/blob/master/bip-0352.mediawiki#user-content-Creating_outputs BIP352] but substituting ''a·B<sub>scan</sub>'' with the PSBT_GLOBAL_SP_ECDH_SHARE for that scan key if available, or the sum of all PSBT_IN_SP_ECDH_SHAREs from eligible inputs for that scan key.
|
||||
If there are multiple silent payment codes with the same scan key, sort the codes lexicographically in ascending order to determine the ordering of the ''k'' value.
|
||||
If there are multiple silent payment codes with both the same scan and spend keys, sort the subgroup by output index in ascending order.
|
||||
|
||||
@@ -249,7 +250,163 @@ Silent payment capable PSBTs are backwards compatible with PSBTv2 once all outpu
|
||||
|
||||
==Test Vectors==
|
||||
|
||||
Todo
|
||||
A [[bip-0375/bip375_test_vectors.json|collection of test vectors in JSON format]] is provided. Each test vector contains a base64-encoded PSBT string, which alone can be used to verify sending Silent Payments with PSBTs.
|
||||
Validation is performed in 4 sequential checks. This [[bip-0375/validator/validate_psbt.py|Python implementation]] demonstrates the validation logic for each:
|
||||
|
||||
# '''PSBT Structure''' - Verify BIP375 field requirements
|
||||
# '''ECDH Coverage''' - Verify ECDH share presence and correctness using BIP374 DLEQ
|
||||
# '''Input Eligibility''' - Verify input constraints when silent payment outputs are present
|
||||
# '''Output Scripts''' - Verify output scripts match silent payment derivation
|
||||
|
||||
Valid PSBTs are organized into 2 categories:
|
||||
# '''Can Finalize''' - Signed and ready to finalize
|
||||
# '''In Progress''' - Incomplete but valid
|
||||
|
||||
Use the provided [[bip-0375/test_runner.py|test runner]] to validate each test vector. See the [[bip-0375/README.md|README]] for more details.
|
||||
|
||||
===Invalid PSBTs===
|
||||
|
||||
{|
|
||||
! Category
|
||||
! Description
|
||||
|-
|
||||
| PSBT Structure
|
||||
| missing PSBT_OUT_SP_V0_INFO field when PSBT_OUT_SP_V0_LABEL set
|
||||
|-
|
||||
| PSBT Structure
|
||||
| incorrect byte length for PSBT_OUT_SP_V0_INFO field
|
||||
|-
|
||||
| PSBT Structure
|
||||
| incorrect byte length for PSBT_IN_SP_ECDH_SHARE field
|
||||
|-
|
||||
| PSBT Structure
|
||||
| incorrect byte length for PSBT_IN_SP_DLEQ field
|
||||
|-
|
||||
| PSBT Structure
|
||||
| PSBT_GLOBAL_TX_MODIFIABLE field is non-zero when PSBT_OUT_SCRIPT set for sp output
|
||||
|-
|
||||
| PSBT Structure
|
||||
| missing PSBT_OUT_SCRIPT field when sending to non-sp output
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| only one ineligible P2SH multisig input when PSBT_OUT_SCRIPT set for sp output
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| missing PSBT_IN_SP_ECDH_SHARE field for input 0 when PSBT_OUT_SCRIPT set for sp output
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| missing PSBT_IN_SP_DLEQ field for input when PSBT_IN_SP_ECDH_SHARE set
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| missing PSBT_GLOBAL_SP_DLEQ field when PSBT_GLOBAL_SP_ECDH_SHARE set
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| invalid proof in PSBT_IN_SP_DLEQ field
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| invalid proof in PSBT_GLOBAL_SP_DLEQ field
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| missing PSBT_IN_BIP32_DERIVATION field for input when PSBT_IN_SP_DLEQ set
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| output 1 missing ECDH share for scan key with one input / three sp outputs (different scan keys)
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| input 1 missing ECDH share for output 1 with two inputs / two sp outputs (different scan keys)
|
||||
|-
|
||||
| ECDH Coverage
|
||||
| input 1 missing ECDH share for scan key with two inputs / one sp output
|
||||
|-
|
||||
| Input Eligibility
|
||||
| segwit version greater than 1 in transaction inputs with sp output
|
||||
|-
|
||||
| Input Eligibility
|
||||
| non-SIGHASH_ALL signature on input with sp output
|
||||
|-
|
||||
| Output Scripts
|
||||
| P2TR input with NUMS internal key cannot derive sp output
|
||||
|-
|
||||
| Output Scripts
|
||||
| PSBT_OUT_SCRIPT does not match derived sp output
|
||||
|-
|
||||
| Output Scripts
|
||||
| two sp outputs (same scan / different spend keys) not sorted lexicographically by spend key
|
||||
|-
|
||||
| Output Scripts
|
||||
| k values assigned to wrong output indices with three sp outputs (same scan / spend keys)
|
||||
|}
|
||||
|
||||
===Valid PSBTs===
|
||||
|
||||
{|
|
||||
! State
|
||||
! Description
|
||||
|-
|
||||
| Can Finalize
|
||||
| one P2PKH input single-signer
|
||||
|-
|
||||
| Can Finalize
|
||||
| two inputs single-signer using global ECDH share
|
||||
|-
|
||||
| Can Finalize
|
||||
| two inputs single-signer using per-input ECDH shares
|
||||
|-
|
||||
| Can Finalize
|
||||
| two inputs / two sp outputs with mixed global and per-input ECDH shares
|
||||
|-
|
||||
| Can Finalize
|
||||
| one input / one sp output with both global and per-input ECDH shares
|
||||
|-
|
||||
| Can Finalize
|
||||
| three sp outputs (different scan keys) with multiple global ECDH shares
|
||||
|-
|
||||
| Can Finalize
|
||||
| one P2WPKH input / two mixed outputs - labeled sp output and BIP 32 change
|
||||
|-
|
||||
| Can Finalize
|
||||
| one input / two sp outputs - output 0 has no label / output 1 uses label=0 convention for sp change
|
||||
|-
|
||||
| Can Finalize
|
||||
| two sp outputs - output 0 uses label=3 / output 1 uses label=1
|
||||
|-
|
||||
| Can Finalize
|
||||
| two inputs using per-input ECDH shares - only eligible inputs contribute shares (P2SH excluded)
|
||||
|-
|
||||
| Can Finalize
|
||||
| two inputs using global ECDH share - only eligible inputs contribute shares (P2SH excluded)
|
||||
|-
|
||||
| Can Finalize
|
||||
| two mixed input types - only eligible inputs contribute ECDH shares (NUMS internal key excluded)
|
||||
|-
|
||||
| Can Finalize
|
||||
| three sp outputs (same scan key) - each output has distinct k value
|
||||
|-
|
||||
| Can Finalize
|
||||
| three sp outputs (same scan key) / two regular outputs - k values assigned independently of output index
|
||||
|-
|
||||
| In Progress
|
||||
| two P2TR inputs, neither is signed
|
||||
|-
|
||||
| In Progress
|
||||
| one P2TR input / one sp output with no ECDH shares when PSBT_OUT_SCRIPT field is not set
|
||||
|-
|
||||
| In Progress
|
||||
| two inputs / one sp output, input 1 missing ECDH share when PSBT_OUT_SCRIPT field is not set
|
||||
|-
|
||||
| In Progress
|
||||
| one input / two sp outputs, input 0 missing ECDH share for output 0 when PSBT_OUT_SCRIPT field is not set
|
||||
|-
|
||||
| In Progress
|
||||
| large PSBT with nine mixed inputs / six outputs - some inputs signed
|
||||
|}
|
||||
|
||||
== Changelog ==
|
||||
|
||||
* '''0.1.1''' (2026-04-16):
|
||||
** Add test vectors and reference for validating Sending Silent Payments with PSBTs.
|
||||
* '''0.1.0''' (2025-01-13):
|
||||
** Initial version, merged as BIP-375.
|
||||
|
||||
==Rationale==
|
||||
|
||||
|
||||
32
bip-0375/README.md
Normal file
32
bip-0375/README.md
Normal file
@@ -0,0 +1,32 @@
|
||||
# BIP-375 Validation Reference
|
||||
|
||||
A reference validation implementation for BIP-375: Sending Silent Payments with PSBTs.
|
||||
|
||||
## Core Files
|
||||
- **`validator/bip352_crypto.py`** - Silent payment output script derivation
|
||||
- **`validator/inputs.py`** - PSBT input utility functions
|
||||
- **`validator/psbt_bip375.py`** - BIP-375 specific PSBT/PSBTMap extensions
|
||||
- **`validator/validate_psbt.py`** - Main BIP-375 validation functions
|
||||
- **`test_runner.py`** - Test infrastructure (executable)
|
||||
|
||||
## Dependencies
|
||||
- **`deps/bitcoin_test/psbt.py`** - Bitcoin test framework PSBT module - [PR #21283](https://github.com/bitcoin/bitcoin/pull/21283)
|
||||
- **`deps/bitcoin_test/messages.py`** - Bitcoin test framework primitives and message structures
|
||||
- **`deps/dleq.py`** - Reference DLEQ implementation from BIP-374
|
||||
- **`deps/secp256k1lab/`** - vendored copy of [secp256k1lab](https://github.com/secp256k1lab/secp256k1lab/commit/44dc4bd893b8f03e621585e3bf255253e0e0fbfb) library at version 1.0.0
|
||||
|
||||
## Testing
|
||||
|
||||
### Run Tests
|
||||
|
||||
```bash
|
||||
python test_runner.py # Run all tests
|
||||
python test_runner.py -v # Verbose mode with detailed validation status
|
||||
python test_runner.py -vv # More verbose with validation check failure reason
|
||||
|
||||
python test_runner.py -f vectors.json # Use custom test vector file
|
||||
```
|
||||
|
||||
### Generating Test Vectors
|
||||
|
||||
Test vectors were generated using [test_generator.py](https://github.com/macgyver13/bip375-test-generator/)
|
||||
1759
bip-0375/bip375_test_vectors.json
Normal file
1759
bip-0375/bip375_test_vectors.json
Normal file
File diff suppressed because one or more lines are too long
449
bip-0375/deps/bitcoin_test/messages.py
Normal file
449
bip-0375/deps/bitcoin_test/messages.py
Normal file
@@ -0,0 +1,449 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2010 ArtForz -- public domain half-a-node
|
||||
# Copyright (c) 2012 Jeff Garzik
|
||||
# Copyright (c) 2010-present The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Bitcoin test framework primitive and message structures
|
||||
|
||||
CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....:
|
||||
data structures that should map to corresponding structures in
|
||||
bitcoin/primitives
|
||||
|
||||
msg_block, msg_tx, msg_headers, etc.:
|
||||
data structures that represent network messages
|
||||
|
||||
ser_*, deser_*: functions that handle serialization/deserialization.
|
||||
|
||||
Classes use __slots__ to ensure extraneous attributes aren't accidentally added
|
||||
by tests, compromising their intended effect.
|
||||
"""
|
||||
|
||||
########################################################################
|
||||
# Adapted from Bitcoin Core test framework messages.py
|
||||
# for BIP-375 PSBT validation tests.
|
||||
########################################################################
|
||||
|
||||
import copy
|
||||
import hashlib
|
||||
import math
|
||||
from io import BytesIO
|
||||
|
||||
COIN = 100000000 # 1 btc in satoshis
|
||||
WITNESS_SCALE_FACTOR = 4
|
||||
|
||||
# ============================================================================
|
||||
# Serialization utilities
|
||||
# ============================================================================
|
||||
|
||||
def hash160(s: bytes) -> bytes:
|
||||
return hashlib.new("ripemd160", sha256(s)).digest()
|
||||
|
||||
|
||||
def sha256(s: bytes) -> bytes:
|
||||
return hashlib.sha256(s).digest()
|
||||
|
||||
|
||||
def hash256(s: bytes) -> bytes:
|
||||
return sha256(sha256(s))
|
||||
|
||||
|
||||
def ser_compact_size(l):
|
||||
r = b""
|
||||
if l < 253:
|
||||
r = l.to_bytes(1, "little")
|
||||
elif l < 0x10000:
|
||||
r = (253).to_bytes(1, "little") + l.to_bytes(2, "little")
|
||||
elif l < 0x100000000:
|
||||
r = (254).to_bytes(1, "little") + l.to_bytes(4, "little")
|
||||
else:
|
||||
r = (255).to_bytes(1, "little") + l.to_bytes(8, "little")
|
||||
return r
|
||||
|
||||
|
||||
def deser_compact_size(f):
|
||||
nit = int.from_bytes(f.read(1), "little")
|
||||
if nit == 253:
|
||||
nit = int.from_bytes(f.read(2), "little")
|
||||
elif nit == 254:
|
||||
nit = int.from_bytes(f.read(4), "little")
|
||||
elif nit == 255:
|
||||
nit = int.from_bytes(f.read(8), "little")
|
||||
return nit
|
||||
|
||||
|
||||
def ser_varint(l):
|
||||
r = b""
|
||||
while True:
|
||||
r = bytes([(l & 0x7f) | (0x80 if len(r) > 0 else 0x00)]) + r
|
||||
if l <= 0x7f:
|
||||
return r
|
||||
l = (l >> 7) - 1
|
||||
|
||||
|
||||
def deser_varint(f):
|
||||
n = 0
|
||||
while True:
|
||||
dat = f.read(1)[0]
|
||||
n = (n << 7) | (dat & 0x7f)
|
||||
if (dat & 0x80) > 0:
|
||||
n += 1
|
||||
else:
|
||||
return n
|
||||
|
||||
|
||||
def deser_string(f):
|
||||
nit = deser_compact_size(f)
|
||||
return f.read(nit)
|
||||
|
||||
|
||||
def ser_string(s):
|
||||
return ser_compact_size(len(s)) + s
|
||||
|
||||
|
||||
def deser_uint256(f):
|
||||
return int.from_bytes(f.read(32), 'little')
|
||||
|
||||
|
||||
def ser_uint256(u):
|
||||
return u.to_bytes(32, 'little')
|
||||
|
||||
|
||||
def uint256_from_str(s):
|
||||
return int.from_bytes(s[:32], 'little')
|
||||
|
||||
|
||||
def uint256_from_compact(c):
|
||||
nbytes = (c >> 24) & 0xFF
|
||||
v = (c & 0xFFFFFF) << (8 * (nbytes - 3))
|
||||
return v
|
||||
|
||||
|
||||
# deser_function_name: Allow for an alternate deserialization function on the
|
||||
# entries in the vector.
|
||||
def deser_vector(f, c, deser_function_name=None):
|
||||
nit = deser_compact_size(f)
|
||||
r = []
|
||||
for _ in range(nit):
|
||||
t = c()
|
||||
if deser_function_name:
|
||||
getattr(t, deser_function_name)(f)
|
||||
else:
|
||||
t.deserialize(f)
|
||||
r.append(t)
|
||||
return r
|
||||
|
||||
|
||||
# ser_function_name: Allow for an alternate serialization function on the
|
||||
# entries in the vector (we use this for serializing the vector of transactions
|
||||
# for a witness block).
|
||||
def ser_vector(l, ser_function_name=None):
|
||||
r = ser_compact_size(len(l))
|
||||
for i in l:
|
||||
if ser_function_name:
|
||||
r += getattr(i, ser_function_name)()
|
||||
else:
|
||||
r += i.serialize()
|
||||
return r
|
||||
|
||||
|
||||
def deser_uint256_vector(f):
|
||||
nit = deser_compact_size(f)
|
||||
r = []
|
||||
for _ in range(nit):
|
||||
t = deser_uint256(f)
|
||||
r.append(t)
|
||||
return r
|
||||
|
||||
|
||||
def ser_uint256_vector(l):
|
||||
r = ser_compact_size(len(l))
|
||||
for i in l:
|
||||
r += ser_uint256(i)
|
||||
return r
|
||||
|
||||
|
||||
def deser_string_vector(f):
|
||||
nit = deser_compact_size(f)
|
||||
r = []
|
||||
for _ in range(nit):
|
||||
t = deser_string(f)
|
||||
r.append(t)
|
||||
return r
|
||||
|
||||
|
||||
def ser_string_vector(l):
|
||||
r = ser_compact_size(len(l))
|
||||
for sv in l:
|
||||
r += ser_string(sv)
|
||||
return r
|
||||
|
||||
# like from_hex, but without the hex part
|
||||
def from_binary(cls, stream):
|
||||
"""deserialize a binary stream (or bytes object) into an object"""
|
||||
# handle bytes object by turning it into a stream
|
||||
was_bytes = isinstance(stream, bytes)
|
||||
if was_bytes:
|
||||
stream = BytesIO(stream)
|
||||
obj = cls()
|
||||
obj.deserialize(stream)
|
||||
if was_bytes:
|
||||
assert len(stream.read()) == 0
|
||||
return obj
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Transaction data structures
|
||||
# ============================================================================
|
||||
|
||||
class COutPoint:
|
||||
__slots__ = ("hash", "n")
|
||||
|
||||
def __init__(self, hash=0, n=0):
|
||||
self.hash = hash
|
||||
self.n = n
|
||||
|
||||
def deserialize(self, f):
|
||||
self.hash = deser_uint256(f)
|
||||
self.n = int.from_bytes(f.read(4), "little")
|
||||
|
||||
def serialize(self):
|
||||
r = b""
|
||||
r += ser_uint256(self.hash)
|
||||
r += self.n.to_bytes(4, "little")
|
||||
return r
|
||||
|
||||
def __repr__(self):
|
||||
return "COutPoint(hash=%064x n=%i)" % (self.hash, self.n)
|
||||
|
||||
class CTxIn:
|
||||
__slots__ = ("nSequence", "prevout", "scriptSig")
|
||||
|
||||
def __init__(self, outpoint=None, scriptSig=b"", nSequence=0):
|
||||
if outpoint is None:
|
||||
self.prevout = COutPoint()
|
||||
else:
|
||||
self.prevout = outpoint
|
||||
self.scriptSig = scriptSig
|
||||
self.nSequence = nSequence
|
||||
|
||||
def deserialize(self, f):
|
||||
self.prevout = COutPoint()
|
||||
self.prevout.deserialize(f)
|
||||
self.scriptSig = deser_string(f)
|
||||
self.nSequence = int.from_bytes(f.read(4), "little")
|
||||
|
||||
def serialize(self):
|
||||
r = b""
|
||||
r += self.prevout.serialize()
|
||||
r += ser_string(self.scriptSig)
|
||||
r += self.nSequence.to_bytes(4, "little")
|
||||
return r
|
||||
|
||||
def __repr__(self):
|
||||
return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" \
|
||||
% (repr(self.prevout), self.scriptSig.hex(),
|
||||
self.nSequence)
|
||||
|
||||
|
||||
class CTxOut:
|
||||
__slots__ = ("nValue", "scriptPubKey")
|
||||
|
||||
def __init__(self, nValue=0, scriptPubKey=b""):
|
||||
self.nValue = nValue
|
||||
self.scriptPubKey = scriptPubKey
|
||||
|
||||
def deserialize(self, f):
|
||||
self.nValue = int.from_bytes(f.read(8), "little", signed=True)
|
||||
self.scriptPubKey = deser_string(f)
|
||||
|
||||
def serialize(self):
|
||||
r = b""
|
||||
r += self.nValue.to_bytes(8, "little", signed=True)
|
||||
r += ser_string(self.scriptPubKey)
|
||||
return r
|
||||
|
||||
def __repr__(self):
|
||||
return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
|
||||
% (self.nValue // COIN, self.nValue % COIN,
|
||||
self.scriptPubKey.hex())
|
||||
|
||||
|
||||
class CScriptWitness:
|
||||
__slots__ = ("stack",)
|
||||
|
||||
def __init__(self):
|
||||
# stack is a vector of strings
|
||||
self.stack = []
|
||||
|
||||
def __repr__(self):
|
||||
return "CScriptWitness(%s)" % \
|
||||
(",".join([x.hex() for x in self.stack]))
|
||||
|
||||
def is_null(self):
|
||||
if self.stack:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class CTxInWitness:
|
||||
__slots__ = ("scriptWitness",)
|
||||
|
||||
def __init__(self):
|
||||
self.scriptWitness = CScriptWitness()
|
||||
|
||||
def deserialize(self, f):
|
||||
self.scriptWitness.stack = deser_string_vector(f)
|
||||
|
||||
def serialize(self):
|
||||
return ser_string_vector(self.scriptWitness.stack)
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.scriptWitness)
|
||||
|
||||
def is_null(self):
|
||||
return self.scriptWitness.is_null()
|
||||
|
||||
|
||||
class CTxWitness:
|
||||
__slots__ = ("vtxinwit",)
|
||||
|
||||
def __init__(self):
|
||||
self.vtxinwit = []
|
||||
|
||||
def deserialize(self, f):
|
||||
for i in range(len(self.vtxinwit)):
|
||||
self.vtxinwit[i].deserialize(f)
|
||||
|
||||
def serialize(self):
|
||||
r = b""
|
||||
# This is different than the usual vector serialization --
|
||||
# we omit the length of the vector, which is required to be
|
||||
# the same length as the transaction's vin vector.
|
||||
for x in self.vtxinwit:
|
||||
r += x.serialize()
|
||||
return r
|
||||
|
||||
def __repr__(self):
|
||||
return "CTxWitness(%s)" % \
|
||||
(';'.join([repr(x) for x in self.vtxinwit]))
|
||||
|
||||
def is_null(self):
|
||||
for x in self.vtxinwit:
|
||||
if not x.is_null():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class CTransaction:
|
||||
__slots__ = ("nLockTime", "version", "vin", "vout", "wit")
|
||||
|
||||
def __init__(self, tx=None):
|
||||
if tx is None:
|
||||
self.version = 2
|
||||
self.vin = []
|
||||
self.vout = []
|
||||
self.wit = CTxWitness()
|
||||
self.nLockTime = 0
|
||||
else:
|
||||
self.version = tx.version
|
||||
self.vin = copy.deepcopy(tx.vin)
|
||||
self.vout = copy.deepcopy(tx.vout)
|
||||
self.nLockTime = tx.nLockTime
|
||||
self.wit = copy.deepcopy(tx.wit)
|
||||
|
||||
def deserialize(self, f):
|
||||
self.version = int.from_bytes(f.read(4), "little")
|
||||
self.vin = deser_vector(f, CTxIn)
|
||||
flags = 0
|
||||
if len(self.vin) == 0:
|
||||
flags = int.from_bytes(f.read(1), "little")
|
||||
# Not sure why flags can't be zero, but this
|
||||
# matches the implementation in bitcoind
|
||||
if (flags != 0):
|
||||
self.vin = deser_vector(f, CTxIn)
|
||||
self.vout = deser_vector(f, CTxOut)
|
||||
else:
|
||||
self.vout = deser_vector(f, CTxOut)
|
||||
if flags != 0:
|
||||
self.wit.vtxinwit = [CTxInWitness() for _ in range(len(self.vin))]
|
||||
self.wit.deserialize(f)
|
||||
else:
|
||||
self.wit = CTxWitness()
|
||||
self.nLockTime = int.from_bytes(f.read(4), "little")
|
||||
|
||||
def serialize_without_witness(self):
|
||||
r = b""
|
||||
r += self.version.to_bytes(4, "little")
|
||||
r += ser_vector(self.vin)
|
||||
r += ser_vector(self.vout)
|
||||
r += self.nLockTime.to_bytes(4, "little")
|
||||
return r
|
||||
|
||||
# Only serialize with witness when explicitly called for
|
||||
def serialize_with_witness(self):
|
||||
flags = 0
|
||||
if not self.wit.is_null():
|
||||
flags |= 1
|
||||
r = b""
|
||||
r += self.version.to_bytes(4, "little")
|
||||
if flags:
|
||||
dummy = []
|
||||
r += ser_vector(dummy)
|
||||
r += flags.to_bytes(1, "little")
|
||||
r += ser_vector(self.vin)
|
||||
r += ser_vector(self.vout)
|
||||
if flags & 1:
|
||||
if (len(self.wit.vtxinwit) != len(self.vin)):
|
||||
# vtxinwit must have the same length as vin
|
||||
self.wit.vtxinwit = self.wit.vtxinwit[:len(self.vin)]
|
||||
for _ in range(len(self.wit.vtxinwit), len(self.vin)):
|
||||
self.wit.vtxinwit.append(CTxInWitness())
|
||||
r += self.wit.serialize()
|
||||
r += self.nLockTime.to_bytes(4, "little")
|
||||
return r
|
||||
|
||||
# Regular serialization is with witness -- must explicitly
|
||||
# call serialize_without_witness to exclude witness data.
|
||||
def serialize(self):
|
||||
return self.serialize_with_witness()
|
||||
|
||||
@property
|
||||
def wtxid_hex(self):
|
||||
"""Return wtxid (transaction hash with witness) as hex string."""
|
||||
return hash256(self.serialize())[::-1].hex()
|
||||
|
||||
@property
|
||||
def wtxid_int(self):
|
||||
"""Return wtxid (transaction hash with witness) as integer."""
|
||||
return uint256_from_str(hash256(self.serialize_with_witness()))
|
||||
|
||||
@property
|
||||
def txid_hex(self):
|
||||
"""Return txid (transaction hash without witness) as hex string."""
|
||||
return hash256(self.serialize_without_witness())[::-1].hex()
|
||||
|
||||
@property
|
||||
def txid_int(self):
|
||||
"""Return txid (transaction hash without witness) as integer."""
|
||||
return uint256_from_str(hash256(self.serialize_without_witness()))
|
||||
|
||||
def is_valid(self):
|
||||
for tout in self.vout:
|
||||
if tout.nValue < 0 or tout.nValue > 21000000 * COIN:
|
||||
return False
|
||||
return True
|
||||
|
||||
# Calculate the transaction weight using witness and non-witness
|
||||
# serialization size (does NOT use sigops).
|
||||
def get_weight(self):
|
||||
with_witness_size = len(self.serialize_with_witness())
|
||||
without_witness_size = len(self.serialize_without_witness())
|
||||
return (WITNESS_SCALE_FACTOR - 1) * without_witness_size + with_witness_size
|
||||
|
||||
def get_vsize(self):
|
||||
return math.ceil(self.get_weight() / WITNESS_SCALE_FACTOR)
|
||||
|
||||
def __repr__(self):
|
||||
return "CTransaction(version=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
|
||||
% (self.version, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
|
||||
197
bip-0375/deps/bitcoin_test/psbt.py
Normal file
197
bip-0375/deps/bitcoin_test/psbt.py
Normal file
@@ -0,0 +1,197 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2022-present The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
########################################################################
|
||||
# Adapted from Bitcoin Core test framework psbt.py
|
||||
# for BIP-375 PSBT validation tests.
|
||||
########################################################################
|
||||
|
||||
import base64
|
||||
import struct
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
from .messages import (
|
||||
CTransaction,
|
||||
deser_string,
|
||||
deser_compact_size,
|
||||
from_binary,
|
||||
ser_compact_size,
|
||||
)
|
||||
|
||||
|
||||
# global types
|
||||
PSBT_GLOBAL_UNSIGNED_TX = 0x00
|
||||
PSBT_GLOBAL_XPUB = 0x01
|
||||
PSBT_GLOBAL_TX_VERSION = 0x02
|
||||
PSBT_GLOBAL_FALLBACK_LOCKTIME = 0x03
|
||||
PSBT_GLOBAL_INPUT_COUNT = 0x04
|
||||
PSBT_GLOBAL_OUTPUT_COUNT = 0x05
|
||||
PSBT_GLOBAL_TX_MODIFIABLE = 0x06
|
||||
PSBT_GLOBAL_VERSION = 0xfb
|
||||
PSBT_GLOBAL_PROPRIETARY = 0xfc
|
||||
|
||||
# per-input types
|
||||
PSBT_IN_NON_WITNESS_UTXO = 0x00
|
||||
PSBT_IN_WITNESS_UTXO = 0x01
|
||||
PSBT_IN_PARTIAL_SIG = 0x02
|
||||
PSBT_IN_SIGHASH_TYPE = 0x03
|
||||
PSBT_IN_REDEEM_SCRIPT = 0x04
|
||||
PSBT_IN_WITNESS_SCRIPT = 0x05
|
||||
PSBT_IN_BIP32_DERIVATION = 0x06
|
||||
PSBT_IN_FINAL_SCRIPTSIG = 0x07
|
||||
PSBT_IN_FINAL_SCRIPTWITNESS = 0x08
|
||||
PSBT_IN_POR_COMMITMENT = 0x09
|
||||
PSBT_IN_RIPEMD160 = 0x0a
|
||||
PSBT_IN_SHA256 = 0x0b
|
||||
PSBT_IN_HASH160 = 0x0c
|
||||
PSBT_IN_HASH256 = 0x0d
|
||||
PSBT_IN_PREVIOUS_TXID = 0x0e
|
||||
PSBT_IN_OUTPUT_INDEX = 0x0f
|
||||
PSBT_IN_SEQUENCE = 0x10
|
||||
PSBT_IN_REQUIRED_TIME_LOCKTIME = 0x11
|
||||
PSBT_IN_REQUIRED_HEIGHT_LOCKTIME = 0x12
|
||||
PSBT_IN_TAP_KEY_SIG = 0x13
|
||||
PSBT_IN_TAP_SCRIPT_SIG = 0x14
|
||||
PSBT_IN_TAP_LEAF_SCRIPT = 0x15
|
||||
PSBT_IN_TAP_BIP32_DERIVATION = 0x16
|
||||
PSBT_IN_TAP_INTERNAL_KEY = 0x17
|
||||
PSBT_IN_TAP_MERKLE_ROOT = 0x18
|
||||
PSBT_IN_MUSIG2_PARTICIPANT_PUBKEYS = 0x1a
|
||||
PSBT_IN_MUSIG2_PUB_NONCE = 0x1b
|
||||
PSBT_IN_MUSIG2_PARTIAL_SIG = 0x1c
|
||||
PSBT_IN_PROPRIETARY = 0xfc
|
||||
|
||||
# per-output types
|
||||
PSBT_OUT_REDEEM_SCRIPT = 0x00
|
||||
PSBT_OUT_WITNESS_SCRIPT = 0x01
|
||||
PSBT_OUT_BIP32_DERIVATION = 0x02
|
||||
PSBT_OUT_AMOUNT = 0x03
|
||||
PSBT_OUT_SCRIPT = 0x04
|
||||
PSBT_OUT_TAP_INTERNAL_KEY = 0x05
|
||||
PSBT_OUT_TAP_TREE = 0x06
|
||||
PSBT_OUT_TAP_BIP32_DERIVATION = 0x07
|
||||
PSBT_OUT_MUSIG2_PARTICIPANT_PUBKEYS = 0x08
|
||||
PSBT_OUT_PROPRIETARY = 0xfc
|
||||
|
||||
|
||||
class PSBTMap:
|
||||
"""Class for serializing and deserializing PSBT maps"""
|
||||
|
||||
def __init__(self, map=None):
|
||||
self.map = map if map is not None else {}
|
||||
|
||||
def deserialize(self, f):
|
||||
m = {}
|
||||
while True:
|
||||
k = deser_string(f)
|
||||
if len(k) == 0:
|
||||
break
|
||||
v = deser_string(f)
|
||||
if len(k) == 1:
|
||||
k = k[0]
|
||||
assert k not in m
|
||||
m[k] = v
|
||||
self.map = m
|
||||
|
||||
def serialize(self):
|
||||
m = b""
|
||||
for k,v in self.map.items():
|
||||
if isinstance(k, int) and 0 <= k and k <= 255:
|
||||
k = bytes([k])
|
||||
if isinstance(v, list):
|
||||
assert all(type(elem) is bytes for elem in v)
|
||||
v = b"".join(v) # simply concatenate the byte-strings w/o size prefixes
|
||||
m += ser_compact_size(len(k)) + k
|
||||
m += ser_compact_size(len(v)) + v
|
||||
m += b"\x00"
|
||||
return m
|
||||
|
||||
class PSBT:
|
||||
"""Class for serializing and deserializing PSBTs"""
|
||||
|
||||
def __init__(self, *, g=None, i=None, o=None):
|
||||
self.g = g if g is not None else PSBTMap()
|
||||
self.i = i if i is not None else []
|
||||
self.o = o if o is not None else []
|
||||
self.in_count = len(i) if i is not None else None
|
||||
self.out_count = len(o) if o is not None else None
|
||||
self.version = None
|
||||
|
||||
def deserialize(self, f):
|
||||
assert f.read(5) == b"psbt\xff"
|
||||
self.g = from_binary(PSBTMap, f)
|
||||
|
||||
self.version = 0
|
||||
if PSBT_GLOBAL_VERSION in self.g.map:
|
||||
assert PSBT_GLOBAL_INPUT_COUNT in self.g.map
|
||||
assert PSBT_GLOBAL_OUTPUT_COUNT in self.g.map
|
||||
self.version = struct.unpack("<I", self.g.map[PSBT_GLOBAL_VERSION])[0]
|
||||
assert self.version in [0, 2]
|
||||
if self.version == 2:
|
||||
self.in_count = deser_compact_size(BytesIO(self.g.map[PSBT_GLOBAL_INPUT_COUNT]))
|
||||
self.out_count = deser_compact_size(BytesIO(self.g.map[PSBT_GLOBAL_OUTPUT_COUNT]))
|
||||
else:
|
||||
assert PSBT_GLOBAL_UNSIGNED_TX in self.g.map
|
||||
tx = from_binary(CTransaction, self.g.map[PSBT_GLOBAL_UNSIGNED_TX])
|
||||
self.in_count = len(tx.vin)
|
||||
self.out_count = len(tx.vout)
|
||||
|
||||
self.i = [from_binary(PSBTMap, f) for _ in range(self.in_count)]
|
||||
self.o = [from_binary(PSBTMap, f) for _ in range(self.out_count)]
|
||||
return self
|
||||
|
||||
def serialize(self):
|
||||
assert isinstance(self.g, PSBTMap)
|
||||
assert isinstance(self.i, list) and all(isinstance(x, PSBTMap) for x in self.i)
|
||||
assert isinstance(self.o, list) and all(isinstance(x, PSBTMap) for x in self.o)
|
||||
if self.version is not None and self.version == 2:
|
||||
self.g.map[PSBT_GLOBAL_INPUT_COUNT] = ser_compact_size(len(self.i))
|
||||
self.g.map[PSBT_GLOBAL_OUTPUT_COUNT] = ser_compact_size(len(self.o))
|
||||
|
||||
psbt = [x.serialize() for x in [self.g] + self.i + self.o]
|
||||
return b"psbt\xff" + b"".join(psbt)
|
||||
|
||||
def make_blank(self):
|
||||
"""
|
||||
Remove all fields except for required fields depending on version
|
||||
"""
|
||||
if self.version == 0:
|
||||
for m in self.i + self.o:
|
||||
m.map.clear()
|
||||
|
||||
self.g = PSBTMap(map={PSBT_GLOBAL_UNSIGNED_TX: self.g.map[PSBT_GLOBAL_UNSIGNED_TX]})
|
||||
elif self.version == 2:
|
||||
self.g = PSBTMap(map={
|
||||
PSBT_GLOBAL_TX_VERSION: self.g.map[PSBT_GLOBAL_TX_VERSION],
|
||||
PSBT_GLOBAL_INPUT_COUNT: self.g.map[PSBT_GLOBAL_INPUT_COUNT],
|
||||
PSBT_GLOBAL_OUTPUT_COUNT: self.g.map[PSBT_GLOBAL_OUTPUT_COUNT],
|
||||
PSBT_GLOBAL_VERSION: self.g.map[PSBT_GLOBAL_VERSION],
|
||||
})
|
||||
|
||||
new_i = []
|
||||
for m in self.i:
|
||||
new_i.append(PSBTMap(map={
|
||||
PSBT_IN_PREVIOUS_TXID: m.map[PSBT_IN_PREVIOUS_TXID],
|
||||
PSBT_IN_OUTPUT_INDEX: m.map[PSBT_IN_OUTPUT_INDEX],
|
||||
}))
|
||||
self.i = new_i
|
||||
|
||||
new_o = []
|
||||
for m in self.o:
|
||||
new_o.append(PSBTMap(map={
|
||||
PSBT_OUT_SCRIPT: m.map[PSBT_OUT_SCRIPT],
|
||||
PSBT_OUT_AMOUNT: m.map[PSBT_OUT_AMOUNT],
|
||||
}))
|
||||
self.o = new_o
|
||||
else:
|
||||
assert False
|
||||
|
||||
def to_base64(self):
|
||||
return base64.b64encode(self.serialize()).decode("utf8")
|
||||
|
||||
@classmethod
|
||||
def from_base64(cls, b64psbt):
|
||||
return from_binary(cls, base64.b64decode(b64psbt))
|
||||
86
bip-0375/deps/dleq.py
Normal file
86
bip-0375/deps/dleq.py
Normal file
@@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Handle DLEQ proof generation and verification
|
||||
|
||||
Adapted from bip-0374 reference.py
|
||||
"""
|
||||
|
||||
from secp256k1lab.secp256k1 import G, GE
|
||||
from secp256k1lab.util import tagged_hash, xor_bytes
|
||||
|
||||
|
||||
DLEQ_TAG_AUX = "BIP0374/aux"
|
||||
DLEQ_TAG_NONCE = "BIP0374/nonce"
|
||||
DLEQ_TAG_CHALLENGE = "BIP0374/challenge"
|
||||
|
||||
|
||||
def dleq_challenge(
|
||||
A: GE, B: GE, C: GE, R1: GE, R2: GE, m: bytes | None, G: GE,
|
||||
) -> int:
|
||||
if m is not None:
|
||||
assert len(m) == 32
|
||||
m = bytes([]) if m is None else m
|
||||
return int.from_bytes(
|
||||
tagged_hash(
|
||||
DLEQ_TAG_CHALLENGE,
|
||||
A.to_bytes_compressed()
|
||||
+ B.to_bytes_compressed()
|
||||
+ C.to_bytes_compressed()
|
||||
+ G.to_bytes_compressed()
|
||||
+ R1.to_bytes_compressed()
|
||||
+ R2.to_bytes_compressed()
|
||||
+ m,
|
||||
),
|
||||
"big",
|
||||
)
|
||||
|
||||
|
||||
def dleq_generate_proof(
|
||||
a: int, B: GE, r: bytes, G: GE = G, m: bytes | None = None
|
||||
) -> bytes | None:
|
||||
assert len(r) == 32
|
||||
if not (0 < a < GE.ORDER):
|
||||
return None
|
||||
if B.infinity:
|
||||
return None
|
||||
if m is not None:
|
||||
assert len(m) == 32
|
||||
A = a * G
|
||||
C = a * B
|
||||
t = xor_bytes(a.to_bytes(32, "big"), tagged_hash(DLEQ_TAG_AUX, r))
|
||||
m_prime = bytes([]) if m is None else m
|
||||
rand = tagged_hash(
|
||||
DLEQ_TAG_NONCE, t + A.to_bytes_compressed() + C.to_bytes_compressed() + m_prime
|
||||
)
|
||||
k = int.from_bytes(rand, "big") % GE.ORDER
|
||||
if k == 0:
|
||||
return None
|
||||
R1 = k * G
|
||||
R2 = k * B
|
||||
e = dleq_challenge(A, B, C, R1, R2, m, G)
|
||||
s = (k + e * a) % GE.ORDER
|
||||
proof = e.to_bytes(32, "big") + s.to_bytes(32, "big")
|
||||
if not dleq_verify_proof(A, B, C, proof, G=G, m=m):
|
||||
return None
|
||||
return proof
|
||||
|
||||
|
||||
def dleq_verify_proof(
|
||||
A: GE, B: GE, C: GE, proof: bytes, G: GE = G, m: bytes | None = None
|
||||
) -> bool:
|
||||
if A.infinity or B.infinity or C.infinity or G.infinity:
|
||||
return False
|
||||
assert len(proof) == 64
|
||||
e = int.from_bytes(proof[:32], "big")
|
||||
s = int.from_bytes(proof[32:], "big")
|
||||
if s >= GE.ORDER:
|
||||
return False
|
||||
R1 = s * G - e * A
|
||||
if R1.infinity:
|
||||
return False
|
||||
R2 = s * B - e * C
|
||||
if R2.infinity:
|
||||
return False
|
||||
if e != dleq_challenge(A, B, C, R1, R2, m, G):
|
||||
return False
|
||||
return True
|
||||
17
bip-0375/deps/secp256k1lab/.github/workflows/main.yml
vendored
Normal file
17
bip-0375/deps/secp256k1lab/.github/workflows/main.yml
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
name: Tests
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
ruff:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install the latest version of uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
- run: uvx ruff check .
|
||||
mypy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install the latest version of uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
- run: uvx mypy .
|
||||
1
bip-0375/deps/secp256k1lab/.python-version
Normal file
1
bip-0375/deps/secp256k1lab/.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.9
|
||||
10
bip-0375/deps/secp256k1lab/CHANGELOG.md
Normal file
10
bip-0375/deps/secp256k1lab/CHANGELOG.md
Normal file
@@ -0,0 +1,10 @@
|
||||
# Changelog
|
||||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.0.0] - 2025-03-31
|
||||
|
||||
Initial release.
|
||||
23
bip-0375/deps/secp256k1lab/COPYING
Normal file
23
bip-0375/deps/secp256k1lab/COPYING
Normal file
@@ -0,0 +1,23 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2009-2024 The Bitcoin Core developers
|
||||
Copyright (c) 2009-2024 Bitcoin Developers
|
||||
Copyright (c) 2025- The secp256k1lab Developers
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
13
bip-0375/deps/secp256k1lab/README.md
Normal file
13
bip-0375/deps/secp256k1lab/README.md
Normal file
@@ -0,0 +1,13 @@
|
||||
secp256k1lab
|
||||
============
|
||||
|
||||

|
||||
|
||||
An INSECURE implementation of the secp256k1 elliptic curve and related cryptographic schemes written in Python, intended for prototyping, experimentation and education.
|
||||
|
||||
Features:
|
||||
* Low-level secp256k1 field and group arithmetic.
|
||||
* Schnorr signing/verification and key generation according to [BIP-340](https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki).
|
||||
* ECDH key exchange.
|
||||
|
||||
WARNING: The code in this library is slow and trivially vulnerable to side channel attacks.
|
||||
34
bip-0375/deps/secp256k1lab/pyproject.toml
Normal file
34
bip-0375/deps/secp256k1lab/pyproject.toml
Normal file
@@ -0,0 +1,34 @@
|
||||
[project]
|
||||
name = "secp256k1lab"
|
||||
version = "1.0.0"
|
||||
description = "An INSECURE implementation of the secp256k1 elliptic curve and related cryptographic schemes, intended for prototyping, experimentation and education"
|
||||
readme = "README.md"
|
||||
authors = [
|
||||
{ name = "Pieter Wuille", email = "pieter@wuille.net" },
|
||||
{ name = "Tim Ruffing", email = "me@real-or-random.org" },
|
||||
{ name = "Jonas Nick", email = "jonasd.nick@gmail.com" },
|
||||
{ name = "Sebastian Falbesoner", email = "sebastian.falbesoner@gmail.com" }
|
||||
]
|
||||
maintainers = [
|
||||
{ name = "Tim Ruffing", email = "me@real-or-random.org" },
|
||||
{ name = "Jonas Nick", email = "jonasd.nick@gmail.com" },
|
||||
{ name = "Sebastian Falbesoner", email = "sebastian.falbesoner@gmail.com" }
|
||||
]
|
||||
requires-python = ">=3.9"
|
||||
license = "MIT"
|
||||
license-files = ["COPYING"]
|
||||
keywords = ["secp256k1", "elliptic curves", "cryptography", "Bitcoin"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Developers",
|
||||
"Intended Audience :: Education",
|
||||
"Intended Audience :: Science/Research",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python",
|
||||
"Topic :: Security :: Cryptography",
|
||||
]
|
||||
dependencies = []
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
73
bip-0375/deps/secp256k1lab/src/secp256k1lab/bip340.py
Normal file
73
bip-0375/deps/secp256k1lab/src/secp256k1lab/bip340.py
Normal file
@@ -0,0 +1,73 @@
|
||||
# The following functions are based on the BIP 340 reference implementation:
|
||||
# https://github.com/bitcoin/bips/blob/master/bip-0340/reference.py
|
||||
|
||||
from .secp256k1 import FE, GE, G
|
||||
from .util import int_from_bytes, bytes_from_int, xor_bytes, tagged_hash
|
||||
|
||||
|
||||
def pubkey_gen(seckey: bytes) -> bytes:
|
||||
d0 = int_from_bytes(seckey)
|
||||
if not (1 <= d0 <= GE.ORDER - 1):
|
||||
raise ValueError("The secret key must be an integer in the range 1..n-1.")
|
||||
P = d0 * G
|
||||
assert not P.infinity
|
||||
return P.to_bytes_xonly()
|
||||
|
||||
|
||||
def schnorr_sign(
|
||||
msg: bytes, seckey: bytes, aux_rand: bytes, tag_prefix: str = "BIP0340"
|
||||
) -> bytes:
|
||||
d0 = int_from_bytes(seckey)
|
||||
if not (1 <= d0 <= GE.ORDER - 1):
|
||||
raise ValueError("The secret key must be an integer in the range 1..n-1.")
|
||||
if len(aux_rand) != 32:
|
||||
raise ValueError("aux_rand must be 32 bytes instead of %i." % len(aux_rand))
|
||||
P = d0 * G
|
||||
assert not P.infinity
|
||||
d = d0 if P.has_even_y() else GE.ORDER - d0
|
||||
t = xor_bytes(bytes_from_int(d), tagged_hash(tag_prefix + "/aux", aux_rand))
|
||||
k0 = (
|
||||
int_from_bytes(tagged_hash(tag_prefix + "/nonce", t + P.to_bytes_xonly() + msg))
|
||||
% GE.ORDER
|
||||
)
|
||||
if k0 == 0:
|
||||
raise RuntimeError("Failure. This happens only with negligible probability.")
|
||||
R = k0 * G
|
||||
assert not R.infinity
|
||||
k = k0 if R.has_even_y() else GE.ORDER - k0
|
||||
e = (
|
||||
int_from_bytes(
|
||||
tagged_hash(
|
||||
tag_prefix + "/challenge", R.to_bytes_xonly() + P.to_bytes_xonly() + msg
|
||||
)
|
||||
)
|
||||
% GE.ORDER
|
||||
)
|
||||
sig = R.to_bytes_xonly() + bytes_from_int((k + e * d) % GE.ORDER)
|
||||
assert schnorr_verify(msg, P.to_bytes_xonly(), sig, tag_prefix=tag_prefix)
|
||||
return sig
|
||||
|
||||
|
||||
def schnorr_verify(
|
||||
msg: bytes, pubkey: bytes, sig: bytes, tag_prefix: str = "BIP0340"
|
||||
) -> bool:
|
||||
if len(pubkey) != 32:
|
||||
raise ValueError("The public key must be a 32-byte array.")
|
||||
if len(sig) != 64:
|
||||
raise ValueError("The signature must be a 64-byte array.")
|
||||
try:
|
||||
P = GE.from_bytes_xonly(pubkey)
|
||||
except ValueError:
|
||||
return False
|
||||
r = int_from_bytes(sig[0:32])
|
||||
s = int_from_bytes(sig[32:64])
|
||||
if (r >= FE.SIZE) or (s >= GE.ORDER):
|
||||
return False
|
||||
e = (
|
||||
int_from_bytes(tagged_hash(tag_prefix + "/challenge", sig[0:32] + pubkey + msg))
|
||||
% GE.ORDER
|
||||
)
|
||||
R = s * G - e * P
|
||||
if R.infinity or (not R.has_even_y()) or (R.x != r):
|
||||
return False
|
||||
return True
|
||||
16
bip-0375/deps/secp256k1lab/src/secp256k1lab/ecdh.py
Normal file
16
bip-0375/deps/secp256k1lab/src/secp256k1lab/ecdh.py
Normal file
@@ -0,0 +1,16 @@
|
||||
import hashlib
|
||||
|
||||
from .secp256k1 import GE, Scalar
|
||||
|
||||
|
||||
def ecdh_compressed_in_raw_out(seckey: bytes, pubkey: bytes) -> GE:
|
||||
"""TODO"""
|
||||
shared_secret = Scalar.from_bytes_checked(seckey) * GE.from_bytes_compressed(pubkey)
|
||||
assert not shared_secret.infinity # prime-order group
|
||||
return shared_secret
|
||||
|
||||
|
||||
def ecdh_libsecp256k1(seckey: bytes, pubkey: bytes) -> bytes:
|
||||
"""TODO"""
|
||||
shared_secret = ecdh_compressed_in_raw_out(seckey, pubkey)
|
||||
return hashlib.sha256(shared_secret.to_bytes_compressed()).digest()
|
||||
15
bip-0375/deps/secp256k1lab/src/secp256k1lab/keys.py
Normal file
15
bip-0375/deps/secp256k1lab/src/secp256k1lab/keys.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from .secp256k1 import GE, G
|
||||
from .util import int_from_bytes
|
||||
|
||||
# The following function is based on the BIP 327 reference implementation
|
||||
# https://github.com/bitcoin/bips/blob/master/bip-0327/reference.py
|
||||
|
||||
|
||||
# Return the plain public key corresponding to a given secret key
|
||||
def pubkey_gen_plain(seckey: bytes) -> bytes:
|
||||
d0 = int_from_bytes(seckey)
|
||||
if not (1 <= d0 <= GE.ORDER - 1):
|
||||
raise ValueError("The secret key must be an integer in the range 1..n-1.")
|
||||
P = d0 * G
|
||||
assert not P.infinity
|
||||
return P.to_bytes_compressed()
|
||||
454
bip-0375/deps/secp256k1lab/src/secp256k1lab/secp256k1.py
Normal file
454
bip-0375/deps/secp256k1lab/src/secp256k1lab/secp256k1.py
Normal file
@@ -0,0 +1,454 @@
|
||||
# Copyright (c) 2022-2023 The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
"""Test-only implementation of low-level secp256k1 field and group arithmetic
|
||||
|
||||
It is designed for ease of understanding, not performance.
|
||||
|
||||
WARNING: This code is slow and trivially vulnerable to side channel attacks. Do not use for
|
||||
anything but tests.
|
||||
|
||||
Exports:
|
||||
* FE: class for secp256k1 field elements
|
||||
* GE: class for secp256k1 group elements
|
||||
* G: the secp256k1 generator point
|
||||
"""
|
||||
|
||||
# TODO Docstrings of methods still say "field element"
|
||||
class APrimeFE:
|
||||
"""Objects of this class represent elements of a prime field.
|
||||
|
||||
They are represented internally in numerator / denominator form, in order to delay inversions.
|
||||
"""
|
||||
|
||||
# The size of the field (also its modulus and characteristic).
|
||||
SIZE: int
|
||||
|
||||
def __init__(self, a=0, b=1):
|
||||
"""Initialize a field element a/b; both a and b can be ints or field elements."""
|
||||
if isinstance(a, type(self)):
|
||||
num = a._num
|
||||
den = a._den
|
||||
else:
|
||||
num = a % self.SIZE
|
||||
den = 1
|
||||
if isinstance(b, type(self)):
|
||||
den = (den * b._num) % self.SIZE
|
||||
num = (num * b._den) % self.SIZE
|
||||
else:
|
||||
den = (den * b) % self.SIZE
|
||||
assert den != 0
|
||||
if num == 0:
|
||||
den = 1
|
||||
self._num = num
|
||||
self._den = den
|
||||
|
||||
def __add__(self, a):
|
||||
"""Compute the sum of two field elements (second may be int)."""
|
||||
if isinstance(a, type(self)):
|
||||
return type(self)(self._num * a._den + self._den * a._num, self._den * a._den)
|
||||
if isinstance(a, int):
|
||||
return type(self)(self._num + self._den * a, self._den)
|
||||
return NotImplemented
|
||||
|
||||
def __radd__(self, a):
|
||||
"""Compute the sum of an integer and a field element."""
|
||||
return type(self)(a) + self
|
||||
|
||||
@classmethod
|
||||
# REVIEW This should be
|
||||
# def sum(cls, *es: Iterable[Self]) -> Self:
|
||||
# but Self needs the typing_extension package on Python <= 3.12.
|
||||
def sum(cls, *es):
|
||||
"""Compute the sum of field elements.
|
||||
|
||||
sum(a, b, c, ...) is identical to (0 + a + b + c + ...)."""
|
||||
return sum(es, start=cls(0))
|
||||
|
||||
def __sub__(self, a):
|
||||
"""Compute the difference of two field elements (second may be int)."""
|
||||
if isinstance(a, type(self)):
|
||||
return type(self)(self._num * a._den - self._den * a._num, self._den * a._den)
|
||||
if isinstance(a, int):
|
||||
return type(self)(self._num - self._den * a, self._den)
|
||||
return NotImplemented
|
||||
|
||||
def __rsub__(self, a):
|
||||
"""Compute the difference of an integer and a field element."""
|
||||
return type(self)(a) - self
|
||||
|
||||
def __mul__(self, a):
|
||||
"""Compute the product of two field elements (second may be int)."""
|
||||
if isinstance(a, type(self)):
|
||||
return type(self)(self._num * a._num, self._den * a._den)
|
||||
if isinstance(a, int):
|
||||
return type(self)(self._num * a, self._den)
|
||||
return NotImplemented
|
||||
|
||||
def __rmul__(self, a):
|
||||
"""Compute the product of an integer with a field element."""
|
||||
return type(self)(a) * self
|
||||
|
||||
def __truediv__(self, a):
|
||||
"""Compute the ratio of two field elements (second may be int)."""
|
||||
if isinstance(a, type(self)) or isinstance(a, int):
|
||||
return type(self)(self, a)
|
||||
return NotImplemented
|
||||
|
||||
def __pow__(self, a):
|
||||
"""Raise a field element to an integer power."""
|
||||
return type(self)(pow(self._num, a, self.SIZE), pow(self._den, a, self.SIZE))
|
||||
|
||||
def __neg__(self):
|
||||
"""Negate a field element."""
|
||||
return type(self)(-self._num, self._den)
|
||||
|
||||
def __int__(self):
|
||||
"""Convert a field element to an integer in range 0..SIZE-1. The result is cached."""
|
||||
if self._den != 1:
|
||||
self._num = (self._num * pow(self._den, -1, self.SIZE)) % self.SIZE
|
||||
self._den = 1
|
||||
return self._num
|
||||
|
||||
def sqrt(self):
|
||||
"""Compute the square root of a field element if it exists (None otherwise)."""
|
||||
raise NotImplementedError
|
||||
|
||||
def is_square(self):
|
||||
"""Determine if this field element has a square root."""
|
||||
# A more efficient algorithm is possible here (Jacobi symbol).
|
||||
return self.sqrt() is not None
|
||||
|
||||
def is_even(self):
|
||||
"""Determine whether this field element, represented as integer in 0..SIZE-1, is even."""
|
||||
return int(self) & 1 == 0
|
||||
|
||||
def __eq__(self, a):
|
||||
"""Check whether two field elements are equal (second may be an int)."""
|
||||
if isinstance(a, type(self)):
|
||||
return (self._num * a._den - self._den * a._num) % self.SIZE == 0
|
||||
return (self._num - self._den * a) % self.SIZE == 0
|
||||
|
||||
def to_bytes(self):
|
||||
"""Convert a field element to a 32-byte array (BE byte order)."""
|
||||
return int(self).to_bytes(32, 'big')
|
||||
|
||||
@classmethod
|
||||
def from_int_checked(cls, v):
|
||||
"""Convert an integer to a field element (no overflow allowed)."""
|
||||
if v >= cls.SIZE:
|
||||
raise ValueError
|
||||
return cls(v)
|
||||
|
||||
@classmethod
|
||||
def from_int_wrapping(cls, v):
|
||||
"""Convert an integer to a field element (reduced modulo SIZE)."""
|
||||
return cls(v % cls.SIZE)
|
||||
|
||||
@classmethod
|
||||
def from_bytes_checked(cls, b):
|
||||
"""Convert a 32-byte array to a field element (BE byte order, no overflow allowed)."""
|
||||
v = int.from_bytes(b, 'big')
|
||||
return cls.from_int_checked(v)
|
||||
|
||||
@classmethod
|
||||
def from_bytes_wrapping(cls, b):
|
||||
"""Convert a 32-byte array to a field element (BE byte order, reduced modulo SIZE)."""
|
||||
v = int.from_bytes(b, 'big')
|
||||
return cls.from_int_wrapping(v)
|
||||
|
||||
def __str__(self):
|
||||
"""Convert this field element to a 64 character hex string."""
|
||||
return f"{int(self):064x}"
|
||||
|
||||
def __repr__(self):
|
||||
"""Get a string representation of this field element."""
|
||||
return f"{type(self).__qualname__}(0x{int(self):x})"
|
||||
|
||||
|
||||
class FE(APrimeFE):
|
||||
SIZE = 2**256 - 2**32 - 977
|
||||
|
||||
def sqrt(self):
|
||||
# Due to the fact that our modulus p is of the form (p % 4) == 3, the Tonelli-Shanks
|
||||
# algorithm (https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm) is simply
|
||||
# raising the argument to the power (p + 1) / 4.
|
||||
|
||||
# To see why: (p-1) % 2 = 0, so 2 divides the order of the multiplicative group,
|
||||
# and thus only half of the non-zero field elements are squares. An element a is
|
||||
# a (nonzero) square when Euler's criterion, a^((p-1)/2) = 1 (mod p), holds. We're
|
||||
# looking for x such that x^2 = a (mod p). Given a^((p-1)/2) = 1, that is equivalent
|
||||
# to x^2 = a^(1 + (p-1)/2) mod p. As (1 + (p-1)/2) is even, this is equivalent to
|
||||
# x = a^((1 + (p-1)/2)/2) mod p, or x = a^((p+1)/4) mod p.
|
||||
v = int(self)
|
||||
s = pow(v, (self.SIZE + 1) // 4, self.SIZE)
|
||||
if s**2 % self.SIZE == v:
|
||||
return type(self)(s)
|
||||
return None
|
||||
|
||||
|
||||
class Scalar(APrimeFE):
|
||||
"""TODO Docstring"""
|
||||
SIZE = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
|
||||
|
||||
|
||||
class GE:
|
||||
"""Objects of this class represent secp256k1 group elements (curve points or infinity)
|
||||
|
||||
GE objects are immutable.
|
||||
|
||||
Normal points on the curve have fields:
|
||||
* x: the x coordinate (a field element)
|
||||
* y: the y coordinate (a field element, satisfying y^2 = x^3 + 7)
|
||||
* infinity: False
|
||||
|
||||
The point at infinity has field:
|
||||
* infinity: True
|
||||
"""
|
||||
|
||||
# TODO The following two class attributes should probably be just getters as
|
||||
# classmethods to enforce immutability. Unfortunately Python makes it hard
|
||||
# to create "classproperties". `G` could then also be just a classmethod.
|
||||
|
||||
# Order of the group (number of points on the curve, plus 1 for infinity)
|
||||
ORDER = Scalar.SIZE
|
||||
|
||||
# Number of valid distinct x coordinates on the curve.
|
||||
ORDER_HALF = ORDER // 2
|
||||
|
||||
@property
|
||||
def infinity(self):
|
||||
"""Whether the group element is the point at infinity."""
|
||||
return self._infinity
|
||||
|
||||
@property
|
||||
def x(self):
|
||||
"""The x coordinate (a field element) of a non-infinite group element."""
|
||||
assert not self.infinity
|
||||
return self._x
|
||||
|
||||
@property
|
||||
def y(self):
|
||||
"""The y coordinate (a field element) of a non-infinite group element."""
|
||||
assert not self.infinity
|
||||
return self._y
|
||||
|
||||
def __init__(self, x=None, y=None):
|
||||
"""Initialize a group element with specified x and y coordinates, or infinity."""
|
||||
if x is None:
|
||||
# Initialize as infinity.
|
||||
assert y is None
|
||||
self._infinity = True
|
||||
else:
|
||||
# Initialize as point on the curve (and check that it is).
|
||||
fx = FE(x)
|
||||
fy = FE(y)
|
||||
assert fy**2 == fx**3 + 7
|
||||
self._infinity = False
|
||||
self._x = fx
|
||||
self._y = fy
|
||||
|
||||
def __add__(self, a):
|
||||
"""Add two group elements together."""
|
||||
# Deal with infinity: a + infinity == infinity + a == a.
|
||||
if self.infinity:
|
||||
return a
|
||||
if a.infinity:
|
||||
return self
|
||||
if self.x == a.x:
|
||||
if self.y != a.y:
|
||||
# A point added to its own negation is infinity.
|
||||
assert self.y + a.y == 0
|
||||
return GE()
|
||||
else:
|
||||
# For identical inputs, use the tangent (doubling formula).
|
||||
lam = (3 * self.x**2) / (2 * self.y)
|
||||
else:
|
||||
# For distinct inputs, use the line through both points (adding formula).
|
||||
lam = (self.y - a.y) / (self.x - a.x)
|
||||
# Determine point opposite to the intersection of that line with the curve.
|
||||
x = lam**2 - (self.x + a.x)
|
||||
y = lam * (self.x - x) - self.y
|
||||
return GE(x, y)
|
||||
|
||||
@staticmethod
|
||||
def sum(*ps):
|
||||
"""Compute the sum of group elements.
|
||||
|
||||
GE.sum(a, b, c, ...) is identical to (GE() + a + b + c + ...)."""
|
||||
return sum(ps, start=GE())
|
||||
|
||||
@staticmethod
|
||||
def batch_mul(*aps):
|
||||
"""Compute a (batch) scalar group element multiplication.
|
||||
|
||||
GE.batch_mul((a1, p1), (a2, p2), (a3, p3)) is identical to a1*p1 + a2*p2 + a3*p3,
|
||||
but more efficient."""
|
||||
# Reduce all the scalars modulo order first (so we can deal with negatives etc).
|
||||
naps = [(int(a), p) for a, p in aps]
|
||||
# Start with point at infinity.
|
||||
r = GE()
|
||||
# Iterate over all bit positions, from high to low.
|
||||
for i in range(255, -1, -1):
|
||||
# Double what we have so far.
|
||||
r = r + r
|
||||
# Add then add the points for which the corresponding scalar bit is set.
|
||||
for (a, p) in naps:
|
||||
if (a >> i) & 1:
|
||||
r += p
|
||||
return r
|
||||
|
||||
def __rmul__(self, a):
|
||||
"""Multiply an integer with a group element."""
|
||||
if self == G:
|
||||
return FAST_G.mul(Scalar(a))
|
||||
return GE.batch_mul((Scalar(a), self))
|
||||
|
||||
def __neg__(self):
|
||||
"""Compute the negation of a group element."""
|
||||
if self.infinity:
|
||||
return self
|
||||
return GE(self.x, -self.y)
|
||||
|
||||
def __sub__(self, a):
|
||||
"""Subtract a group element from another."""
|
||||
return self + (-a)
|
||||
|
||||
def __eq__(self, a):
|
||||
"""Check if two group elements are equal."""
|
||||
return (self - a).infinity
|
||||
|
||||
def has_even_y(self):
|
||||
"""Determine whether a non-infinity group element has an even y coordinate."""
|
||||
assert not self.infinity
|
||||
return self.y.is_even()
|
||||
|
||||
def to_bytes_compressed(self):
|
||||
"""Convert a non-infinite group element to 33-byte compressed encoding."""
|
||||
assert not self.infinity
|
||||
return bytes([3 - self.y.is_even()]) + self.x.to_bytes()
|
||||
|
||||
def to_bytes_compressed_with_infinity(self):
|
||||
"""Convert a group element to 33-byte compressed encoding, mapping infinity to zeros."""
|
||||
if self.infinity:
|
||||
return 33 * b"\x00"
|
||||
return self.to_bytes_compressed()
|
||||
|
||||
def to_bytes_uncompressed(self):
|
||||
"""Convert a non-infinite group element to 65-byte uncompressed encoding."""
|
||||
assert not self.infinity
|
||||
return b'\x04' + self.x.to_bytes() + self.y.to_bytes()
|
||||
|
||||
def to_bytes_xonly(self):
|
||||
"""Convert (the x coordinate of) a non-infinite group element to 32-byte xonly encoding."""
|
||||
assert not self.infinity
|
||||
return self.x.to_bytes()
|
||||
|
||||
@staticmethod
|
||||
def lift_x(x):
|
||||
"""Return group element with specified field element as x coordinate (and even y)."""
|
||||
y = (FE(x)**3 + 7).sqrt()
|
||||
if y is None:
|
||||
raise ValueError
|
||||
if not y.is_even():
|
||||
y = -y
|
||||
return GE(x, y)
|
||||
|
||||
@staticmethod
|
||||
def from_bytes_compressed(b):
|
||||
"""Convert a compressed to a group element."""
|
||||
assert len(b) == 33
|
||||
if b[0] != 2 and b[0] != 3:
|
||||
raise ValueError
|
||||
x = FE.from_bytes_checked(b[1:])
|
||||
r = GE.lift_x(x)
|
||||
if b[0] == 3:
|
||||
r = -r
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def from_bytes_uncompressed(b):
|
||||
"""Convert an uncompressed to a group element."""
|
||||
assert len(b) == 65
|
||||
if b[0] != 4:
|
||||
raise ValueError
|
||||
x = FE.from_bytes_checked(b[1:33])
|
||||
y = FE.from_bytes_checked(b[33:])
|
||||
if y**2 != x**3 + 7:
|
||||
raise ValueError
|
||||
return GE(x, y)
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(b):
|
||||
"""Convert a compressed or uncompressed encoding to a group element."""
|
||||
assert len(b) in (33, 65)
|
||||
if len(b) == 33:
|
||||
return GE.from_bytes_compressed(b)
|
||||
else:
|
||||
return GE.from_bytes_uncompressed(b)
|
||||
|
||||
@staticmethod
|
||||
def from_bytes_xonly(b):
|
||||
"""Convert a point given in xonly encoding to a group element."""
|
||||
assert len(b) == 32
|
||||
x = FE.from_bytes_checked(b)
|
||||
r = GE.lift_x(x)
|
||||
return r
|
||||
|
||||
@staticmethod
|
||||
def is_valid_x(x):
|
||||
"""Determine whether the provided field element is a valid X coordinate."""
|
||||
return (FE(x)**3 + 7).is_square()
|
||||
|
||||
def __str__(self):
|
||||
"""Convert this group element to a string."""
|
||||
if self.infinity:
|
||||
return "(inf)"
|
||||
return f"({self.x},{self.y})"
|
||||
|
||||
def __repr__(self):
|
||||
"""Get a string representation for this group element."""
|
||||
if self.infinity:
|
||||
return "GE()"
|
||||
return f"GE(0x{int(self.x):x},0x{int(self.y):x})"
|
||||
|
||||
def __hash__(self):
|
||||
"""Compute a non-cryptographic hash of the group element."""
|
||||
if self.infinity:
|
||||
return 0 # 0 is not a valid x coordinate
|
||||
return int(self.x)
|
||||
|
||||
|
||||
# The secp256k1 generator point
|
||||
G = GE.lift_x(0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798)
|
||||
|
||||
|
||||
class FastGEMul:
|
||||
"""Table for fast multiplication with a constant group element.
|
||||
|
||||
Speed up scalar multiplication with a fixed point P by using a precomputed lookup table with
|
||||
its powers of 2:
|
||||
|
||||
table = [P, 2*P, 4*P, (2^3)*P, (2^4)*P, ..., (2^255)*P]
|
||||
|
||||
During multiplication, the points corresponding to each bit set in the scalar are added up,
|
||||
i.e. on average ~128 point additions take place.
|
||||
"""
|
||||
|
||||
def __init__(self, p):
|
||||
self.table = [p] # table[i] = (2^i) * p
|
||||
for _ in range(255):
|
||||
p = p + p
|
||||
self.table.append(p)
|
||||
|
||||
def mul(self, a):
|
||||
result = GE()
|
||||
a = int(a)
|
||||
for bit in range(a.bit_length()):
|
||||
if a & (1 << bit):
|
||||
result += self.table[bit]
|
||||
return result
|
||||
|
||||
# Precomputed table with multiples of G for fast multiplication
|
||||
FAST_G = FastGEMul(G)
|
||||
24
bip-0375/deps/secp256k1lab/src/secp256k1lab/util.py
Normal file
24
bip-0375/deps/secp256k1lab/src/secp256k1lab/util.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import hashlib
|
||||
|
||||
|
||||
# This implementation can be sped up by storing the midstate after hashing
|
||||
# tag_hash instead of rehashing it all the time.
|
||||
def tagged_hash(tag: str, msg: bytes) -> bytes:
|
||||
tag_hash = hashlib.sha256(tag.encode()).digest()
|
||||
return hashlib.sha256(tag_hash + tag_hash + msg).digest()
|
||||
|
||||
|
||||
def bytes_from_int(x: int) -> bytes:
|
||||
return x.to_bytes(32, byteorder="big")
|
||||
|
||||
|
||||
def xor_bytes(b0: bytes, b1: bytes) -> bytes:
|
||||
return bytes(x ^ y for (x, y) in zip(b0, b1))
|
||||
|
||||
|
||||
def int_from_bytes(b: bytes) -> int:
|
||||
return int.from_bytes(b, byteorder="big")
|
||||
|
||||
|
||||
def hash_sha256(b: bytes) -> bytes:
|
||||
return hashlib.sha256(b).digest()
|
||||
155
bip-0375/test_runner.py
Normal file
155
bip-0375/test_runner.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Process test vectors JSON file and run validation checks"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import Tuple
|
||||
|
||||
project_root = Path(__file__).parent
|
||||
deps_dir = project_root / "deps"
|
||||
secp256k1lab_dir = deps_dir / "secp256k1lab" / "src"
|
||||
for path in [str(deps_dir), str(secp256k1lab_dir)]:
|
||||
if path not in sys.path:
|
||||
sys.path.insert(0, path)
|
||||
|
||||
from validator.psbt_bip375 import BIP375PSBT
|
||||
from validator.validate_psbt import (
|
||||
validate_psbt_structure,
|
||||
validate_ecdh_coverage,
|
||||
validate_input_eligibility,
|
||||
validate_output_scripts,
|
||||
)
|
||||
|
||||
CHECK_FUNCTIONS = {
|
||||
"psbt_structure": validate_psbt_structure,
|
||||
"ecdh_coverage": validate_ecdh_coverage,
|
||||
"input_eligibility": validate_input_eligibility,
|
||||
"output_scripts": validate_output_scripts,
|
||||
}
|
||||
|
||||
|
||||
def validate_bip375_psbt(
|
||||
psbt_data: str, checks: list[str], debug: bool = False
|
||||
) -> Tuple[bool, str]:
|
||||
"""Performs sequential validation of a PSBT against BIP-375 rules"""
|
||||
psbt = BIP375PSBT.from_base64(psbt_data)
|
||||
|
||||
if checks is None:
|
||||
checks = [
|
||||
"psbt_structure",
|
||||
"ecdh_coverage",
|
||||
"input_eligibility",
|
||||
"output_scripts",
|
||||
]
|
||||
|
||||
for check_name in checks:
|
||||
if check_name not in CHECK_FUNCTIONS:
|
||||
return False, f"Unknown check: {check_name}"
|
||||
|
||||
check_fn = CHECK_FUNCTIONS[check_name]
|
||||
|
||||
is_valid, msg = check_fn(psbt)
|
||||
if debug:
|
||||
msg = f"{check_name.upper()}: {msg}" if msg else msg
|
||||
|
||||
if not is_valid:
|
||||
return False, msg
|
||||
|
||||
return True, "All checks passed"
|
||||
|
||||
|
||||
def load_test_vectors(filename: str) -> dict:
|
||||
"""Load test vectors from JSON file"""
|
||||
try:
|
||||
with open(filename, "r") as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
print(f"Error: Test vector file '{filename}' not found")
|
||||
sys.exit(1)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error: Invalid JSON in test vector file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_validation_tests(test_data: dict, verbosity: int = 0) -> tuple[int, int]:
|
||||
"""Run validation checks for each test vector"""
|
||||
passed = 0
|
||||
failed = 0
|
||||
|
||||
# Process invalid PSBTs (should fail validation)
|
||||
invalid_tests = test_data.get("invalid", [])
|
||||
print(f"Invalid PSBTs: {len(invalid_tests)}")
|
||||
for test_vector in invalid_tests:
|
||||
is_valid, result = validate_bip375_psbt(
|
||||
test_vector["psbt"], test_vector.get("checks"), debug=verbosity >= 2
|
||||
)
|
||||
print(f"{test_vector['description']}")
|
||||
if not is_valid:
|
||||
passed += 1
|
||||
if verbosity >= 1:
|
||||
print(f" {result}")
|
||||
else:
|
||||
failed += 1
|
||||
if result:
|
||||
print(f" ERROR: {result}")
|
||||
|
||||
# Process valid PSBTs (should pass validation)
|
||||
valid_tests = test_data.get("valid", [])
|
||||
print("")
|
||||
print(f"Valid PSBTs: {len(valid_tests)}")
|
||||
for test_vector in valid_tests:
|
||||
is_valid, result = validate_bip375_psbt(
|
||||
test_vector["psbt"], test_vector.get("checks"), debug=verbosity >= 2
|
||||
)
|
||||
|
||||
print(f"{test_vector['description']}")
|
||||
if is_valid:
|
||||
passed += 1
|
||||
if verbosity >= 1:
|
||||
print(f" {result}")
|
||||
else:
|
||||
failed += 1
|
||||
if result:
|
||||
print(f" ERROR: {result}")
|
||||
|
||||
return passed, failed
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Silent Payments PSBT Validator",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--test-file",
|
||||
"-f",
|
||||
default=str(project_root / "bip375_test_vectors.json"),
|
||||
help="Test vector file to run (default: bip375_test_vectors.json)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
dest="verbosity",
|
||||
action="count",
|
||||
default=0,
|
||||
help="Verbosity level: -v shows pass/fail details, -vv enables debug output",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
test_data = load_test_vectors(args.test_file)
|
||||
|
||||
print(f"Description: {test_data.get('description', 'N/A')}")
|
||||
print(f"Version: {test_data.get('version', 'N/A')}")
|
||||
print()
|
||||
|
||||
passed, failed = run_validation_tests(test_data, args.verbosity)
|
||||
|
||||
print()
|
||||
print(f"Summary: {passed} passed, {failed} failed")
|
||||
|
||||
sys.exit(0 if failed == 0 else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
51
bip-0375/validator/bip352_crypto.py
Normal file
51
bip-0375/validator/bip352_crypto.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""
|
||||
Silent payment output script derivation
|
||||
"""
|
||||
|
||||
from typing import List
|
||||
|
||||
from deps.bitcoin_test.messages import COutPoint
|
||||
from secp256k1lab.secp256k1 import G, GE, Scalar
|
||||
from secp256k1lab.ecdh import ecdh_compressed_in_raw_out
|
||||
from secp256k1lab.util import tagged_hash
|
||||
|
||||
|
||||
def compute_silent_payment_output_script(
|
||||
outpoints: List[COutPoint],
|
||||
summed_pubkey_bytes: bytes,
|
||||
ecdh_share_bytes: bytes,
|
||||
spend_pubkey_bytes: bytes,
|
||||
k: int,
|
||||
) -> bytes:
|
||||
"""Compute silent payment output script per BIP-352"""
|
||||
input_hash_bytes = get_input_hash(outpoints, GE.from_bytes(summed_pubkey_bytes))
|
||||
|
||||
# Compute shared_secret = input_hash * ecdh_share
|
||||
shared_secret_bytes = ecdh_compressed_in_raw_out(
|
||||
input_hash_bytes, ecdh_share_bytes
|
||||
).to_bytes_compressed()
|
||||
|
||||
# Compute t_k = hash_BIP0352/SharedSecret(shared_secret || k)
|
||||
t_k = Scalar.from_bytes_checked(
|
||||
tagged_hash("BIP0352/SharedSecret", shared_secret_bytes + ser_uint32(k))
|
||||
)
|
||||
|
||||
# Compute P_k = B_spend + t_k * G
|
||||
B_spend = GE.from_bytes(spend_pubkey_bytes)
|
||||
P_k = B_spend + t_k * G
|
||||
|
||||
# Return P2TR script (x-only pubkey)
|
||||
return bytes([0x51, 0x20]) + P_k.to_bytes_xonly()
|
||||
|
||||
|
||||
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: GE) -> bytes:
|
||||
"""Compute input hash per BIP-352"""
|
||||
lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0]
|
||||
return tagged_hash(
|
||||
"BIP0352/Inputs",
|
||||
lowest_outpoint.serialize() + sum_input_pubkeys.to_bytes_compressed(),
|
||||
)
|
||||
|
||||
|
||||
def ser_uint32(u: int) -> bytes:
|
||||
return u.to_bytes(4, "big")
|
||||
205
bip-0375/validator/inputs.py
Normal file
205
bip-0375/validator/inputs.py
Normal file
@@ -0,0 +1,205 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PSBT input utility functions
|
||||
"""
|
||||
|
||||
import struct
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from deps.bitcoin_test.messages import CTransaction, CTxOut, from_binary
|
||||
from deps.bitcoin_test.psbt import (
|
||||
PSBT,
|
||||
PSBT_IN_BIP32_DERIVATION,
|
||||
PSBT_IN_NON_WITNESS_UTXO,
|
||||
PSBT_IN_OUTPUT_INDEX,
|
||||
PSBT_IN_REDEEM_SCRIPT,
|
||||
PSBT_IN_TAP_INTERNAL_KEY,
|
||||
PSBT_IN_WITNESS_UTXO,
|
||||
)
|
||||
from secp256k1lab.secp256k1 import GE
|
||||
|
||||
from .psbt_bip375 import BIP375PSBTMap, PSBT_GLOBAL_SP_ECDH_SHARE, PSBT_IN_SP_ECDH_SHARE
|
||||
|
||||
|
||||
def collect_input_ecdh_and_pubkey(
|
||||
psbt: PSBT, scan_key: bytes
|
||||
) -> Tuple[Optional[bytes], Optional[bytes]]:
|
||||
"""
|
||||
Collect combined ECDH share and summed pubkey for a scan key.
|
||||
|
||||
Checks global ECDH share first, falls back to per-input shares.
|
||||
Returns (ecdh_share_bytes, summed_pubkey_bytes) or (None, None).
|
||||
"""
|
||||
# Check for global ECDH share
|
||||
summed_pubkey = None
|
||||
ecdh_share = psbt.g.get_by_key(PSBT_GLOBAL_SP_ECDH_SHARE, scan_key)
|
||||
if ecdh_share:
|
||||
summed_pubkey = None
|
||||
for input_map in psbt.i:
|
||||
pubkey = pubkey_from_eligible_input(input_map)
|
||||
if pubkey is not None:
|
||||
summed_pubkey = (
|
||||
pubkey if summed_pubkey is None else summed_pubkey + pubkey
|
||||
)
|
||||
|
||||
if summed_pubkey:
|
||||
return ecdh_share, summed_pubkey.to_bytes_compressed()
|
||||
|
||||
# Check for per-input ECDH shares
|
||||
combined_ecdh = None
|
||||
for input_map in psbt.i:
|
||||
input_ecdh = input_map.get_by_key(PSBT_IN_SP_ECDH_SHARE, scan_key)
|
||||
if input_ecdh:
|
||||
if not is_input_eligible(input_map):
|
||||
continue # skip ineligible inputs
|
||||
ecdh_point = GE.from_bytes(input_ecdh)
|
||||
combined_ecdh = (
|
||||
ecdh_point if combined_ecdh is None else combined_ecdh + ecdh_point
|
||||
)
|
||||
pubkey = pubkey_from_eligible_input(input_map)
|
||||
if pubkey is not None:
|
||||
summed_pubkey = (
|
||||
pubkey if summed_pubkey is None else summed_pubkey + pubkey
|
||||
)
|
||||
|
||||
if combined_ecdh and summed_pubkey:
|
||||
return combined_ecdh.to_bytes_compressed(), summed_pubkey.to_bytes_compressed()
|
||||
return None, None
|
||||
|
||||
|
||||
def pubkey_from_eligible_input(input_map: BIP375PSBTMap) -> Optional[GE]:
|
||||
"""
|
||||
Extract the public key from a PSBT input map if eligible for silent payments
|
||||
|
||||
Returns a GE point (public key), or None if not found
|
||||
"""
|
||||
if not is_input_eligible(input_map):
|
||||
return None
|
||||
|
||||
# Try BIP32 derivation first (key_data is the pubkey)
|
||||
derivations = input_map.get_all_by_type(PSBT_IN_BIP32_DERIVATION)
|
||||
if derivations:
|
||||
pubkey, _ = derivations[0]
|
||||
if len(pubkey) == 33:
|
||||
return GE.from_bytes(pubkey)
|
||||
|
||||
# Try PSBT_IN_WITNESS_UTXO for P2TR inputs
|
||||
spk = parse_witness_utxo(input_map[PSBT_IN_WITNESS_UTXO])
|
||||
if spk and _is_p2tr(spk):
|
||||
return GE.from_bytes(bytes([0x02]) + spk[2:34])
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# scriptPubKey helpers
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def _script_pubkey_from_psbt_input(input_map: BIP375PSBTMap) -> Optional[bytes]:
|
||||
"""Extract scriptPubKey from PSBT input fields"""
|
||||
script_pubkey = None
|
||||
|
||||
# Try WITNESS_UTXO first
|
||||
if PSBT_IN_WITNESS_UTXO in input_map:
|
||||
script_pubkey = parse_witness_utxo(input_map[PSBT_IN_WITNESS_UTXO])
|
||||
|
||||
# Try NON_WITNESS_UTXO for legacy inputs
|
||||
elif PSBT_IN_NON_WITNESS_UTXO in input_map:
|
||||
non_witness_utxo = input_map[PSBT_IN_NON_WITNESS_UTXO]
|
||||
# Get the output index from PSBT_IN_OUTPUT_INDEX field
|
||||
if PSBT_IN_OUTPUT_INDEX in input_map:
|
||||
output_index_bytes = input_map[PSBT_IN_OUTPUT_INDEX]
|
||||
if len(output_index_bytes) == 4:
|
||||
output_index = struct.unpack("<I", output_index_bytes)[0]
|
||||
script_pubkey = _parse_non_witness_utxo(non_witness_utxo, output_index)
|
||||
return script_pubkey
|
||||
|
||||
|
||||
def parse_witness_utxo(witness_utxo: bytes) -> bytes:
|
||||
"""Extract scriptPubKey from witness_utxo"""
|
||||
utxo = from_binary(CTxOut, witness_utxo)
|
||||
return utxo.scriptPubKey
|
||||
|
||||
|
||||
def _parse_non_witness_utxo(non_witness_utxo: bytes, output_index: int) -> bytes:
|
||||
"""Extract scriptPubKey from non_witness_utxo"""
|
||||
tx = from_binary(CTransaction, non_witness_utxo)
|
||||
assert output_index < len(tx.vout), "Invalid output index"
|
||||
return tx.vout[output_index].scriptPubKey
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Input eligibility helpers
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def is_input_eligible(input_map: BIP375PSBTMap) -> bool:
|
||||
"""Check if input is eligible for silent payments"""
|
||||
script_pubkey = _script_pubkey_from_psbt_input(input_map)
|
||||
assert script_pubkey is not None, (
|
||||
"scriptPubKey could not be extracted from PSBT input"
|
||||
)
|
||||
|
||||
if not _has_eligible_script_type(script_pubkey):
|
||||
return False
|
||||
|
||||
NUMS_H = bytes.fromhex(
|
||||
"50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0"
|
||||
)
|
||||
if _is_p2tr(script_pubkey):
|
||||
tap_internal_key = input_map.get(PSBT_IN_TAP_INTERNAL_KEY)
|
||||
if tap_internal_key == NUMS_H:
|
||||
return False
|
||||
|
||||
if _is_p2sh(script_pubkey):
|
||||
if PSBT_IN_REDEEM_SCRIPT in input_map:
|
||||
redeem_script = input_map[PSBT_IN_REDEEM_SCRIPT]
|
||||
if not _is_p2wpkh(redeem_script):
|
||||
return False
|
||||
else:
|
||||
assert False
|
||||
return True
|
||||
|
||||
|
||||
def _has_eligible_script_type(script_pubkey: bytes) -> bool:
|
||||
"""True if scriptPubKey is eligible for silent payments"""
|
||||
return (
|
||||
_is_p2pkh(script_pubkey)
|
||||
or _is_p2wpkh(script_pubkey)
|
||||
or _is_p2tr(script_pubkey)
|
||||
or _is_p2sh(script_pubkey)
|
||||
)
|
||||
|
||||
|
||||
def _is_p2tr(spk: bytes) -> bool:
|
||||
if len(spk) != 34:
|
||||
return False
|
||||
# OP_1 OP_PUSHBYTES_32 <32 bytes>
|
||||
return (spk[0] == 0x51) & (spk[1] == 0x20)
|
||||
|
||||
|
||||
def _is_p2wpkh(spk: bytes) -> bool:
|
||||
if len(spk) != 22:
|
||||
return False
|
||||
# OP_0 OP_PUSHBYTES_20 <20 bytes>
|
||||
return (spk[0] == 0x00) & (spk[1] == 0x14)
|
||||
|
||||
|
||||
def _is_p2sh(spk: bytes) -> bool:
|
||||
if len(spk) != 23:
|
||||
return False
|
||||
# OP_HASH160 OP_PUSHBYTES_20 <20 bytes> OP_EQUAL
|
||||
return (spk[0] == 0xA9) & (spk[1] == 0x14) & (spk[-1] == 0x87)
|
||||
|
||||
|
||||
def _is_p2pkh(spk: bytes) -> bool:
|
||||
if len(spk) != 25:
|
||||
return False
|
||||
# OP_DUP OP_HASH160 OP_PUSHBYTES_20 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG
|
||||
return (
|
||||
(spk[0] == 0x76)
|
||||
& (spk[1] == 0xA9)
|
||||
& (spk[2] == 0x14)
|
||||
& (spk[-2] == 0x88)
|
||||
& (spk[-1] == 0xAC)
|
||||
)
|
||||
95
bip-0375/validator/psbt_bip375.py
Normal file
95
bip-0375/validator/psbt_bip375.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
BIP-375 PSBT map extensions
|
||||
|
||||
BIP375PSBTMap (a PSBTMap subclass with BIP-375 field access helpers)
|
||||
BIP375PSBT (a PSBT subclass that deserializes into BIP375PSBTMap instances)
|
||||
"""
|
||||
|
||||
from io import BytesIO
|
||||
import struct
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from deps.bitcoin_test.messages import CTransaction, deser_compact_size, from_binary
|
||||
from deps.bitcoin_test.psbt import (
|
||||
PSBT,
|
||||
PSBTMap,
|
||||
PSBT_GLOBAL_VERSION,
|
||||
PSBT_GLOBAL_INPUT_COUNT,
|
||||
PSBT_GLOBAL_OUTPUT_COUNT,
|
||||
PSBT_GLOBAL_UNSIGNED_TX,
|
||||
)
|
||||
|
||||
PSBT_GLOBAL_SP_ECDH_SHARE = 0x07
|
||||
PSBT_GLOBAL_SP_DLEQ = 0x08
|
||||
|
||||
PSBT_IN_SP_ECDH_SHARE = 0x1D
|
||||
PSBT_IN_SP_DLEQ = 0x1E
|
||||
|
||||
PSBT_OUT_SP_V0_INFO = 0x09
|
||||
PSBT_OUT_SP_V0_LABEL = 0x0A
|
||||
|
||||
|
||||
class BIP375PSBTMap(PSBTMap):
|
||||
"""PSBTMap with BIP-375 field access helpers"""
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.map[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.map
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.map.get(key, default)
|
||||
|
||||
def get_all_by_type(self, key_type: int) -> List[Tuple[bytes, bytes]]:
|
||||
"""
|
||||
Get all entries with the given key_type
|
||||
|
||||
Returns list of (key_data, value_data) tuples. For single-byte keys (no
|
||||
key_data), key_data is b''.
|
||||
"""
|
||||
result = []
|
||||
for key, value_data in self.map.items():
|
||||
if isinstance(key, int) and key == key_type:
|
||||
result.append((b"", value_data))
|
||||
elif isinstance(key, bytes) and len(key) > 0 and key[0] == key_type:
|
||||
result.append((key[1:], value_data))
|
||||
return result
|
||||
|
||||
def get_by_key(self, key_type: int, key_data: bytes) -> Optional[bytes]:
|
||||
"""Get value_data for a specific key_type + key_data combination"""
|
||||
if key_data == b"":
|
||||
return self.map.get(key_type)
|
||||
return self.map.get(bytes([key_type]) + key_data)
|
||||
|
||||
|
||||
class BIP375PSBT(PSBT):
|
||||
"""PSBT that deserializes maps as BIP375PSBTMap instances"""
|
||||
|
||||
def deserialize(self, f):
|
||||
assert f.read(5) == b"psbt\xff"
|
||||
self.g = from_binary(BIP375PSBTMap, f)
|
||||
|
||||
self.version = 0
|
||||
if PSBT_GLOBAL_VERSION in self.g.map:
|
||||
assert PSBT_GLOBAL_INPUT_COUNT in self.g.map
|
||||
assert PSBT_GLOBAL_OUTPUT_COUNT in self.g.map
|
||||
self.version = struct.unpack("<I", self.g.map[PSBT_GLOBAL_VERSION])[0]
|
||||
assert self.version in [0, 2]
|
||||
if self.version == 2:
|
||||
self.in_count = deser_compact_size(
|
||||
BytesIO(self.g.map[PSBT_GLOBAL_INPUT_COUNT])
|
||||
)
|
||||
self.out_count = deser_compact_size(
|
||||
BytesIO(self.g.map[PSBT_GLOBAL_OUTPUT_COUNT])
|
||||
)
|
||||
else:
|
||||
assert PSBT_GLOBAL_UNSIGNED_TX in self.g.map
|
||||
tx = from_binary(CTransaction, self.g.map[PSBT_GLOBAL_UNSIGNED_TX])
|
||||
self.in_count = len(tx.vin)
|
||||
self.out_count = len(tx.vout)
|
||||
|
||||
self.i = [from_binary(BIP375PSBTMap, f) for _ in range(self.in_count)]
|
||||
self.o = [from_binary(BIP375PSBTMap, f) for _ in range(self.out_count)]
|
||||
return self
|
||||
345
bip-0375/validator/validate_psbt.py
Normal file
345
bip-0375/validator/validate_psbt.py
Normal file
@@ -0,0 +1,345 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validates PSBTs according to BIP-375 rules
|
||||
|
||||
Provides independent checks for PSBT structure, ECDH share coverage,
|
||||
input eligibility, and output script correctness.
|
||||
"""
|
||||
|
||||
import struct
|
||||
from typing import Tuple
|
||||
|
||||
from deps.bitcoin_test.messages import COutPoint
|
||||
from deps.bitcoin_test.psbt import (
|
||||
PSBT,
|
||||
PSBT_GLOBAL_TX_MODIFIABLE,
|
||||
PSBT_IN_OUTPUT_INDEX,
|
||||
PSBT_IN_PREVIOUS_TXID,
|
||||
PSBT_IN_SIGHASH_TYPE,
|
||||
PSBT_IN_WITNESS_UTXO,
|
||||
PSBT_OUT_SCRIPT,
|
||||
)
|
||||
from deps.dleq import dleq_verify_proof
|
||||
from secp256k1lab.secp256k1 import GE
|
||||
|
||||
from .bip352_crypto import compute_silent_payment_output_script
|
||||
from .inputs import (
|
||||
collect_input_ecdh_and_pubkey,
|
||||
is_input_eligible,
|
||||
parse_witness_utxo,
|
||||
pubkey_from_eligible_input,
|
||||
)
|
||||
from .psbt_bip375 import (
|
||||
PSBT_GLOBAL_SP_ECDH_SHARE,
|
||||
PSBT_GLOBAL_SP_DLEQ,
|
||||
PSBT_IN_SP_ECDH_SHARE,
|
||||
PSBT_IN_SP_DLEQ,
|
||||
PSBT_OUT_SP_V0_INFO,
|
||||
PSBT_OUT_SP_V0_LABEL,
|
||||
)
|
||||
|
||||
|
||||
def validate_psbt_structure(psbt: PSBT) -> Tuple[bool, str]:
|
||||
"""
|
||||
Validate PSBT structure requirements
|
||||
|
||||
Checks:
|
||||
- Each output must have PSBT_OUT_SCRIPT or PSBT_OUT_SP_V0_INFO
|
||||
- PSBT_OUT_SP_V0_LABEL requires PSBT_OUT_SP_V0_INFO
|
||||
- SP_V0_INFO must be 66 bytes (33-byte scan key + 33-byte spend key)
|
||||
- ECDH shares must be 33 bytes
|
||||
- DLEQ proofs must be 64 bytes
|
||||
- TX_MODIFIABLE is zero when PSBT_OUT_SCRIPT set for SP output
|
||||
"""
|
||||
# Check output requirements
|
||||
for i, output_map in enumerate(psbt.o):
|
||||
has_script = (
|
||||
PSBT_OUT_SCRIPT in output_map and len(output_map[PSBT_OUT_SCRIPT]) > 0
|
||||
)
|
||||
has_sp_info = PSBT_OUT_SP_V0_INFO in output_map
|
||||
has_sp_label = PSBT_OUT_SP_V0_LABEL in output_map
|
||||
|
||||
# Output must have script or SP info
|
||||
if not has_script and not has_sp_info:
|
||||
return (
|
||||
False,
|
||||
f"Output {i} must have either PSBT_OUT_SCRIPT or PSBT_OUT_SP_V0_INFO",
|
||||
)
|
||||
|
||||
# SP label requires SP info
|
||||
if has_sp_label and not has_sp_info:
|
||||
return (
|
||||
False,
|
||||
f"Output {i} has PSBT_OUT_SP_V0_LABEL but missing PSBT_OUT_SP_V0_INFO",
|
||||
)
|
||||
|
||||
# Validate SP_V0_INFO field length
|
||||
if has_sp_info:
|
||||
sp_info = output_map[PSBT_OUT_SP_V0_INFO]
|
||||
if len(sp_info) != 66:
|
||||
return (
|
||||
False,
|
||||
f"Output {i} SP_V0_INFO has wrong length ({len(sp_info)} bytes, expected 66)",
|
||||
)
|
||||
|
||||
# Validate ECDH share lengths (global and per-input)
|
||||
global_ecdh_shares = psbt.g.get_all_by_type(PSBT_GLOBAL_SP_ECDH_SHARE)
|
||||
for _, ecdh_share in global_ecdh_shares:
|
||||
if len(ecdh_share) != 33:
|
||||
return (
|
||||
False,
|
||||
f"Global ECDH share has wrong length ({len(ecdh_share)} bytes, expected 33)",
|
||||
)
|
||||
|
||||
for i, input_map in enumerate(psbt.i):
|
||||
input_ecdh_shares = input_map.get_all_by_type(PSBT_IN_SP_ECDH_SHARE)
|
||||
for _, ecdh_share in input_ecdh_shares:
|
||||
if len(ecdh_share) != 33:
|
||||
return (
|
||||
False,
|
||||
f"Input {i} ECDH share has wrong length ({len(ecdh_share)} bytes, expected 33)",
|
||||
)
|
||||
|
||||
# Validate DLEQ proof lengths (global and per-input)
|
||||
global_dleq_proofs = psbt.g.get_all_by_type(PSBT_GLOBAL_SP_DLEQ)
|
||||
for _, dleq_proof in global_dleq_proofs:
|
||||
if len(dleq_proof) != 64:
|
||||
return (
|
||||
False,
|
||||
f"Global DLEQ proof has wrong length ({len(dleq_proof)} bytes, expected 64)",
|
||||
)
|
||||
|
||||
for i, input_map in enumerate(psbt.i):
|
||||
input_dleq_proofs = input_map.get_all_by_type(PSBT_IN_SP_DLEQ)
|
||||
for _, dleq_proof in input_dleq_proofs:
|
||||
if len(dleq_proof) != 64:
|
||||
return (
|
||||
False,
|
||||
f"Input {i} DLEQ proof has wrong length ({len(dleq_proof)} bytes, expected 64)",
|
||||
)
|
||||
|
||||
# Check TX_MODIFIABLE flag when PSBT_OUT_SCRIPT is set
|
||||
for output_map in psbt.o:
|
||||
if PSBT_OUT_SP_V0_INFO in output_map and PSBT_OUT_SCRIPT in output_map:
|
||||
if len(output_map.get(PSBT_OUT_SCRIPT, b"")) > 0:
|
||||
if psbt.g.get(PSBT_GLOBAL_TX_MODIFIABLE) != b"\x00":
|
||||
return (
|
||||
False,
|
||||
"PSBT_OUT_SCRIPT set for silent payments output but PSBT_GLOBAL_TX_MODIFIABLE not zeroed",
|
||||
)
|
||||
return True, None
|
||||
|
||||
|
||||
def validate_ecdh_coverage(psbt: PSBT) -> Tuple[bool, str]:
|
||||
"""
|
||||
Validate ECDH share coverage and DLEQ proof correctness
|
||||
|
||||
Checks:
|
||||
- Verify ECDH share coverage for each scan key associated with SP outputs
|
||||
- Every ECDH share must have a corresponding DLEQ proof
|
||||
- If PSBT_OUT_SCRIPT is set, all eligible inputs must have ECDH coverage
|
||||
- DLEQ proofs must verify correctly
|
||||
"""
|
||||
# Collect unique scan keys from SP outputs
|
||||
scan_keys = set()
|
||||
for output_map in psbt.o:
|
||||
if PSBT_OUT_SP_V0_INFO in output_map:
|
||||
sp_info = output_map[PSBT_OUT_SP_V0_INFO]
|
||||
scan_keys.add(sp_info[:33])
|
||||
|
||||
if not scan_keys:
|
||||
return True, None # No SP outputs, nothing to check
|
||||
|
||||
# For each scan key, verify ECDH share coverage and DLEQ proofs
|
||||
for scan_key in scan_keys:
|
||||
has_global_ecdh = psbt.g.get_by_key(PSBT_GLOBAL_SP_ECDH_SHARE, scan_key)
|
||||
has_input_ecdh = any(
|
||||
input_map.get_by_key(PSBT_IN_SP_ECDH_SHARE, scan_key)
|
||||
for input_map in psbt.i
|
||||
)
|
||||
|
||||
scan_key_has_computed_output = any(
|
||||
PSBT_OUT_SP_V0_INFO in om
|
||||
and om[PSBT_OUT_SP_V0_INFO][:33] == scan_key
|
||||
and PSBT_OUT_SCRIPT in om
|
||||
for om in psbt.o
|
||||
)
|
||||
if scan_key_has_computed_output and not has_global_ecdh and not has_input_ecdh:
|
||||
return False, "Silent payment output present but no ECDH share for scan key"
|
||||
|
||||
# Verify global DLEQ proof if global ECDH present
|
||||
if has_global_ecdh:
|
||||
ecdh_share = psbt.g.get_by_key(PSBT_GLOBAL_SP_ECDH_SHARE, scan_key)
|
||||
dleq_proof = psbt.g.get_by_key(PSBT_GLOBAL_SP_DLEQ, scan_key)
|
||||
if not dleq_proof:
|
||||
return False, "Global ECDH share missing DLEQ proof"
|
||||
|
||||
_, summed_pubkey_bytes = collect_input_ecdh_and_pubkey(psbt, scan_key)
|
||||
assert summed_pubkey_bytes is not None, "No public keys found for inputs"
|
||||
A_sum = GE.from_bytes(summed_pubkey_bytes)
|
||||
|
||||
valid, msg = validate_dleq_proof(A_sum, scan_key, ecdh_share, dleq_proof)
|
||||
if not valid:
|
||||
return False, f"Global DLEQ proof invalid: {msg}"
|
||||
|
||||
# Verify per-input coverage for eligible inputs
|
||||
if scan_key_has_computed_output and not has_global_ecdh:
|
||||
for i, input_map in enumerate(psbt.i):
|
||||
if not is_input_eligible(input_map):
|
||||
continue
|
||||
ecdh_share = input_map.get_by_key(PSBT_IN_SP_ECDH_SHARE, scan_key)
|
||||
if not ecdh_share:
|
||||
return (
|
||||
False,
|
||||
f"Output script set but eligible input {i} missing ECDH share",
|
||||
)
|
||||
else:
|
||||
# Verify per-input DLEQ proofs
|
||||
dleq_proof = input_map.get_by_key(PSBT_IN_SP_DLEQ, scan_key)
|
||||
if not dleq_proof:
|
||||
return False, f"Input {i} ECDH share missing DLEQ proof"
|
||||
|
||||
# Get input public key A
|
||||
A = pubkey_from_eligible_input(input_map)
|
||||
if A is None:
|
||||
return (
|
||||
False,
|
||||
f"Input {i} missing public key for DLEQ verification",
|
||||
)
|
||||
|
||||
valid, msg = validate_dleq_proof(
|
||||
A, scan_key, ecdh_share, dleq_proof
|
||||
)
|
||||
if not valid:
|
||||
return False, f"Input {i} DLEQ proof invalid: {msg}"
|
||||
return True, None
|
||||
|
||||
|
||||
def validate_dleq_proof(
|
||||
A: GE,
|
||||
scan_key: bytes,
|
||||
ecdh_share: bytes,
|
||||
dleq_proof: bytes,
|
||||
) -> Tuple[bool, str]:
|
||||
"""
|
||||
Verify a DLEQ proof for silent payments
|
||||
|
||||
Checks:
|
||||
- ECDH share and DLEQ proof lengths
|
||||
- Verify DLEQ proof correctness
|
||||
"""
|
||||
if len(ecdh_share) != 33:
|
||||
return (
|
||||
False,
|
||||
f"Invalid ECDH share length: {len(ecdh_share)} bytes (expected 33)",
|
||||
)
|
||||
|
||||
if len(dleq_proof) != 64:
|
||||
return (
|
||||
False,
|
||||
f"Invalid DLEQ proof length: {len(dleq_proof)} bytes (expected 64)",
|
||||
)
|
||||
|
||||
B_scan = GE.from_bytes(scan_key)
|
||||
C_ecdh = GE.from_bytes(ecdh_share)
|
||||
|
||||
# Verify DLEQ proof using BIP-374 reference
|
||||
result = dleq_verify_proof(A, B_scan, C_ecdh, dleq_proof)
|
||||
if not result:
|
||||
return False, "DLEQ proof verification failed"
|
||||
return True, None
|
||||
|
||||
|
||||
def validate_input_eligibility(psbt: PSBT) -> Tuple[bool, str]:
|
||||
"""
|
||||
Validate input eligibility constraints for silent payments
|
||||
|
||||
Checks:
|
||||
- No segwit v>1 inputs when SP outputs present
|
||||
- SIGHASH_ALL required when SP outputs present
|
||||
"""
|
||||
# Check if SP outputs exist
|
||||
has_sp_outputs = any(PSBT_OUT_SP_V0_INFO in om for om in psbt.o)
|
||||
if not has_sp_outputs:
|
||||
return True, None
|
||||
|
||||
# Check segwit version restrictions
|
||||
for i, input_map in enumerate(psbt.i):
|
||||
if PSBT_IN_WITNESS_UTXO in input_map:
|
||||
witness_utxo = input_map[PSBT_IN_WITNESS_UTXO]
|
||||
script = parse_witness_utxo(witness_utxo)
|
||||
if script and 0x51 < script[0] <= 0x60: # OP_2 or higher (segwit v2+)
|
||||
return False, f"Input {i} uses segwit version > 1 with silent payments"
|
||||
|
||||
# Check SIGHASH_ALL requirement - PSBT_IN_SIGHASH_TYPE is optional, but if set it must be SIGHASH_ALL when SP outputs are present
|
||||
for i, input_map in enumerate(psbt.i):
|
||||
if PSBT_IN_SIGHASH_TYPE in input_map:
|
||||
sighash = input_map[PSBT_IN_SIGHASH_TYPE]
|
||||
if len(sighash) >= 4:
|
||||
sighash_type = struct.unpack("<I", sighash[:4])[0]
|
||||
if sighash_type != 1: # SIGHASH_ALL
|
||||
return (
|
||||
False,
|
||||
f"Input {i} uses non-SIGHASH_ALL ({sighash_type}) with silent payments",
|
||||
)
|
||||
return True, None
|
||||
|
||||
|
||||
def validate_output_scripts(psbt: PSBT) -> Tuple[bool, str]:
|
||||
"""
|
||||
Validate computed output scripts match silent payment derivation
|
||||
|
||||
Checks:
|
||||
- For each SP output with PSBT_OUT_SCRIPT set, recomputes the expected P2TR
|
||||
script from the ECDH share and input public keys and verifies it matches
|
||||
- k values are tracked per scan key and incremented for each SP output sharing
|
||||
the same scan key (outputs with different scan keys use independent k counters)
|
||||
"""
|
||||
# Build outpoints list
|
||||
outpoints = []
|
||||
for input_map in psbt.i:
|
||||
if PSBT_IN_PREVIOUS_TXID in input_map and PSBT_IN_OUTPUT_INDEX in input_map:
|
||||
output_index_bytes = input_map.get(PSBT_IN_OUTPUT_INDEX)
|
||||
txid_int = int.from_bytes(input_map[PSBT_IN_PREVIOUS_TXID], "little")
|
||||
output_index = struct.unpack("<I", output_index_bytes)[0]
|
||||
outpoints.append(COutPoint(txid_int, output_index))
|
||||
|
||||
# Track k values per scan key
|
||||
scan_key_k_values = {}
|
||||
|
||||
# Validate each SP output
|
||||
for output_idx, output_map in enumerate(psbt.o):
|
||||
if PSBT_OUT_SP_V0_INFO not in output_map:
|
||||
continue # Skip non-SP outputs
|
||||
|
||||
sp_info = output_map[PSBT_OUT_SP_V0_INFO]
|
||||
scan_pubkey_bytes = sp_info[:33]
|
||||
spend_pubkey_bytes = sp_info[33:]
|
||||
|
||||
k = scan_key_k_values.get(scan_pubkey_bytes, 0)
|
||||
|
||||
# Get ECDH share and summed pubkey
|
||||
ecdh_share_bytes, summed_pubkey_bytes = collect_input_ecdh_and_pubkey(
|
||||
psbt, scan_pubkey_bytes
|
||||
)
|
||||
|
||||
if ecdh_share_bytes and summed_pubkey_bytes and outpoints:
|
||||
computed_script = compute_silent_payment_output_script(
|
||||
outpoints, summed_pubkey_bytes, ecdh_share_bytes, spend_pubkey_bytes, k
|
||||
)
|
||||
|
||||
if PSBT_OUT_SCRIPT in output_map:
|
||||
actual_script = output_map[PSBT_OUT_SCRIPT]
|
||||
if actual_script != computed_script:
|
||||
return (
|
||||
False,
|
||||
f"Output {output_idx} script doesn't match silent payments derivation",
|
||||
)
|
||||
|
||||
scan_key_k_values[scan_pubkey_bytes] = k + 1
|
||||
elif PSBT_OUT_SCRIPT in output_map:
|
||||
return (
|
||||
False,
|
||||
f"Output {output_idx} has PSBT_OUT_SCRIPT but missing ECDH share or input pubkeys",
|
||||
)
|
||||
return True, None
|
||||
Reference in New Issue
Block a user