KB-78AB rev 2

FIX7 Real-N6 packet — real_n6_provenance_verifier.py

21 min read Revision 2
tool-kiem-thufix7n6real-n6tkt-v022026-06-11

#!/usr/bin/env python3

# ============================================================================
# FIX7 REAL-N6 PROVENANCE VERIFIER (TKT v0.2-aligned, fail-closed)
#
# Purpose: decide whether N6 = active_corpus_sha256 can be emitted as a REAL,
# NON-REHEARSAL ENGINEERING_VERIFIED_CANDIDATE from governed evidence, and if so
# emit a machine-readable provenance certificate. It NEVER emits an official
# seal, never creates N7/N8/P7, never implies owner/Codex authority.
#
# Grounded BYTE-FOR-FACT in the governed canonicalizer SSOT (N2 rev3):
#   evidence/fix7_canon_v1_ssot_extended.py sha256 d9caa9fe...26f3e5
#   canonicalizer-fix7-canon-v1-ssot.md sha256 49c386a9...b734d0
# It does NOT re-implement the canonicalization math; it PINS the governed
# algorithm by hash, then runs it. The corpus is the 10 frozen active docs.
#
# Provenance model consumed verbatim from the authority encoder (S3/S4):
#   PROVENANCE_ALLOWED_REAL_CORPUS = {ENGINEERING_VERIFIED_CANDIDATE, OFFICIAL_PIN}
#   REHEARSAL / FORBIDDEN_FOR_REAL_SEAL / missing / unknown -> never real.
#   OFFICIAL_PIN is an authority class: in THIS lane (no owner/Codex) it is
#   rejected unless an explicit authority token is supplied (which it is not).
#
# N-number table consumed as ENGINEERING CONVENTION ONLY (not owner/Codex seal):
#   reconciliation rev1: N6 = active_corpus_sha256, load-bearing, deps [N1].
#
# exit 0 + certificate iff EVERY gate passes; nonzero (no certificate) otherwise.
#
# rev2 (2026-06-11): closes Codex CODEX-N6-DUP-DECLARED / CODEX-N6-DUP-MANIFEST
# fail-opens. parse_hash_manifest() now rejects duplicate paths, malformed
# lines, and non-64-lowercase-hex hashes BEFORE dict insertion;
# verify_corpus_membership() rejects duplicate declared member ids BEFORE the
# set conversion. No certificate is emitted on any of these. Selftest 14->19.
# ============================================================================

import hashlib, json, os, re, sys, importlib.util, tempfile, shutil

# ---- pinned governed constants (grounded; NOT invented) --------------------

# SHA-256 pin for the governed canonicalizer module (compared in load_canonicalizer).
CANONICALIZER_PY_SHA256 = (
    "d9caa9fe9f46854c38c996747d50d2e73bc5074705730e869fd6b1f8cc26f3e5"
)

# SHA-256 pin for the SSOT markdown file (compared in verify_real_n6).
SSOT_MD_SHA256 = (
    "49c386a9b9666c09786fc4f89bc79776b6046eaee6f4da6d8537d2c753b734d0"
)

# Value the produce path's membership_sha256 must equal.
MEMBERSHIP_PIN = (
    "f2bda8effc7be19b54722828126b82d7d2d48bee5e5e5dc0c8f347ce210fe251"
)

# On-record N6 candidate; any other produced value is rejected as drift.
EXPECTED_N6 = (
    "d777e87c73d3b62d36789d9343f346102e98dbf301f2c93f7608470b876b258c"
)

# Recorded verbatim in the certificate as "blueprint_prefix".
BLUEPRINT_PREFIX = (
    "knowledge/dev/reports/architecture/"
    "t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/"
)

# Default declared roster: the ten corpus member file names, in order.
CORPUS_MEMBERS = [
    "00-readme-first.md",
    "01-live-existing-system-inventory.md",
    "02-design-to-live-mapping.md",
    "03-gap-classification.md",
    "04-dependency-safe-construction-order.md",
    "05-rollback-blueprint.md",
    "06-test-guard-blueprint.md",
    "07-implementation-package-split.md",
    "08-hard-blocks-do-not-touch-list.md",
    "12-final-verdict.md",
]

# rehearsal fixture sentinels (authority_seal_encoder FX values) -- a real N6 may
# NEVER equal one of these; if produce ever yields one, it is rehearsal laundering.

# The sixteen 64-character strings made of a single repeated hex digit.
REHEARSAL_FIXTURE_SENTINELS = {c * 64 for c in "0123456789abcdef"}

# Provenance classes that may back a real corpus value.
PROVENANCE_ALLOWED_REAL_CORPUS = {"ENGINEERING_VERIFIED_CANDIDATE", "OFFICIAL_PIN"}

