1
0
mirror of https://github.com/bitcoin/bips.git synced 2026-02-09 15:23:09 +00:00
bips/bip-0089/reference.py
Jurvis Tan 57869d524a
BIP 89: Chain Code Delegation for Private Collaborative Custody (#2004)
* Add Chaincode Delegation BIP

* Update license to BSD-3-Clause and expand blinded signing documentation

* Address initial PR comments

* Update with BIP number assignment

* Fix delegator_sign test vector

* Upgrade secp256k1lab and add license file

- Upgrade vendored secp256k1lab to commit a265da1 (adds type annotations)
- Add COPYING file to satisfy MIT license requirements
- Document secp256k1lab commit reference in BIP text

* Fix type checker and linter issues in reference implementation

- Fix TweakContext to use Scalar types for gacc/tacc
- Replace HashFunction enum with Callable type alias
- Fix bytearray to bytes conversion in blind_sign
- Move imports to top of file
- Fix boolean comparison style (use 'not' instead of '== False')
- Add proper type annotations and casts for dict handling
- Remove unused imports and type ignore comments

* Address PR review comments on terminology and clarity

- Add intro explaining delegation naming (chain code is delegated, not
  signing authority)
- Reorder terminology to list Delegator before Delegatee
- Replace "quorum" with clearer "can co-sign for UTXOs" language
- Clarify derivation constraints in terms of delegatee's extended key
- Rename "Delegatee Signing" section to "Signing Modes"
- Fix "delegatee can apply" to "delegator can produce" (line 112)
- Replace undefined "caller" with "delegatee" (line 173)
- Clarify "Change outputs" to "Tweaks for change outputs" (line 98)
- Add note that message is separate from CCD bundle
- Add note on application-specific verification (addresses, amounts)
- Add transition sentence clarifying non-concurrent protocol scope

* Add changelog entry for 0.1.3

* Fix header: use Authors (plural) for multiple authors

* Fix BIP header format for CI compliance

- Change Type from 'Standards Track' to 'Specification' (valid type)
- Change 'Created' to 'Assigned' (correct field name per BIP format)
- Change 'Post-History' to 'Discussion' (recognized field in buildtable.pl)

* Apply suggestion from @murchandamus

---------

Co-authored-by: Jesse Posner <jesse.posner@gmail.com>
2026-02-04 12:58:08 -08:00

784 lines
30 KiB
Python

# BIPXXX reference implementation
#
# WARNING: This implementation is for demonstration purposes only and _not_ to
# be used in production environments. The code is vulnerable to timing attacks,
# for example.
from typing import Dict, Mapping, Optional, Sequence, Tuple, NewType, NamedTuple, List, Callable, Any, cast
import hashlib
import json
import os
import secrets
import sys
from bip32 import (
CURVE_N,
ExtendedPublicKey,
apply_tweak_to_public,
apply_tweak_to_secret,
int_to_bytes,
parse_extended_public_key,
compress_point,
decode_path,
)
from descriptor import SortedMultiDescriptorTemplate
from secp256k1lab.bip340 import schnorr_sign, schnorr_verify
from secp256k1lab.keys import pubkey_gen_plain
from secp256k1lab.secp256k1 import G, GE, Scalar
HashFunc = Callable[[bytes], Any]
PlainPk = NewType('PlainPk', bytes)
XonlyPk = NewType('XonlyPk', bytes)
def xbytes(P: GE) -> bytes:
return P.to_bytes_xonly()
def cbytes(P: GE) -> bytes:
return P.to_bytes_compressed()
def cpoint(x: bytes) -> GE:
return GE.from_bytes_compressed(x)
TweakContext = NamedTuple('TweakContext', [('Q', GE),
('gacc', Scalar),
('tacc', Scalar)])
def tweak_ctx_init(pk: PlainPk) -> TweakContext:
Q = cpoint(pk)
if Q.infinity:
raise ValueError('The public key cannot be infinity.')
gacc = Scalar(1)
tacc = Scalar(0)
return TweakContext(Q, gacc, tacc)
def apply_tweak(tweak_ctx: TweakContext, tweak: bytes, is_xonly: bool) -> TweakContext:
if len(tweak) != 32:
raise ValueError('The tweak must be a 32-byte array.')
Q, gacc, tacc = tweak_ctx
if is_xonly and not Q.has_even_y():
g = Scalar(-1)
else:
g = Scalar(1)
try:
t = Scalar.from_bytes_checked(tweak)
except ValueError:
raise ValueError('The tweak must be less than n.')
Q_ = g * Q + t * G
if Q_.infinity:
raise ValueError('The result of tweaking cannot be infinity.')
gacc_ = g * gacc
tacc_ = t + g * tacc
return TweakContext(Q_, gacc_, tacc_)
# Return the plain public key corresponding to a given secret key
def individual_pk(seckey: bytes) -> PlainPk:
return PlainPk(pubkey_gen_plain(seckey))
def bytes_xor(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))
# This implementation can be sped up by storing the midstate after hashing
# tag_hash instead of rehashing it all the time.
def tagged_hash(tag: str, msg: bytes, hash_func: HashFunc = hashlib.sha256) -> bytes:
tag_hash = hash_func(tag.encode()).digest()
return hash_func(tag_hash + tag_hash + msg).digest()
def nonce_hash(rand: bytes, pk: PlainPk, extra_in: bytes) -> bytes:
buf = b''
buf += rand
buf += len(pk).to_bytes(1, 'big')
buf += pk
buf += len(extra_in).to_bytes(4, 'big')
buf += extra_in
return tagged_hash('CCD/blindnonce', buf)
def blind_nonce_gen_internal(rand_: bytes, sk: Optional[bytes], pk: Optional[PlainPk], extra_in: Optional[bytes]) -> Tuple[bytearray, bytes]:
if sk is not None:
rand = bytes_xor(sk, tagged_hash('CCD/aux', rand_))
else:
rand = rand_
if pk is None:
pk = PlainPk(b'')
if extra_in is None:
extra_in = b''
k = Scalar.from_bytes_wrapping(nonce_hash(rand, pk, extra_in))
# k == 0 cannot occur except with negligible probability.
assert k != 0
R = k * G
assert R is not None
blindpubnonce = cbytes(R)
blindsecnonce = bytearray(k.to_bytes() + pk)
return blindsecnonce, blindpubnonce
def blind_nonce_gen(sk: Optional[bytes], pk: Optional[PlainPk], extra_in: Optional[bytes]) -> Tuple[bytearray, bytes]:
if sk is not None and len(sk) != 32:
raise ValueError('The optional byte array sk must have length 32.')
rand_ = secrets.token_bytes(32)
return blind_nonce_gen_internal(rand_, sk, pk, extra_in)
SessionContext = NamedTuple('SessionContext', [('pk', PlainPk),
('blindfactor', bytes),
('challenge', bytes),
('pubnonce', bytes),
('tweaks', List[bytes]),
('is_xonly', List[bool])])
def blind_factor_hash(rand: bytes, cpk: PlainPk, blindpubnonce: bytes, msg: bytes, extra_in: bytes) -> bytes:
buf = b''
buf += rand
buf += len(cpk).to_bytes(1, 'big')
buf += cpk
buf += len(blindpubnonce).to_bytes(1, 'big')
buf += blindpubnonce
buf += len(msg).to_bytes(8, 'big')
buf += msg
buf += len(extra_in).to_bytes(4, 'big')
buf += extra_in
return tagged_hash('CCD/blindfactor', buf, hashlib.sha512)
def blind_challenge_gen_internal(rand: bytes, msg: bytes, blindpubnonce: bytes, pk: PlainPk, tweaks: List[bytes], is_xonly: List[bool], extra_in: Optional[bytes]) -> Tuple[SessionContext, bytes, bool, bool]:
if extra_in is None:
extra_in = b''
Q, gacc, tacc = pubkey_and_tweak(pk, tweaks, is_xonly)
cpk = PlainPk(cbytes(Q))
k = blind_factor_hash(rand, cpk, blindpubnonce, msg, extra_in)
a_ = Scalar.from_bytes_wrapping(k[0:32])
assert a_ != 0
b_ = Scalar.from_bytes_wrapping(k[32:64])
assert b_ != 0
g = Scalar(1) if Q.has_even_y() else Scalar(-1)
pk_parity = g * gacc == 1
X_ = cpoint(pk)
X = X_ if pk_parity else -X_
R_ = cpoint(blindpubnonce)
R = R_ + (a_ * G) + (b_ * X)
if R is None:
raise ValueError('The result of nonce blinding cannot be infinity.')
nonce_parity = R.has_even_y()
if not nonce_parity:
a = -a_
b = -b_
else:
a = a_
b = b_
e = Scalar.from_bytes_wrapping(tagged_hash("BIP0340/challenge", xbytes(R) + xbytes(Q) + msg))
e_ = e + b
session_ctx = SessionContext(pk, a.to_bytes(), e.to_bytes(), cbytes(R), tweaks, is_xonly)
return session_ctx, e_.to_bytes(), pk_parity, nonce_parity
def blind_challenge_gen(msg: bytes, blindpubnonce: bytes, pk: PlainPk, tweaks: List[bytes], is_xonly: List[bool], extra_in: Optional[bytes]) -> Tuple[SessionContext, bytes, bool, bool]:
rand = secrets.token_bytes(32)
return blind_challenge_gen_internal(rand, msg, blindpubnonce, pk, tweaks, is_xonly, extra_in)
def blind_sign(sk: bytes, blindchallenge: bytes, blindsecnonce: bytearray, pk_parity: bool, nonce_parity: bool) -> bytes:
try:
d_ = Scalar.from_bytes_checked(sk)
if d_ == 0:
raise ValueError('The secret key cannot be zero.')
except ValueError:
raise ValueError('The secret key is out of range.')
P = d_ * G
if P.infinity:
raise ValueError('The public key cannot be infinity.')
d = d_ if pk_parity else -d_
e_ = Scalar.from_bytes_checked(blindchallenge)
k_ = Scalar.from_bytes_checked(bytes(blindsecnonce[0:32]))
k = k_ if nonce_parity else -k_
# Overwrite the secnonce argument with zeros such that subsequent calls of
# sign with the same secnonce raise a ValueError.
blindsecnonce[:64] = bytearray(b'\x00'*64)
R_ = k_ * G
if R_.infinity:
raise ValueError('The blindpubnonce cannot be infinity.')
s_ = k + (e_ * d)
pk = PlainPk(cbytes(P))
blindsignature = s_.to_bytes()
assert verify_blind_signature(pk, cbytes(R_), blindchallenge, blindsignature, pk_parity, nonce_parity)
return blindsignature
def verify_blind_signature(pk: PlainPk, blindpubnonce: bytes, blindchallenge: bytes, blindsignature: bytes, pk_parity: bool, nonce_parity: bool) -> bool:
P_ = cpoint(pk)
P = P_ if pk_parity else -P_
if P.infinity:
raise ValueError('The public key cannot be infinity.')
R_ = cpoint(blindpubnonce)
R = R_ if nonce_parity else -R_
e_ = Scalar.from_bytes_checked(blindchallenge)
s_ = Scalar.from_bytes_checked(blindsignature)
R_calc = (s_ * G) + (-e_ * P)
if R_calc.infinity:
return False
return R == R_calc
def pubkey_and_tweak(pk: PlainPk, tweaks: List[bytes], is_xonly: List[bool]) -> TweakContext:
if len(tweaks) != len(is_xonly):
raise ValueError('The tweaks and is_xonly arrays must have the same length.')
tweak_ctx = tweak_ctx_init(pk)
v = len(tweaks)
for i in range(v):
tweak_ctx = apply_tweak(tweak_ctx, tweaks[i], is_xonly[i])
return tweak_ctx
def get_session_values(session_ctx: SessionContext) -> Tuple[GE, Scalar, Scalar, GE, Scalar, Scalar]:
(pk, blindfactor, challenge, pubnonce, tweaks, is_xonly) = session_ctx
Q, gacc, tacc = pubkey_and_tweak(pk, tweaks, is_xonly)
a = Scalar.from_bytes_checked(blindfactor)
e = Scalar.from_bytes_checked(challenge)
R = cpoint(pubnonce)
return Q, a, e, R, gacc, tacc
def unblind_signature(session_ctx: SessionContext, blindsignature: bytes) -> bytes:
Q, a, e, R, gacc, tacc = get_session_values(session_ctx)
s_ = Scalar.from_bytes_checked(blindsignature)
g = Scalar(1) if Q.has_even_y() else Scalar(-1)
s = s_ + a + (e * g * tacc)
return xbytes(R) + s.to_bytes()
#
# The following code is only used for testing.
#
def hx(s: str) -> bytes:
return bytes.fromhex(s)
def fromhex_all(l): # noqa: E741
return [hx(l_i) for l_i in l]
def get_error_details(tc):
et = tc["error"]["type"]
# Resolve to real class from name
exc_cls = getattr(__builtins__, et, None) or getattr(__import__("builtins"), et)
# Optional message predicate
msg = tc["error"].get("message")
if msg is None:
return exc_cls, (lambda e: True)
return exc_cls, (lambda e: msg in str(e))
def assert_raises(exc_cls, fn, pred):
try:
fn()
except Exception as e:
assert isinstance(e, exc_cls), f"Raised {type(e).__name__}, expected {exc_cls.__name__}"
assert pred(e), f"Exception message predicate failed: {e}"
return
assert False, f"Expected {exc_cls.__name__} but no exception was raised"
def build_session_ctx(obj):
pk = PlainPk(bytes.fromhex(obj["pk"]))
a = bytes.fromhex(obj["blindfactor"])
e = bytes.fromhex(obj["challenge"])
R = bytes.fromhex(obj["pubnonce"])
tweaks = fromhex_all(obj["tweaks"])
is_xonly = obj["is_xonly"]
return (pk, a, e, R, tweaks, is_xonly)
def test_blind_nonce_gen_vectors():
with open(os.path.join(sys.path[0], 'vectors', 'blind_nonce_gen_vectors.json')) as f:
tv = json.load(f)
for tc in tv["test_cases"]:
def get_bytes(key) -> bytes:
return bytes.fromhex(tc[key])
def get_bytes_maybe(key) -> Optional[bytes]:
v = tc.get(key)
return None if v is None else bytes.fromhex(v)
rand_ = get_bytes("rand_")
sk = get_bytes_maybe("sk")
pk = get_bytes_maybe("pk")
if pk is not None:
pk = PlainPk(pk)
extra_in = get_bytes_maybe("extra_in")
expected_blindsecnonce = get_bytes("expected_blindsecnonce")
expected_blindpubnonce = get_bytes("expected_blindpubnonce")
blindsecnonce, blindpubnonce = blind_nonce_gen_internal(rand_, sk, pk, extra_in)
assert bytes(blindsecnonce) == expected_blindsecnonce
assert blindpubnonce == expected_blindpubnonce
pk_len = 0 if tc["pk"] is None else 33
assert len(expected_blindsecnonce) == 32 + pk_len
assert len(expected_blindpubnonce) == 33
def test_blind_challenge_gen_vectors():
with open(os.path.join(sys.path[0], 'vectors', 'blind_challenge_gen_vectors.json')) as f:
tv = json.load(f)
# ---------- Valid cases ----------
for tc in tv["test_cases"]:
rand = bytes.fromhex(tc["rand"])
msg = bytes.fromhex(tc["msg"]) if tc["msg"] != "" else b""
blindpubnonce = bytes.fromhex(tc["blindpubnonce"])
pk = PlainPk(bytes.fromhex(tc["pk"]))
tweaks = fromhex_all(tc["tweaks"])
is_xonly = tc["is_xonly"]
extra_in = None if tc["extra_in"] is None else bytes.fromhex(tc["extra_in"])
expected_a = bytes.fromhex(tc["expected_blindfactor"])
expected_e = bytes.fromhex(tc["expected_challenge"])
expected_R = bytes.fromhex(tc["expected_pubnonce"])
expected_e_prime = bytes.fromhex(tc["expected_blindchallenge"])
expected_pk_parity = bool(tc["expected_pk_parity"])
expected_nonce_parity = bool(tc["expected_nonce_parity"])
session_ctx, blindchallenge, pk_parity, nonce_parity = blind_challenge_gen_internal(
rand, msg, blindpubnonce, pk, tweaks, is_xonly, extra_in
)
# Check tuple outputs
assert blindchallenge == expected_e_prime
assert pk_parity == expected_pk_parity
assert nonce_parity == expected_nonce_parity
# Check session_ctx fields
pk_sc, blindfactor_sc, challenge_sc, pubnonce_sc, tweaks_sc, is_xonly_sc = session_ctx
assert pk_sc == pk
assert blindfactor_sc == expected_a
assert challenge_sc == expected_e
assert pubnonce_sc == expected_R
assert tweaks_sc == tweaks
assert is_xonly_sc == is_xonly
# Extra sanity: recompute Q and e and compare
Q, gacc, tacc = pubkey_and_tweak(pk, tweaks, is_xonly)
R = cpoint(expected_R)
e_check = tagged_hash("BIP0340/challenge", xbytes(R) + xbytes(Q) + msg)
assert e_check == expected_e
# Length sanity
assert len(expected_a) == 32
assert len(expected_e) == 32
assert len(expected_R) == 33
assert len(expected_e_prime) == 32
# ---------- Error cases ----------
for tc in tv.get("error_test_cases", []):
rand = bytes.fromhex(tc["rand"])
msg = bytes.fromhex(tc["msg"]) if tc["msg"] != "" else b""
blindpubnonce = bytes.fromhex(tc["blindpubnonce"])
pk = PlainPk(bytes.fromhex(tc["pk"]))
tweaks = fromhex_all(tc["tweaks"])
is_xonly = tc["is_xonly"]
extra_in = None if tc["extra_in"] is None else bytes.fromhex(tc["extra_in"])
err = tc["error"]
err_type = err["type"]
err_message = err.get("message")
raised = False
try:
_ = blind_challenge_gen_internal(rand, msg, blindpubnonce, pk, tweaks, is_xonly, extra_in)
except Exception as e:
raised = True
# Type check
assert e.__class__.__name__ == err_type
# Optional substring match on message, if provided
if err_message is not None:
assert err_message in str(e)
assert raised, "Expected an exception but none was raised"
def test_blind_sign_and_verify_vectors():
with open(os.path.join(sys.path[0], 'vectors', 'blind_sign_and_verify_vectors.json')) as f:
tv = json.load(f)
# ------------------ Valid ------------------
for test_case in tv["valid_test_cases"]:
sk = hx(test_case["sk"])
pk = PlainPk(hx(test_case["pk"]))
blindsecnonce_all = hx(test_case["blindsecnonce"])
blindpubnonce = hx(test_case["blindpubnonce"])
blindchallenge = hx(test_case["blindchallenge"])
pk_parity = bool(test_case["pk_parity"])
nonce_parity = bool(test_case["nonce_parity"])
# R' consistency check: cbytes(k'*G) == blindpubnonce
k_ = Scalar.from_bytes_checked(blindsecnonce_all[0:32])
R_prime = k_ * G
assert cbytes(R_prime) == blindpubnonce
expected_sprime = hx(test_case["expected"]["blindsignature"])
# Copy because blind_sign zeroizes the first 64 bytes of the buffer
secnonce_buf = bytearray(blindsecnonce_all)
s_prime = blind_sign(sk, blindchallenge, secnonce_buf, pk_parity, nonce_parity)
assert s_prime == expected_sprime
checks = test_case.get("checks", {})
if checks.get("secnonce_prefix_zeroed_after_sign", False):
assert all(b == 0 for b in secnonce_buf[:64])
if checks.get("verify_returns_true", True):
ok = verify_blind_signature(pk, blindpubnonce, blindchallenge, s_prime, pk_parity, nonce_parity)
assert ok is True
if checks.get("second_call_raises_valueerror", False):
# Reuse the same (now zeroized) buffer; must raise
def try_again():
blind_sign(sk, blindchallenge, secnonce_buf, pk_parity, nonce_parity)
raised = False
try:
try_again()
except ValueError:
raised = True
assert raised, "Expected ValueError on nonce reuse"
# ------------------ Sign errors (exceptions) ------------------
for test_case in tv.get("sign_error_test_cases", []):
exception, except_fn = get_error_details(test_case)
sk = hx(test_case["sk"])
blindsecnonce_all = hx(test_case["blindsecnonce"])
blindchallenge = hx(test_case["blindchallenge"])
pk_parity = bool(test_case["pk_parity"])
nonce_parity = bool(test_case["nonce_parity"])
repeat = int(test_case.get("repeat", 1))
if repeat == 1:
# Single-call error (e.g., out-of-range e')
assert_raises(exception, lambda: blind_sign(sk, blindchallenge, bytearray(blindsecnonce_all), pk_parity, nonce_parity), except_fn)
else:
# Two-call error (nonce reuse)
buf = bytearray(blindsecnonce_all)
# First call should succeed
_ = blind_sign(sk, blindchallenge, buf, pk_parity, nonce_parity)
# Second call must raise
assert_raises(exception, lambda: blind_sign(sk, blindchallenge, buf, pk_parity, nonce_parity), except_fn)
# ------------------ Verify returns False (no exception) ------------------
for test_case in tv.get("verify_fail_test_cases", []):
pk = PlainPk(hx(test_case["pk"]))
blindpubnonce = hx(test_case["blindpubnonce"])
blindchallenge = hx(test_case["blindchallenge"])
blindsignature = hx(test_case["blindsignature"])
pk_parity = bool(test_case["pk_parity"])
nonce_parity = bool(test_case["nonce_parity"])
assert verify_blind_signature(pk, blindpubnonce, blindchallenge, blindsignature, pk_parity, nonce_parity) is False
# ------------------ Verify errors (exceptions) ------------------
for test_case in tv.get("verify_error_test_cases", []):
exception, except_fn = get_error_details(test_case)
pk = PlainPk(hx(test_case["pk"]))
blindpubnonce = hx(test_case["blindpubnonce"])
blindchallenge = hx(test_case["blindchallenge"])
blindsignature = hx(test_case["blindsignature"])
pk_parity = bool(test_case["pk_parity"])
nonce_parity = bool(test_case["nonce_parity"])
assert_raises(exception, lambda: verify_blind_signature(pk, blindpubnonce, blindchallenge, blindsignature, pk_parity, nonce_parity), except_fn)
def test_unblind_signature_vectors():
with open(os.path.join(sys.path[0], 'vectors', 'unblind_signature_vectors.json')) as f:
tv = json.load(f)
# ---------- Valid ----------
for tc in tv["valid_test_cases"]:
session_ctx = build_session_ctx(tc["session_ctx"])
msg = bytes.fromhex(tc["msg"]) if tc["msg"] != "" else b""
blindsignature = bytes.fromhex(tc["blindsignature"])
expected_sig = bytes.fromhex(tc["expected_bip340_sig"])
sig = unblind_signature(session_ctx, blindsignature)
assert sig == expected_sig
# Verify BIP340 with tweaked Q
pk, _, _, _, tweaks, is_xonly = session_ctx
Q, _, _ = pubkey_and_tweak(pk, tweaks, is_xonly)
assert schnorr_verify(msg, xbytes(Q), sig)
# ---------- Errors ----------
for tc in tv.get("error_test_cases", []):
session_ctx = build_session_ctx(tc["session_ctx"])
msg = bytes.fromhex(tc["msg"]) if tc["msg"] != "" else b""
blindsignature = bytes.fromhex(tc["blindsignature"])
err = tc["error"]
err_type = err["type"]
err_msg = err.get("message")
raised = False
try:
_ = unblind_signature(session_ctx, blindsignature)
except Exception as e:
raised = True
assert e.__class__.__name__ == err_type
if err_msg is not None:
assert err_msg in str(e)
assert raised, "Expected an exception but none was raised"
def test_sign_and_verify_random(iters: int) -> None:
for _ in range(iters):
sk = Scalar.from_bytes_wrapping(secrets.token_bytes(32))
pk = individual_pk(sk.to_bytes())
msg = Scalar.from_bytes_wrapping(secrets.token_bytes(32))
v = secrets.randbelow(4)
tweaks = [secrets.token_bytes(32) for _ in range(v)]
tweak_modes = [secrets.choice([False, True]) for _ in range(v)]
Q, _, _ = pubkey_and_tweak(pk, tweaks, tweak_modes)
assert not Q.infinity
# Round 1
# Signer
extra_in_1 = secrets.token_bytes(32)
blindsecnonce, blindpubnonce = blind_nonce_gen(sk.to_bytes(), pk, extra_in_1)
# User
extra_in_2 = secrets.token_bytes(32)
session_ctx, blindchallenge, pk_parity, nonce_parity = blind_challenge_gen(msg.to_bytes(), blindpubnonce, pk, tweaks, tweak_modes, extra_in_2)
# Round 2
# Signer
blindsignature = blind_sign(sk.to_bytes(), blindchallenge, blindsecnonce, pk_parity, nonce_parity)
# User
sig = unblind_signature(session_ctx, blindsignature)
assert schnorr_verify(msg.to_bytes(), xbytes(Q), sig)
def compute_bip32_tweak(xpub: ExtendedPublicKey, path: Sequence[int]) -> Tuple[int, ExtendedPublicKey]:
"""Compute the CCD tweak scalar for a non-hardened derivation path."""
aggregate = 0
current = xpub
for index in path:
tweak, child = current.derive_child(index)
aggregate = (aggregate + tweak) % CURVE_N
current = child
return aggregate, current
def input_verification(
descriptor_template: SortedMultiDescriptorTemplate,
witness_script: Optional[bytes],
tweaks: Mapping[bytes, int],
) -> bool:
"""Check that an input script matches the tweaked policy from CCD data."""
return _verify_tweaked_descriptor(
descriptor_template,
witness_script,
tweaks,
)
def change_output_verification(
descriptor_template: SortedMultiDescriptorTemplate,
witness_script: Optional[bytes],
tweaks: Mapping[bytes, int],
) -> bool:
"""Validate a change output script using delegated CCD tweak data."""
return _verify_tweaked_descriptor(
descriptor_template,
witness_script,
tweaks,
)
def _verify_tweaked_descriptor(
descriptor_template: SortedMultiDescriptorTemplate,
witness_script: Optional[bytes],
tweaks: Mapping[bytes, int],
) -> bool:
if witness_script is None or not tweaks:
return False
if descriptor_template.threshold > len(tweaks):
return False
tweaked_keys: List[bytes] = []
for base_key, tweak in sorted(tweaks.items(), key=lambda item: item[0]):
if len(base_key) != 33:
return False
tweaked_key = apply_tweak_to_public(base_key, tweak % CURVE_N)
tweaked_keys.append(tweaked_key)
try:
expected_witness_script = descriptor_template.witness_script(tweaked_keys)
except ValueError:
return False
return witness_script == expected_witness_script
def delegator_sign(
tweak: int,
base_secret: int,
message: bytes,
) -> bytes:
"""Derive the delegated key, sign ``message``, and return signature."""
child_secret = int_to_bytes(apply_tweak_to_secret(base_secret, tweak), 32)
message_digest = hashlib.sha256(message).digest()
signature = schnorr_sign(message_digest, child_secret, bytes(32))
return signature
def test_compute_tweak_vectors() -> None:
with open(os.path.join(sys.path[0], 'vectors', 'compute_bip32_tweak_vectors.json')) as f:
data = json.load(f)
default_xpub_data = data.get("xpub")
if default_xpub_data is None:
raise AssertionError("compute_bip32_tweak_vectors.json missing top-level 'xpub'")
for case in data.get("valid_test_cases", []):
xpub_data = case.get("xpub", default_xpub_data)
xpub = parse_extended_public_key(xpub_data)
path = decode_path(case.get("path", []))
expected = case.get("expected")
if not isinstance(expected, Mapping):
raise AssertionError("valid compute_tweak case missing 'expected'")
tweak_hex = expected.get("tweak")
if not isinstance(tweak_hex, str):
raise AssertionError("expected 'tweak' must be a string")
derived = expected.get("derived_xpub", {})
derived_compressed = derived.get("compressed")
if not isinstance(derived_compressed, str):
raise AssertionError("expected 'derived_xpub.compressed' must be a string")
derived_chain_code = derived.get("chain_code")
if not isinstance(derived_chain_code, str):
raise AssertionError("expected 'derived_xpub.chain_code' must be a string")
tweak, child = compute_bip32_tweak(xpub, path)
actual_tweak_hex = f"{tweak:064x}"
if actual_tweak_hex != tweak_hex.lower():
raise AssertionError(f"tweak mismatch: expected {tweak_hex}, got {actual_tweak_hex}")
actual_compressed = compress_point(child.point).hex()
actual_chain_code = child.chain_code.hex()
if actual_compressed != derived_compressed.lower():
raise AssertionError("derived public key mismatch")
if actual_chain_code != derived_chain_code.lower():
raise AssertionError("derived chain code mismatch")
for case in data.get("error_test_cases", []):
xpub_data = case.get("xpub", default_xpub_data)
xpub = parse_extended_public_key(xpub_data)
path = decode_path(case.get("path", []))
error_spec = case.get("error", {})
exc_type, message = resolve_error_spec(error_spec)
try:
compute_bip32_tweak(xpub, path)
except exc_type as exc:
if message and message.lower() not in str(exc).lower():
raise AssertionError(f"expected error containing '{message}' but got '{exc}'")
else:
raise AssertionError("expected failure but case succeeded")
def test_delegator_sign_vectors() -> None:
with open(os.path.join(sys.path[0], 'vectors', 'delegator_sign_vectors.json')) as f:
data = json.load(f)
for case in data.get("test_cases", []):
base_secret_hex = case.get("base_secret")
tweak_hex = case.get("tweak")
message_hex = case.get("message")
base_secret = int(base_secret_hex, 16)
tweak = int(tweak_hex, 16)
message = message_hex.encode('utf-8')
expected = case.get("expected")
if not isinstance(expected, Mapping):
raise AssertionError("delegator_sign case missing 'expected'")
expected_signature_hex = expected.get("signature")
if not isinstance(expected_signature_hex, str):
raise AssertionError("expected 'signature' must be a string")
expected_signature = bytes.fromhex(expected_signature_hex)
signature = delegator_sign(
tweak,
base_secret,
message,
)
if signature != expected_signature:
raise AssertionError("signature mismatch")
def test_input_verification_vectors() -> None:
with open(os.path.join(sys.path[0], 'vectors', 'input_verification_vectors.json')) as f:
data = json.load(f)
for case in data.get("test_cases", []):
descriptor = SortedMultiDescriptorTemplate(threshold=2)
witness_hex = case.get("witness_script")
# Get the tweak map of the bare public keys to the BIP 32 tweak
tweaks_raw = case.get("tweak_map", {})
tweaks = parse_tweak_map(tweaks_raw)
expected_bool = bool(case.get("expected", False))
result = input_verification(
descriptor,
bytes.fromhex(witness_hex),
tweaks,
)
if result != expected_bool:
raise AssertionError(
f"input_verification result {result} did not match expected {expected_bool}"
)
def test_change_output_verification_vectors() -> None:
with open(os.path.join(sys.path[0], 'vectors', 'change_output_verification_vectors.json')) as f:
data = json.load(f)
for case in data.get("test_cases", []):
descriptor = SortedMultiDescriptorTemplate(threshold=2)
witness_hex = case.get("witness_script")
# Get the tweak map of the bare public keys to the BIP 32 tweak
tweaks_raw = case.get("tweak_map", {})
tweaks = parse_tweak_map(tweaks_raw)
expected_bool = bool(case.get("expected", False))
result = change_output_verification(
descriptor,
bytes.fromhex(witness_hex),
tweaks,
)
if result != expected_bool:
raise AssertionError(
f"change_output_verification result {result} did not match expected {expected_bool}"
)
def parse_tweak_map(raw: Mapping[str, object]) -> Dict[bytes, int]:
tweaks: Dict[bytes, int] = {}
for key_hex, tweak_hex in raw.items():
base_key = bytes.fromhex(key_hex)
if not isinstance(tweak_hex, str):
raise ValueError(f"tweak value for key {key_hex} must be a string")
tweak_value = int(tweak_hex, 16)
tweaks[base_key] = tweak_value % CURVE_N
return tweaks
def resolve_error_spec(raw: object) -> Tuple[type[Exception], Optional[str]]:
mapping: Dict[str, type[Exception]] = {"value": ValueError, "assertion": AssertionError, "runtime": RuntimeError}
if not isinstance(raw, dict):
return ValueError, None
raw_dict = cast(Dict[str, Any], raw)
name = str(raw_dict.get("type", "value")).lower()
message = raw_dict.get("message")
exc_type = mapping.get(name, ValueError)
return exc_type, None if message is None else str(message)
if __name__ == '__main__':
test_blind_nonce_gen_vectors()
test_blind_challenge_gen_vectors()
test_blind_sign_and_verify_vectors()
test_unblind_signature_vectors()
test_sign_and_verify_random(6)
test_compute_tweak_vectors()
test_delegator_sign_vectors()
test_input_verification_vectors()
test_change_output_verification_vectors()
print("All tests passed")