#!/usr/bin/env python3 """ Bitcoin-verified OpenTimestamps height — pure Python 3 stdlib, no dependencies. The point of this module: a checkpoint's claimed Bitcoin block height is worthless for a trustless tie-break unless the .ots proof actually commits to *that* block. A self-consistent proof can walk cleanly to a BitcoinBlockHeaderAttestation at a real height and still carry a merkle root that is NOT the block's — which is exactly the failure mode a malicious server (or a malformed export) produces. Reading the height field, or even re-deriving the proof internally, does not catch it. `verify_ots_file` closes that gap: it walks the detached .ots, computes the merkle root the proof commits to, fetches the REAL merkle root of the block at the claimed height from independent explorers, and returns the height ONLY if they match. No match, no height — the anchor cannot be relied on to order anything. This is the verifier-side analogue of the server's confirm rule: never trust a field you can recompute, and recompute against ground truth, not against the proof itself. """ import json import hashlib import urllib.request # OTS Bitcoin attestation magic tag. _BITCOIN = bytes.fromhex("0588960d73d71901") # Detached-file header: magic + version(0x01) is consumed by the caller; the body # is op_sha256(0x08) + 32-byte digest + timestamp proof. _HEADER = b"\x00OpenTimestamps\x00\x00Proof\x00\xbf\x89\xe2\xe8\x84\xe8\x92\x94" def _varint(b, p): res = shift = 0 while p < len(b): x = b[p]; p += 1 res |= (x & 0x7F) << shift if not (x & 0x80): break shift += 7 return res, p def _walk(buf, msg): """Apply the proof's ops to `msg`; collect (height, committed_root_hex) for every BitcoinBlockHeaderAttestation. `root` is in explorer byte order (big-endian).""" out = [] def rec(p, m): n = len(buf) while p < n: more = buf[p] == 0xFF if more: p += 1 p = child(p, m) if not more: return p return p def child(p, m): t = buf[p]; p += 1 if t == 0x00: # attestation magic = buf[p:p + 8]; p += 8 ln, p = _varint(buf, p) payload = buf[p:p + ln]; p += ln if magic == _BITCOIN: h, _ = _varint(payload, 0) out.append((h, m[::-1].hex())) return p if t in (0xF0, 0xF1): # append / prepend ln, p = _varint(buf, p) operand = buf[p:p + ln]; p += ln m2 = m + operand if t == 0xF0 else operand + m elif t == 0x08: m2 = hashlib.sha256(m).digest() elif t == 0x67: m2 = hashlib.new("ripemd160", m).digest() elif t == 0x03: m2 = hashlib.sha1(m).digest() else: return p # unknown op: stop this branch return rec(p, m2) rec(0, msg) return out def attestations_from_ots(ots_bytes): """Parse a detached .ots file → list of (height, committed_root_hex). Raises ValueError on a bad header.""" if not ots_bytes.startswith(_HEADER): raise ValueError("not an OpenTimestamps detached proof (bad magic)") p = len(_HEADER) p += 1 # version file_op = ots_bytes[p]; p += 1 # 0x08 sha256 file-hash op if file_op != 0x08: raise ValueError(f"unexpected file-hash op 0x{file_op:02x}") digest = ots_bytes[p:p + 32]; p += 32 proof = ots_bytes[p:] return _walk(proof, digest) def real_block_merkle_root(height, fetch=None): """Real merkle root of mainnet block `height` (explorer byte order), agreed by >=2 independent explorers. `fetch` is an injectable (url)->bytes for testing.""" if fetch is None: def _http(url): return urllib.request.urlopen(url, timeout=12).read() fetch = _http explorers = [ "https://blockstream.info/api", "https://mempool.space/api", ] seen = {} for base in explorers: try: bh = fetch(f"{base}/block-height/{height}").decode().strip() if len(bh) != 64: continue blk = json.loads(fetch(f"{base}/block/{bh}")) root = blk.get("merkle_root") if root: seen[root] = seen.get(root, 0) + 1 except Exception: continue for root, count in seen.items(): if count >= 2: return root return None def check_ots(ots_bytes, fetch=None): """Inspect a detached .ots and return (status, height): 'verified' — a BitcoinBlockHeaderAttestation's committed root EQUALS the real block's root at its height; `height` is that block (lowest). 'mismatch' — an attestation's committed root differs from the real block's: a forged or malformed proof. `height` is the claimed block. 'unfetchable' — the proof parses and carries a Bitcoin attestation, but no block explorer could be reached to confirm it. INCONCLUSIVE, not a fault (a real merkle root is consensus data; explorers don't disagree on it, so a None real-root means unreachable, not a genuine conflict). 'no_attestation' — parses but carries no BitcoinBlockHeaderAttestation (still pending). 'bad_format' — not a valid detached OpenTimestamps proof. Distinguishing 'mismatch' (alarm) from 'unfetchable' (retry) is what keeps a monitoring cron from crying forgery on a transient explorer outage. """ try: atts = attestations_from_ots(ots_bytes) except ValueError: return ("bad_format", None) if not atts: return ("no_attestation", None) any_fetched = False claimed = None for height, committed_root in sorted(atts): claimed = claimed if claimed is not None else height real = real_block_merkle_root(height, fetch=fetch) if real is None: continue any_fetched = True if real == committed_root: return ("verified", height) return ("mismatch", claimed) if any_fetched else ("unfetchable", claimed) def verify_ots_file(ots_bytes, fetch=None): """Bitcoin-verified height of a detached .ots, or None if it does not verify (for any reason). Use check_ots() when you need to tell a forgery from an unreachable explorer.""" status, height = check_ots(ots_bytes, fetch=fetch) return height if status == "verified" else None def _encode_varint(x): out = bytearray() while True: b = x & 0x7F x >>= 7 out.append(b | (0x80 if x else 0)) if not x: return bytes(out) def _selftest(): """Offline: a proof whose committed root matches the real block verifies; the same proof against a different real root (the malformed-export / forgery case) does not; and a byte-mutation fuzz confirms no corrupted proof is ever accepted.""" import random digest = bytes(range(32)) committed_root = digest[::-1].hex() # no ops -> root is the digest itself height = 955202 # detached file: header + version + sha256-file-op + digest + (bitcoin attestation) att = b"\x00" + _BITCOIN + _encode_varint(len(_encode_varint(height))) + _encode_varint(height) ots = _HEADER + b"\x01\x08" + digest + att assert attestations_from_ots(ots) == [(height, committed_root)], "parse/walk mismatch" # Model a real chain: block `height` commits to `committed_root`; every other height # has an effectively-random (here: sha256-derived) root that a mutated proof cannot # collide with. So verification succeeds iff the proof commits to (height, root). def _root_for(h): return committed_root if h == height else hashlib.sha256(str(h).encode()).hexdigest() def fetch(url): if "block-height/" in url: h = url.rsplit("/", 1)[1] return h.rjust(64, "0").encode() # block hash encodes the height h = int(url.rsplit("/", 1)[1].lstrip("0") or "0") return json.dumps({"merkle_root": _root_for(h)}).encode() def fetch_wrong(url): if "block-height/" in url: return b"a" * 64 return json.dumps({"merkle_root": "00" * 32}).encode() assert verify_ots_file(ots, fetch=fetch) == height, "matching root must verify" assert verify_ots_file(ots, fetch=fetch_wrong) is None, "root mismatch must NOT verify (forgery/malformed)" assert verify_ots_file(b"not an ots file", fetch=fetch) is None, "bad magic must return None" # check_ots distinguishes a forgery from an unreachable explorer (so a cron doesn't # alarm on a transient outage). def fetch_down(url): raise OSError("explorer unreachable") assert check_ots(ots, fetch=fetch) == ("verified", height) assert check_ots(ots, fetch=fetch_wrong) == ("mismatch", height) assert check_ots(ots, fetch=fetch_down) == ("unfetchable", height) assert check_ots(b"garbage", fetch=fetch) == ("bad_format", None) # Mutation fuzz: flip/strip/insert bytes across the proof. The soundness property is # NOT "no mutation verifies" — a benign mutation (version byte, trailing bytes the walk # never reaches) leaves the commitment intact and SHOULD still verify. The bug a # verifier must never have is accepting a proof that commits to something OTHER than # the real (height, root): every mutation that verifies must still parse to exactly the # original attestation. rng = random.Random(0xA17) unsound = 0 for _ in range(5000): b = bytearray(ots) for _ in range(rng.randint(1, 3)): op = rng.randrange(3) if op == 0 and b: # flip a byte i = rng.randrange(len(b)); b[i] ^= 1 << rng.randrange(8) elif op == 1 and len(b) > len(_HEADER) + 2: # delete a byte del b[rng.randrange(len(b))] else: # insert a byte b.insert(rng.randrange(len(b) + 1), rng.randrange(256)) m = bytes(b) if m == ots or verify_ots_file(m, fetch=fetch) is None: continue try: ok = attestations_from_ots(m) == [(height, committed_root)] except ValueError: ok = False if not ok: unsound += 1 # verified a changed commitment assert unsound == 0, f"{unsound} corrupted proof(s) verified against a commitment they don't carry" print("ots_verify selftest: OK (incl. 5000-mutation soundness fuzz)") if __name__ == "__main__": _selftest()