# Every provenance class this verifier recognises; anything else is "unknown".
KNOWN_PROVENANCE = {
    "ENGINEERING_VERIFIED_CANDIDATE",
    "REHEARSAL",
    "AUTHORITY_INPUT",
    "CODEX_AUTHORED",
    "OFFICIAL_PIN",
    "FORBIDDEN_FOR_REAL_SEAL",
}

# Allowed shape of one path segment of a member id. _check_member_path() looks
# this pattern up as `_SEG`, so it must be bound under that name.
# NOTE(review): "_" is not in the character class, so member ids containing an
# underscore are rejected -- confirm that matches the governed naming rule.
_SEG = re.compile(r"^[A-Za-z0-9.-]+$")
# Same pattern under the un-prefixed name, kept so existing references still resolve.
SEG = _SEG

# fixed (deterministic; no wall-clock so RERUN is byte-stable)
CERT_DATE = "2026-06-11"

class N6Reject(Exception):
    """Raised when a verification gate fails.

    Attributes:
        status: machine-readable rejection code (e.g. ``"N6_VALUE_DRIFT"``);
            callers read it to report or compare the failure reason.
        detail: the free-text detail that was supplied with the code.

    ``str(exc)`` is ``"<status>: <detail>"``.
    """

    # Must be the real constructor hook (``__init__``): a method merely named
    # ``init`` is never invoked, which would leave ``status`` unset.
    def __init__(self, status, detail=""):
        super().__init__(f"{status}: {detail}")
        self.status = status
        self.detail = detail

def sha_bytes(b):
    """Return the hex SHA-256 digest of the bytes-like object *b*."""
    return hashlib.sha256(b).hexdigest()


def sha_file(p):
    """Return the hex SHA-256 digest of the file at path *p*.

    The file is opened in binary mode inside a ``with`` block so the handle is
    closed deterministically, and it is hashed in fixed-size chunks so large
    files are not loaded into memory at once.
    """
    digest = hashlib.sha256()
    with open(p, "rb") as fh:
        for chunk in iter(lambda: fh.read(1 << 16), b""):
            digest.update(chunk)
    return digest.hexdigest()

# ---- gate 1: governed algorithm pin ---------------------------------------

def load_canonicalizer(py_path):
    """Load the canonicalizer module at *py_path* only if its bytes match the pin.

    Args:
        py_path: filesystem path of the canonicalizer source file.

    Returns:
        A module object named ``fix7_canon_governed`` whose namespace was
        populated by executing the verified source.

    Raises:
        N6Reject: ``N6_ALGORITHM_FILE_MISSING`` if the path is not a file,
            ``N6_ALGORITHM_HASH_MISMATCH`` if its SHA-256 differs from
            ``CANONICALIZER_PY_SHA256``, ``N6_ALGORITHM_NOT_LOADABLE`` if no
            import spec can be built for the path.
    """
    if not os.path.isfile(py_path):
        raise N6Reject("N6_ALGORITHM_FILE_MISSING", py_path)

    # Read the file exactly once: the bytes that are hashed below are the very
    # bytes that get compiled and executed. Hashing and then letting the import
    # machinery re-open the path would leave a window in which the file could
    # be swapped, and the file loader may also run cached bytecode from
    # __pycache__ instead of the source that was hashed.
    with open(py_path, "rb") as fh:
        source = fh.read()

    got = hashlib.sha256(source).hexdigest()
    if got != CANONICALIZER_PY_SHA256:
        raise N6Reject("N6_ALGORITHM_HASH_MISMATCH",
                       f"{got} != pinned {CANONICALIZER_PY_SHA256}")

    spec = importlib.util.spec_from_file_location("fix7_canon_governed", py_path)
    if spec is None:
        # No loader could be determined for this path (e.g. unexpected suffix).
        raise N6Reject("N6_ALGORITHM_NOT_LOADABLE",
                       f"cannot build an import spec for {py_path!r}")

    # module_from_spec() supplies the usual module attributes (__name__,
    # __file__, ...); the body is then run from the verified bytes rather than
    # through spec.loader.exec_module(), which would read the path again.
    mod = importlib.util.module_from_spec(spec)
    code = compile(source, py_path, "exec", dont_inherit=True)
    exec(code, mod.__dict__)  # hash-pinned source only; never unverified input
    return mod

# ---- gate 2: explicit, governed, byte-exact corpus membership -------------

def parse_hash_manifest(hm_path):
    """Parse a checksum manifest into ``{relative_path: sha256_hex}``.

    Each entry line has the shasum-style form ``<64 lowercase hex>  <path>``:
    hash, two spaces, path. Blank lines and lines whose first non-blank
    character is ``#`` are skipped.

    Args:
        hm_path: filesystem path of the manifest.

    Returns:
        dict mapping each (whitespace-stripped) path to its pinned hash.

    Raises:
        N6Reject: ``N6_HASH_MANIFEST_MISSING`` if the path is not a file;
            ``N6_MANIFEST_MALFORMED_LINE`` for a line without the two-space
            separator, with an empty path, or that is not valid UTF-8;
            ``N6_MANIFEST_BAD_HASH`` for a hash that is not 64 lowercase hex;
            ``N6_MANIFEST_DUPLICATE`` for a path that appears twice.
    """
    if not os.path.isfile(hm_path):
        raise N6Reject("N6_HASH_MANIFEST_MISSING", hm_path)
    entries = {}
    lineno = 0
    try:
        # Explicit encoding keeps parsing independent of the host locale; the
        # ``with`` block closes the handle even when a line is rejected.
        with open(hm_path, encoding="utf-8") as fh:
            for lineno, raw in enumerate(fh, 1):
                line = raw.rstrip("\n")
                if not line.strip() or line.lstrip().startswith("#"):
                    continue
                # A line separated by a single space is malformed, so split on
                # the two-space separator only.
                h, sep, rel = line.partition("  ")
                # CODEX-N6-DUP-MANIFEST fix: a duplicate path, a malformed line,
                # or a non-64-lowercase-hex hash must FAIL CLOSED before
                # dictionary insertion. shasum -c alone does not catch a
                # duplicate path, and a plain dict silently overwrites it ->
                # the verifier would certify an ambiguous corpus.
                if sep != "  " or not rel.strip():
                    raise N6Reject("N6_MANIFEST_MALFORMED_LINE",
                                   f"line {lineno}: {line!r}")
                rel = rel.strip()
                if not re.fullmatch(r"[0-9a-f]{64}", h):
                    raise N6Reject("N6_MANIFEST_BAD_HASH", f"line {lineno}: {h!r}")
                if rel in entries:
                    raise N6Reject("N6_MANIFEST_DUPLICATE",
                                   f"duplicate manifest path {rel!r} (line {lineno})")
                entries[rel] = h
    except UnicodeDecodeError as exc:
        # Undecodable bytes are a malformed manifest, not a crash.
        raise N6Reject("N6_MANIFEST_MALFORMED_LINE",
                       f"after line {lineno}: not valid UTF-8 ({exc.reason})") from exc
    return entries

def _check_member_path(name):
    """Reject a declared member id that is not a plain ASCII relative ``.md`` path.

    Checks run in this order and the first violation raises ``N6Reject``:
    empty or ``None`` name; per character, non-ASCII and then
    TAB/LF/CR/NUL/backslash; a leading ``/``; an empty ``//`` segment; per
    segment, ``.``/``..`` and then the ``_SEG`` pattern; finally the ``.md``
    suffix. Returns ``None`` when the name passes every check.
    """
    if name is None or name == "":
        raise N6Reject("N6_CORPUS_PATH_ALIAS", "empty member name")

    blocked_code_points = (0x09, 0x0A, 0x0D, 0x00, 0x5C)  # TAB, LF, CR, NUL, backslash
    for char in name:
        code_point = ord(char)
        if code_point > 0x7F:
            raise N6Reject("N6_CORPUS_PATH_ALIAS", f"non-ASCII in {name!r}")
        if code_point in blocked_code_points:
            raise N6Reject("N6_CORPUS_PATH_ALIAS", f"ctrl/backslash in {name!r}")

    if name.startswith("/"):
        raise N6Reject("N6_CORPUS_PATH_TRAVERSAL", f"leading slash {name!r}")
    if name.find("//") != -1:
        raise N6Reject("N6_CORPUS_PATH_TRAVERSAL", f"empty segment // {name!r}")

    for segment in name.split("/"):
        if segment == "." or segment == "..":
            raise N6Reject("N6_CORPUS_PATH_TRAVERSAL", f"dot segment {segment!r}")
        if _SEG.match(segment) is None:
            raise N6Reject("N6_CORPUS_PATH_ALIAS", f"bad segment {segment!r}")

    if not name.endswith(".md"):
        raise N6Reject("N6_CORPUS_PATH_ALIAS", f"not .md {name!r}")

def verify_corpus_membership(corpus_dir, hm_entries, declared_members):
    """Check the on-disk corpus against the declared roster and the manifest pins.

    Args:
        corpus_dir: directory holding the corpus ``.md`` files.
        hm_entries: mapping of manifest path (``docs/<name>``) to pinned hash.
        declared_members: iterable of member file names expected on disk.

    Returns:
        dict mapping each declared member name to the SHA-256 of its file.

    Raises:
        N6Reject: on the first failure -- directory missing, no ``.md`` files,
            case-variant duplicate on disk, duplicate declared member, declared
            member missing on disk, extra ``.md`` on disk, invalid member path,
            member absent from the manifest, or file hash differing from its pin.
    """
    if not os.path.isdir(corpus_dir):
        raise N6Reject("N6_CORPUS_DIR_MISSING", corpus_dir)

    on_disk = [entry for entry in os.listdir(corpus_dir)
               if entry.lower().endswith(".md")]
    on_disk.sort()
    # prose-only / empty corpus -> reject (we require real bytes on disk)
    if not on_disk:
        raise N6Reject("N6_CORPUS_PROSE_ONLY", "no .md files on disk")

    # Two names that differ only by case count as duplicates of each other.
    first_spelling = {}
    for filename in on_disk:
        folded = filename.lower()
        if folded in first_spelling:
            raise N6Reject("N6_CORPUS_DUPLICATE",
                           f"{filename} ~ {first_spelling[folded]}")
        first_spelling[folded] = filename

    roster = list(declared_members)
    # CODEX-N6-DUP-DECLARED: a repeated declared id must be rejected while the
    # roster is still a list; a set would silently absorb the repetition.
    declared_set = set()
    for member in roster:
        if member in declared_set:
            raise N6Reject("N6_CORPUS_DUPLICATE",
                           f"duplicate declared member {member!r}")
        declared_set.add(member)

    disk_set = set(on_disk)
    absent = declared_set - disk_set
    if absent:
        raise N6Reject("N6_CORPUS_MEMBER_MISSING", f"{sorted(absent)}")
    unexpected = disk_set - declared_set
    if unexpected:
        raise N6Reject("N6_CORPUS_MEMBER_EXTRA", f"{sorted(unexpected)}")

    per_doc_hashes = {}
    for member in roster:
        _check_member_path(member)
        manifest_key = "docs/" + member
        member_path = os.path.join(corpus_dir, member)
        if not os.path.isfile(member_path):
            raise N6Reject("N6_CORPUS_MEMBER_MISSING", member)
        if manifest_key not in hm_entries:
            raise N6Reject("N6_MANIFEST_MISSING_MEMBER", manifest_key)
        actual = sha_file(member_path)
        pinned = hm_entries[manifest_key]
        if actual != pinned:
            raise N6Reject("N6_CORPUS_HASH_MISMATCH",
                           f"{member}: {actual} != pinned {pinned}")
        per_doc_hashes[member] = actual
    return per_doc_hashes

# ---- gate 3: provenance class ---------------------------------------------

def assert_provenance(provenance_class, authority_token=None):
    """Raise ``N6Reject`` unless *provenance_class* may back a real N6 candidate.

    Rejections, in order: missing (``None`` or empty), not in
    ``KNOWN_PROVENANCE``, ``REHEARSAL``, ``FORBIDDEN_FOR_REAL_SEAL``, any class
    outside ``PROVENANCE_ALLOWED_REAL_CORPUS``, and ``OFFICIAL_PIN`` without a
    truthy *authority_token*. Returns ``None`` when the class is acceptable.
    """
    if provenance_class is None or provenance_class == "":
        raise N6Reject("N6_PROVENANCE_MISSING", "no provenance class")
    if provenance_class not in KNOWN_PROVENANCE:
        raise N6Reject("N6_PROVENANCE_UNKNOWN_CLASS", repr(provenance_class))

    # Classes that get their own explicit rejection before the allow-list check.
    explicitly_blocked = {
        "REHEARSAL": ("N6_PROVENANCE_REHEARSAL_BLOCKED",
                      "rehearsal evidence may not be a real N6 candidate"),
        "FORBIDDEN_FOR_REAL_SEAL": ("N6_PROVENANCE_FORBIDDEN_CLASS",
                                    "FORBIDDEN_FOR_REAL_SEAL"),
    }
    if provenance_class in explicitly_blocked:
        status, detail = explicitly_blocked[provenance_class]
        raise N6Reject(status, detail)

    if provenance_class not in PROVENANCE_ALLOWED_REAL_CORPUS:
        allowed = sorted(PROVENANCE_ALLOWED_REAL_CORPUS)
        raise N6Reject("N6_PROVENANCE_FORBIDDEN_CLASS",
                       f"{provenance_class} not in {allowed}")

    needs_authority = provenance_class == "OFFICIAL_PIN"
    if needs_authority and not authority_token:
        # an OFFICIAL_PIN is an authority act; this engineering lane has none.
        raise N6Reject("N6_OFFICIAL_PIN_WITHOUT_AUTHORITY",
                       "OFFICIAL_PIN requires owner/Codex authority not present in this lane")

# ---- certificate ----------------------------------------------------------

def _binding_input(cert):
    """Return the bytes over which a certificate's binding digest is computed.

    Every key of *cert* except ``"cert_binding_sha256"`` is serialised as
    compact JSON with sorted keys and encoded as UTF-8, so the result does not
    depend on key insertion order. *cert* itself is not modified.
    """
    bound_fields = {}
    for key in cert:
        if key == "cert_binding_sha256":
            continue  # the digest never covers itself
        bound_fields[key] = cert[key]
    serialised = json.dumps(bound_fields, sort_keys=True, separators=(",", ":"))
    return serialised.encode("utf-8")

def make_certificate(n6_value, per_doc, membership_value, algo_sha, ssot_sha,
                     provenance_class, source_kind):
    """Assemble the provenance certificate dict and bind it with a digest.

    The supplied values are recorded next to fixed declarations (not a seal,
    not an official pin, creates no N7/N8/P7) and two computed flags that
    compare *membership_value* with ``MEMBERSHIP_PIN`` and *n6_value* with
    ``EXPECTED_N6``. ``"cert_binding_sha256"`` is added last as the SHA-256 of
    ``_binding_input(cert)``.

    Returns:
        The certificate dict, including its binding digest.
    """
    membership_matches_pin = (membership_value == MEMBERSHIP_PIN)
    n6_matches_record = (n6_value == EXPECTED_N6)

    # What is being certified.
    cert = {
        "certificate_kind": "FIX7_REAL_N6_PROVENANCE_CERTIFICATE_V1",
        "date": CERT_DATE,
        "node": "N6",
        "node_name": "active_corpus_sha256",
        "n6_active_corpus_sha256": n6_value,
        "provenance_class": provenance_class,
        "source_kind": source_kind,
    }
    # Fixed declarations about what this certificate is NOT.
    cert.update({
        "authority": "NOT_A_SEAL",
        "is_official_pin": False,
        "creates_n7_n8_p7": False,
        "owner_codex_required_for_promotion": True,
        "n_number_table": "ENGINEERING_CONVENTION_ONLY_NOT_RATIFIED",
    })
    # Pins, roster and cross-checks.
    cert.update({
        "governed_corpus_membership_sha256": membership_value,
        "membership_frozen_pin_match": membership_matches_pin,
        "canonicalizer_algorithm_sha256": algo_sha,
        "canonicalizer_ssot_md_sha256": ssot_sha,
        "blueprint_prefix": BLUEPRINT_PREFIX,
        "corpus_members": list(CORPUS_MEMBERS),
        "per_doc_normalized_active_content_sha256": per_doc,
        "expected_n6_crosscheck": EXPECTED_N6,
        "n6_matches_on_record_candidate": n6_matches_record,
        "standing_authority_blocker": "SEAL_REAL_N6_NOT_AVAILABLE (authority half: owner/Codex inputs + ratification)",
    })
    # The binding is computed over everything above and stored last.
    binding = sha_bytes(_binding_input(cert))
    cert["cert_binding_sha256"] = binding
    return cert

def verify_certificate(cert):
    """Return True iff the certificate's stored binding digest still matches.

    The digest is recomputed from the certificate's other fields and compared
    with ``cert["cert_binding_sha256"]``. Any exception raised along the way
    (for example when *cert* is not a mapping) yields ``False`` rather than
    propagating.
    """
    try:
        recorded = cert.get("cert_binding_sha256")
        recomputed = sha_bytes(_binding_input(cert))
        return recorded == recomputed
    except Exception:
        # Fail closed: anything that cannot be checked is treated as not intact.
        return False

# ---- the verifier ----------------------------------------------------------

def verify_real_n6(corpus_dir, canonicalizer_py, ssot_md, hash_manifest,
                   provenance_class="ENGINEERING_VERIFIED_CANDIDATE",
                   source_kind="GOVERNED_KB", authority_token=None,
                   declared_members=None):
    """Run every gate and return a provenance certificate for the N6 value.

    Gates run in this order; the first failure raises ``N6Reject`` and no
    certificate is produced:

    0. *source_kind* must be exactly ``"GOVERNED_KB"``.
    3. The provenance class is checked early because it is cheap.
    1. The canonicalizer is hash-pinned and loaded; the SSOT markdown must
       exist and match ``SSOT_MD_SHA256``.
    2. The hash manifest is parsed and corpus membership verified byte-exact.
    4. The loaded module's ``produce(corpus_dir, ssot_md)`` must report
       ``corpus_ok`` and ``membership_frozen_ok``, and its
       ``membership_sha256`` must equal ``MEMBERSHIP_PIN``.
    5. ``active_corpus_sha256`` must be exactly 64 lowercase hex characters
       and must not be a rehearsal fixture sentinel.
    6. It must equal ``EXPECTED_N6``.

    Args:
        corpus_dir: directory containing the corpus ``.md`` files.
        canonicalizer_py: path of the governed canonicalizer module.
        ssot_md: path of the SSOT markdown file.
        hash_manifest: path of the checksum manifest.
        provenance_class: provenance class claimed for the evidence.
        source_kind: origin of the evidence; only ``"GOVERNED_KB"`` passes.
        authority_token: forwarded to ``assert_provenance``.
        declared_members: roster of member file names; ``None`` selects
            ``CORPUS_MEMBERS``.

    Returns:
        The certificate dict built by ``make_certificate``.
    """
    # Only an omitted roster selects the default. An explicitly empty roster is
    # passed through so the membership gate rejects it instead of being quietly
    # replaced by the default roster.
    if declared_members is None:
        declared_members = CORPUS_MEMBERS
    # Materialise once: the roster is walked by the membership gate and again
    # when the per-document digests are collected below.
    declared_members = list(declared_members)

    # gate 0: source must be governed (not local-only / not prose-only)
    if source_kind != "GOVERNED_KB":
        raise N6Reject("N6_SOURCE_NOT_GOVERNED",
                       f"source_kind={source_kind!r}; only GOVERNED_KB allowed")

    # gate 3: provenance class (early, cheap, before compute)
    assert_provenance(provenance_class, authority_token)

    # gate 1: pin + load governed algorithm
    mod = load_canonicalizer(canonicalizer_py)
    if not os.path.isfile(ssot_md):
        raise N6Reject("N6_SSOT_MD_MISSING", ssot_md)
    ssot_sha = sha_file(ssot_md)
    if ssot_sha != SSOT_MD_SHA256:
        raise N6Reject("N6_SSOT_MD_HASH_MISMATCH", f"{ssot_sha} != {SSOT_MD_SHA256}")

    # gate 2: explicit, governed, byte-exact corpus membership. The call is
    # made for its checks; the file hashes it returns are not needed here.
    hm = parse_hash_manifest(hash_manifest)
    verify_corpus_membership(corpus_dir, hm, declared_members)

    # gate 4: run the GOVERNED produce path; require corpus_ok + membership_frozen_ok
    res = mod.produce(corpus_dir, ssot_md)
    if not res.get("corpus_ok"):
        raise N6Reject("N6_PRODUCE_CORPUS_NOT_OK", str(res.get("errors")))
    if not res.get("membership_frozen_ok"):
        raise N6Reject("N6_PRODUCE_MEMBERSHIP_NOT_FROZEN", "membership != frozen pin")
    membership_value = res["membership_sha256"]
    if membership_value != MEMBERSHIP_PIN:
        raise N6Reject("N6_MEMBERSHIP_PIN_MISMATCH", membership_value)
    n6 = res["active_corpus_sha256"]

    # gate 5: value sanity -- 64 hex, not a rehearsal fixture sentinel.
    # fullmatch, not match(...$): "$" also matches just before a trailing
    # newline, which would let a 65-character value through this gate.
    if not (isinstance(n6, str) and re.fullmatch(r"[0-9a-f]{64}", n6)):
        raise N6Reject("N6_VALUE_NOT_HEX", repr(n6))
    if n6 in REHEARSAL_FIXTURE_SENTINELS:
        raise N6Reject("N6_VALUE_IS_REHEARSAL_FIXTURE", n6)

    # gate 6: defence-in-depth cross-check against the on-record candidate
    if n6 != EXPECTED_N6:
        raise N6Reject("N6_VALUE_DRIFT", f"{n6} != on-record {EXPECTED_N6}")

    per_doc = {d: res["per_doc"][d]["normalized_active_content_sha256"]
               for d in declared_members}
    # Both file hashes were verified equal to their pins above, so the pinned
    # constants are what gets recorded in the certificate.
    return make_certificate(n6, per_doc, membership_value,
                            CANONICALIZER_PY_SHA256, SSOT_MD_SHA256,
                            provenance_class, source_kind)

# ---- selftest (positive + the fail-closed gates, pure where possible) ------

def _expect(status, fn):
    """Call *fn* and report whether it was rejected with the expected status.

    Returns:
        ``(matched, observed)``: when *fn* raises ``N6Reject``, *observed* is
        the exception's status and *matched* says whether it equals *status*;
        when *fn* returns normally the result is ``(False, "NO_REJECT")``.
    """
    try:
        fn()
    except N6Reject as rejection:
        observed = rejection.status
        return observed == status, observed
    else:
        return False, "NO_REJECT"

def selftest():
    """Run the built-in checks, print one PASS/FAIL line per check, and return
    True iff every check passed.

    Covers the provenance gate, member-path validation, certificate binding,
    and (using a temporary directory that is always removed) manifest parsing
    and duplicate declared members.
    """
    res = []

    def chk(name, ok, note=""):
        # Record one result; *note* is printed only when the check failed.
        res.append((name, ok, note))

    def accepted(fn):
        # True when fn() completes without being rejected.
        try:
            fn()
        except N6Reject:
            return False
        return True

    def write_text(path, text):
        # Close the handle before the file is read back, on any interpreter.
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(text)

    # provenance gate (pure)
    provenance_rejections = (
        ("REHEARSAL provenance rejected",
         "N6_PROVENANCE_REHEARSAL_BLOCKED", "REHEARSAL"),
        ("missing provenance rejected",
         "N6_PROVENANCE_MISSING", None),
        ("unknown provenance rejected",
         "N6_PROVENANCE_UNKNOWN_CLASS", "MADE_UP"),
        ("FORBIDDEN provenance rejected",
         "N6_PROVENANCE_FORBIDDEN_CLASS", "FORBIDDEN_FOR_REAL_SEAL"),
        ("AUTHORITY_INPUT not a corpus class -> rejected",
         "N6_PROVENANCE_FORBIDDEN_CLASS", "AUTHORITY_INPUT"),
        ("OFFICIAL_PIN without authority rejected",
         "N6_OFFICIAL_PIN_WITHOUT_AUTHORITY", "OFFICIAL_PIN"),
    )
    for label, status, value in provenance_rejections:
        chk(label, *_expect(status, lambda value=value: assert_provenance(value)))
    chk("ENGINEERING_VERIFIED_CANDIDATE accepted",
        accepted(lambda: assert_provenance("ENGINEERING_VERIFIED_CANDIDATE")))

    # path checks (pure). Each _expect() result must be unpacked with "*":
    # passing the (matched, observed) tuple itself as `ok` is always truthy and
    # would make the check pass no matter what happened.
    path_rejections = (
        ("path traversal '..' rejected", "N6_CORPUS_PATH_TRAVERSAL", "../x.md"),
        ("path alias non-.md rejected", "N6_CORPUS_PATH_ALIAS", "x.txt"),
        ("path leading-slash rejected", "N6_CORPUS_PATH_TRAVERSAL", "/x.md"),
    )
    for label, status, value in path_rejections:
        chk(label, *_expect(status, lambda value=value: _check_member_path(value)))

    # certificate binding (pure)
    cert = make_certificate(EXPECTED_N6, {d: "0" * 64 for d in CORPUS_MEMBERS},
                            MEMBERSHIP_PIN, CANONICALIZER_PY_SHA256, SSOT_MD_SHA256,
                            "ENGINEERING_VERIFIED_CANDIDATE", "GOVERNED_KB")
    chk("fresh certificate verifies", verify_certificate(cert))
    tampered = dict(cert)
    tampered["n6_active_corpus_sha256"] = "0" * 64
    chk("tampered certificate fails verify", not verify_certificate(tampered))
    chk("certificate is not a seal",
        cert["authority"] == "NOT_A_SEAL" and not cert["is_official_pin"])
    chk("certificate does not create N7/N8/P7", cert["creates_n7_n8_p7"] is False)

    # ---- CODEX-N6-DUP: duplicate/malformed manifest + duplicate declared (fail-closed) ----
    # Well-formed fixture lines use the shasum-style "<hash>  <path>" form (two
    # spaces); the malformed fixture deliberately uses a single space.
    tmp = tempfile.mkdtemp(prefix="n6st-")
    try:
        # clean manifest parses (positive)
        good_hm = os.path.join(tmp, "good.txt")
        write_text(good_hm, f"{'a' * 64}  docs/x.md\n{'b' * 64}  docs/y.md\n")
        chk("clean HASH_MANIFEST parses (positive)",
            accepted(lambda: parse_hash_manifest(good_hm)))

        # duplicate manifest path -> reject (CODEX-N6-DUP-MANIFEST)
        dup_hm = os.path.join(tmp, "dup.txt")
        write_text(dup_hm, f"{'a' * 64}  docs/x.md\n{'c' * 64}  docs/x.md\n")
        chk("duplicate manifest path rejected",
            *_expect("N6_MANIFEST_DUPLICATE", lambda: parse_hash_manifest(dup_hm)))

        # malformed manifest line (single space) -> reject
        mal_hm = os.path.join(tmp, "mal.txt")
        write_text(mal_hm, f"{'a' * 64} docs/x.md\n")
        chk("malformed manifest line rejected",
            *_expect("N6_MANIFEST_MALFORMED_LINE", lambda: parse_hash_manifest(mal_hm)))

        # non-64-lowercase-hex hash -> reject
        bad_hm = os.path.join(tmp, "bad.txt")
        write_text(bad_hm, "NOTAHEXHASH  docs/x.md\n")
        chk("non-hex manifest hash rejected",
            *_expect("N6_MANIFEST_BAD_HASH", lambda: parse_hash_manifest(bad_hm)))

        # duplicate declared corpus member -> reject (CODEX-N6-DUP-DECLARED)
        cdir = os.path.join(tmp, "docs")
        os.makedirs(cdir)
        write_text(os.path.join(cdir, "a.md"), "x")
        chk("duplicate declared corpus member rejected",
            *_expect("N6_CORPUS_DUPLICATE",
                     lambda: verify_corpus_membership(cdir, {}, ["a.md", "a.md"])))
    finally:
        shutil.rmtree(tmp, ignore_errors=True)

    npass = sum(1 for _, ok, _ in res if ok)
    for name, ok, note in res:
        print(f" [{'PASS' if ok else 'FAIL'}] {name}"
              + (f" ({note})" if note and not ok else ""))
    print(f"REAL-N6-VERIFIER SELFTEST: {npass}/{len(res)} PASS")
    return all(ok for _, ok, _ in res)

def _arg(flag, default=None):
    """Return the command-line token that follows *flag* in ``sys.argv``.

    *default* is returned when *flag* is absent or is the last token. When the
    flag occurs more than once, the first occurrence is used.
    """
    argv = sys.argv
    if flag not in argv:
        return default
    value_index = argv.index(flag) + 1
    if value_index >= len(argv):
        return default
    return argv[value_index]

def _main():
    """Command-line entry point; returns the process exit code.

    ``--selftest`` (or no recognised mode) runs ``selftest()``: 0 when every
    check passes, otherwise 1.

    ``--verify`` runs ``verify_real_n6`` with ``--corpus``, ``--canonicalizer``,
    ``--ssot`` and ``--hash-manifest`` (all required) plus optional
    ``--provenance``, ``--source`` and ``--emit-cert``: 0 after printing the
    certificate (and writing it to the ``--emit-cert`` path if given), 5 when a
    gate rejects, 2 when a required argument is missing.
    """
    if "--selftest" in sys.argv:
        return 0 if selftest() else 1

    if "--verify" in sys.argv:
        corpus = _arg("--corpus")
        canon = _arg("--canonicalizer")
        ssot = _arg("--ssot")
        hm = _arg("--hash-manifest")
        prov = _arg("--provenance", "ENGINEERING_VERIFIED_CANDIDATE")
        src = _arg("--source", "GOVERNED_KB")
        emit = _arg("--emit-cert")

        # A missing path would otherwise reach os.path.isfile(None) and end in
        # a TypeError traceback; report it as a usage error instead.
        required = (("--corpus", corpus), ("--canonicalizer", canon),
                    ("--ssot", ssot), ("--hash-manifest", hm))
        missing = [flag for flag, value in required if value is None]
        if missing:
            print(f"REAL-N6 VERIFY: USAGE ERROR missing {' '.join(missing)}",
                  file=sys.stderr)
            return 2

        try:
            cert = verify_real_n6(corpus, canon, ssot, hm,
                                  provenance_class=prov, source_kind=src)
        except N6Reject as e:
            print(f"REAL-N6 VERIFY: REJECTED [{e.status}] {e}")
            return 5

        out = json.dumps(cert, indent=2, sort_keys=True) + "\n"
        if emit:
            # ``with`` guarantees the certificate is flushed and closed.
            with open(emit, "w", encoding="utf-8") as fh:
                fh.write(out)
        print(out)
        print(f"REAL-N6 VERIFY: OK n6={cert['n6_active_corpus_sha256']} "
              f"class={cert['provenance_class']} authority={cert['authority']}")
        return 0

    # default: selftest
    return 0 if selftest() else 1


# The guard must test the dunder names: a bare `name == "main"` raises
# NameError as soon as the module is executed.
if __name__ == "__main__":
    sys.exit(_main())

Back to Knowledge Hub knowledge/dev/reports/architecture/fix7-real-n6-provenance-under-tkt-v02-2026-06-11/real_n6_provenance_verifier.py