KB-2F64

FIX7 Recheck-9 Packet V2 — evidence/canonicalizer-fix7-canon-v1-ssot.md

Revision 1
<!-- DOC_STATUS: LOAD_BEARING_SSOT_ARTIFACT (canonicalizer; pinned by canonicalizer_sha256 in doc 00 envelope; NOT an active_corpus membership member; hashed as full normalized content) -->

FIX7-CANON-V1 Canonicalizer — Single Source of Truth (executable)

This document is the ONE load-bearing canonical contract (Constitution Article 14 / NT14). Every other description of canonicalization in the blueprint (doc 00 §Canonical hash encoding, the extractor and record-encoding sections, and all report docs) is NON_AUTHORITY_EXPLANATION: it explains this artifact and MUST NOT conflict with it. If any other description conflicts, this artifact wins and G-NO-DUPLICATE-CANONICAL-AUTHORITY fails closed. There is exactly one canonicalizer; no doc, spec, guard, or package may redefine canonicalization.

SSOT identity (pinned in the doc 00 envelope, MANIFEST_BOUND)

| field | value |
| --- | --- |
| canonicalizer_artifact_id | FIX7-CANON-V1-CANONICALIZER |
| canonicalizer_path | knowledge/dev/reports/architecture/t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/canonicalizer-fix7-canon-v1-ssot.md |
| canonicalizer_version | FIX7-CANON-V1 |
| canonicalizer_revision | SEAL_AT_CODEX_RECHECK_8 (platform revision of THIS artifact, computed by Codex at the seal; diagnostic+pin; never this artifact's own future revision recorded inside itself) |
| canonicalizer_sha256 | SEAL_AT_CODEX_RECHECK_8 (SHA-256 over this artifact's full MCP bytes, CRLF/CR→LF normalized, at canonicalizer_revision; this artifact does NOT contain its own hash → no self-reference) |
| nature | executable reference code (authoritative) + frozen test vectors |
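The canonicalizer_sha256 rule (full bytes, CRLF/CR mapped to LF, then SHA-256) can be sketched as below. This is an illustration only and not part of the sealed artifact: the helper name normalized_sha256 is hypothetical, and the sketch assumes the input is the UTF-8 byte content of the artifact.

```python
import hashlib

def normalized_sha256(raw: bytes) -> str:
    """Hash bytes after mapping CRLF and lone CR to LF (hypothetical helper)."""
    # Replace CRLF first so a CR+LF pair collapses to one LF, not two.
    normalized = raw.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    return hashlib.sha256(normalized).hexdigest()

# The same text with different line endings yields one digest.
lf = b"alpha\nbeta\n"
crlf = b"alpha\r\nbeta\r\n"
cr = b"alpha\rbeta\r"
print(normalized_sha256(lf) == normalized_sha256(crlf) == normalized_sha256(cr))
```

Normalizing before hashing means a checkout that rewrites line endings cannot change the pin.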

Invocation contract

  • Command: python3 canonicalizer-fix7-canon-v1-ssot.py --selftest
  • Inputs: (a) for --selftest: none (vectors are embedded); (b) for production use: the raw UTF-8 bytes returned by mcp.get_document_for_rewrite(document_id) per active member, plus the explicit active-corpus membership list and the live envelope fields.
  • Outputs: lowercase-hex SHA-256 digests (membership, per-doc normalized_active_content_sha256, active_corpus_sha256, marker_fence_registry_sha256, superseded_boundary_sha256, guard_set_sha256, envelope_manifest_sha256, detached_seal_sha256) and, on any violation, a single fail-closed status string (listed below). The membership pin is FROZEN. --produce deterministically computes N1/N3/N4/N5/N6 and the candidate canonicalizer_sha256 over the PRESENT+VALID members, and it is FAIL-CLOSED (R9-B2): any missing, extra, duplicate, extract-error, or invalid active member suppresses EVERY candidate digest (SUPPRESSED_CORPUS_NOT_OK), forces corpus_ok=false and membership_frozen_ok=false, and exits nonzero (4). N7 envelope_manifest binds sealed inputs and N8 detached_seal is Codex-authored; both are produced at the Codex seal, not by --selftest.
  • Exit code: --selftest: 0 iff every embedded test vector passes; non-zero otherwise. --produce: 0 ONLY iff corpus_ok AND membership_frozen_ok; 4 on any corpus problem (fail-closed; all candidate digests suppressed). A package MAY NOT proceed unless this artifact's --selftest exits 0 against the pinned canonicalizer_sha256.
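The exit-code rules above reduce to two small predicates. The sketch below is a hedged illustration, not the artifact's CLI: the function names produce_exit_code and package_may_proceed are hypothetical, and the caller is assumed to have already obtained the --selftest exit code and both hashes.

```python
def produce_exit_code(corpus_ok: bool, membership_frozen_ok: bool) -> int:
    """Exit 0 only when both flags hold; 4 on any corpus problem (fail-closed)."""
    return 0 if (corpus_ok and membership_frozen_ok) else 4

def package_may_proceed(selftest_exit: int, live_sha256: str, pinned_sha256: str) -> bool:
    """A package proceeds only if --selftest exited 0 AND the live hash equals the pin."""
    return selftest_exit == 0 and live_sha256 == pinned_sha256

print(produce_exit_code(True, True))    # both flags hold
print(produce_exit_code(True, False))   # frozen pin not reproduced
print(package_may_proceed(0, "a" * 64, "a" * 64))
```

Both predicates default to refusal: there is no input combination in which a partial success is reported as success.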

Frozen positive test vector (behavioural pin, reproducible now)

The membership digest over the 10 canonical full doc_ids under FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1 (tag line first, then the ids in ascending order, each terminated by LF, so the input ends with a trailing LF) is f2bda8effc7be19b54722828126b82d7d2d48bee5e5e5dc0c8f347ce210fe251 (identical under shasum -a 256 and Python hashlib). Any implementation that does not reproduce this exact digest is non-conformant.
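A minimal sketch of the membership encoding follows, with the byte layout taken from the reference code's digest and rec helpers: tag line, then one LF-terminated id per line in ascending order. The function name membership_digest is hypothetical. The printed value is intentionally not asserted here; compare it against the pinned digest yourself, and treat any difference as a sign that this sketch has drifted from the reference.

```python
import hashlib

TAG = "FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1"
PREFIX = ("knowledge/dev/reports/architecture/"
          "t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/")
DOCS = [
    "00-readme-first.md", "01-live-existing-system-inventory.md",
    "02-design-to-live-mapping.md", "03-gap-classification.md",
    "04-dependency-safe-construction-order.md", "05-rollback-blueprint.md",
    "06-test-guard-blueprint.md", "07-implementation-package-split.md",
    "08-hard-blocks-do-not-touch-list.md", "12-final-verdict.md",
]

def membership_digest(doc_ids) -> str:
    """SHA-256 over: tag + LF, then each id + LF in ascending order."""
    ids = sorted(doc_ids)
    payload = (TAG + "\n").encode("utf-8")
    payload += b"".join((i + "\n").encode("utf-8") for i in ids)
    return hashlib.sha256(payload).hexdigest()

# Print the candidate digest for comparison with the frozen pin.
print(membership_digest(PREFIX + d for d in DOCS))
```

Because the ids are sorted before hashing, the digest does not depend on listing order, while dropping or adding a member changes it.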

Closed failure-status set (the ONLY allowed rejections)

CANONICAL_FIELD_RESERVED_TOKEN_REJECTED, CANONICAL_FIELD_VALUE_GRAMMAR_REJECTED, CANONICAL_FIELD_NULL_REJECTED, CANONICAL_FIELD_EMPTY_REJECTED, DOCUMENT_ID_ALIAS_REJECTED, DOCUMENT_ID_NOT_MCP_CANONICAL, DOCUMENT_ID_SCOPE_MISMATCH, MARKER_KIND_UNKNOWN, MARKER_LITERAL_MISMATCH, MARKER_LITERAL_NOT_ALLOWED, MARKER_KIND_LITERAL_INCONSISTENT, ACTIVE_SCOPE_MARKER_MISSING, ACTIVE_SCOPE_MARKER_DUPLICATE, FENCE_UNBALANCED, FENCE_NESTED_UNSUPPORTED, ACTIVE_SUPERSEDED_OVERLAP, SECTION_ID_MISMATCH, SECTION_RANGE_MISMATCH, EXCLUDE_REGION_UNBALANCED, MARKER_REGISTRY_MISMATCH, SEAL_HASH_GRAPH_CYCLE, ACTIVE_CONTENT_EMPTY. Corpus-level fail-closed report statuses (R9-B2, reported by --produce, never a digest): LOCAL_FILE_MISSING, DOCS_DIR_MISSING, GUARD_SET_SOURCE_MISSING, SUPPRESSED_CORPUS_NOT_OK, plus the listing problem classes missing / extra / duplicate. Any other behaviour is a defect.
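One way to enforce the closed set in a consumer is to hold the statuses in immutable sets and reject anything else. This is a sketch under assumptions: the names REJECTION_STATUSES, CORPUS_REPORT_STATUSES, and is_allowed_status are hypothetical and do not appear in the reference code; the status strings themselves are copied from the list above.

```python
REJECTION_STATUSES = frozenset({
    "CANONICAL_FIELD_RESERVED_TOKEN_REJECTED", "CANONICAL_FIELD_VALUE_GRAMMAR_REJECTED",
    "CANONICAL_FIELD_NULL_REJECTED", "CANONICAL_FIELD_EMPTY_REJECTED",
    "DOCUMENT_ID_ALIAS_REJECTED", "DOCUMENT_ID_NOT_MCP_CANONICAL",
    "DOCUMENT_ID_SCOPE_MISMATCH", "MARKER_KIND_UNKNOWN", "MARKER_LITERAL_MISMATCH",
    "MARKER_LITERAL_NOT_ALLOWED", "MARKER_KIND_LITERAL_INCONSISTENT",
    "ACTIVE_SCOPE_MARKER_MISSING", "ACTIVE_SCOPE_MARKER_DUPLICATE",
    "FENCE_UNBALANCED", "FENCE_NESTED_UNSUPPORTED", "ACTIVE_SUPERSEDED_OVERLAP",
    "SECTION_ID_MISMATCH", "SECTION_RANGE_MISMATCH", "EXCLUDE_REGION_UNBALANCED",
    "MARKER_REGISTRY_MISMATCH", "SEAL_HASH_GRAPH_CYCLE", "ACTIVE_CONTENT_EMPTY",
})

# Corpus-level report statuses (R9-B2); reported by --produce, never a digest.
CORPUS_REPORT_STATUSES = frozenset({
    "LOCAL_FILE_MISSING", "DOCS_DIR_MISSING",
    "GUARD_SET_SOURCE_MISSING", "SUPPRESSED_CORPUS_NOT_OK",
})

def is_allowed_status(status: str) -> bool:
    """True only for a status in one of the two closed sets."""
    return status in REJECTION_STATUSES or status in CORPUS_REPORT_STATUSES

print(is_allowed_status("FENCE_UNBALANCED"), is_allowed_status("SOMETHING_ELSE"))
```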

AUTHORING_REQUIREMENT (implementation must not fail)

Implementation-authoring MUST adopt exactly one canonicalizer that is byte-for-byte this artifact (or a re-implementation proven to pass all embedded test vectors AND reproduce f2bda8…fe251), pinned by canonicalizer_sha256. No package, guard, or doc may ship or reference a different canonicalizer. Before PKG-A may proceed, the live canonicalizer's --selftest must exit 0 and its content hash must equal the sealed canonicalizer_sha256 (G-CANONICALIZER-SSOT-ONLY, G-NO-DUPLICATE-CANONICAL-AUTHORITY, doc 06).

Article-14 self-reference rule (BLOCKER A, encoded)

No load-bearing digest takes, as an input, a platform-assigned revision of the artifact that carries it (a revision exists only after the write, so embedding it is circular). Revisions are diagnostic / post-seal audit only. The canonicalizer enforces this via LOAD_BEARING_FORBIDS_SELF_REVISION = True and an empty SELF_REVISION_INPUTS set; adding such an edge is detected as a cycle.
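To illustrate why a self-referencing input is caught, the sketch below runs a three-colour depth-first search over the seal graph and then adds a self-edge to N8. The edge table is copied from the reference code; the colour names and the variable with_self_edge are illustrative choices, not part of the artifact.

```python
EDGES = {
    "N1": [], "N2": [], "N3": [], "N4": [], "N5": [],
    "N6": ["N1"],
    "N7": ["N2", "N3", "N4", "N5", "N6", "N1"],
    "N8": ["N2", "N5", "N6", "N7"],
    "N9_DIAG": [],
}

def has_cycle(edges) -> bool:
    """Depth-first search; reaching a node that is still in progress means a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2          # unvisited / in progress / finished
    colour = {node: WHITE for node in edges}

    def visit(node) -> bool:
        colour[node] = GREY
        for dep in edges[node]:
            if colour[dep] == GREY:
                return True               # back edge into the current path
            if colour[dep] == WHITE and visit(dep):
                return True
        colour[node] = BLACK
        return False

    return any(colour[n] == WHITE and visit(n) for n in edges)

# A digest that takes itself (or its own revision) as input is a self-edge.
with_self_edge = {k: list(v) for k, v in EDGES.items()}
with_self_edge["N8"].append("N8")

print(has_cycle(EDGES), has_cycle(with_self_edge))  # False True
```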

Executable reference (authoritative)

#!/usr/bin/env python3
# ============================================================================
# FIX7-CANON-V1 CANONICALIZER  --  SINGLE SOURCE OF TRUTH (executable, EXTENDED)
# canonicalizer_artifact_id: FIX7-CANON-V1-CANONICALIZER
# canonicalizer_version:     FIX7-CANON-V1
#
# This is the PRODUCTION-COMPLETE superset of the recheck-8 SSOT fence. It keeps
# every recheck-8 vector byte-compatible (membership, document_id, marker, field,
# DAG) AND adds the production seal path the recheck-8 fence was missing:
#   - the deterministic active-scope / fence / section extractor (doc 00 spec)
#   - per-doc normalized_active_content_sha256            (N1)
#   - active_corpus_sha256                                (N6)
#   - marker_fence_registry_sha256                        (N3)
#   - superseded_boundary_sha256                          (N4)
#   - guard_set_sha256 (= N1 of doc 06)                   (N5)
#   - candidate canonicalizer_sha256 (hash of SSOT bytes)
#   - envelope_manifest_sha256  REHEARSAL only            (N7) [needs SEALED inputs]
#   - detached_seal_sha256      CODEX-ONLY                (N8) [Codex authors at seal]
#
# R9-B2 (Codex Recheck-9): the production --produce path is FAIL-CLOSED.
# Any missing / extra / duplicate / extract-error / invalid active member:
#   - suppresses EVERY candidate digest (value -> SUPPRESSED_CORPUS_NOT_OK),
#   - forces corpus_ok=False and membership_frozen_ok=False,
#   - forces a NONZERO process exit (4).
# membership_sha256 is computed over the ACTUALLY PRESENT AND VALID members, so a
# missing member can never reproduce the frozen pin.
#
# Article 14: ONE authority of ONE nature. This file IS the load-bearing canonical
# contract; doc 00 prose / report docs are NON_AUTHORITY_EXPLANATION.
#
# Invocation:
#   python3 fix7_canon_v1_ssot_extended.py --selftest
#       -> exit 0 iff every embedded vector passes (unit + production-path +
#          fail-closed corpus-gate fixtures); nonzero otherwise
#   python3 fix7_canon_v1_ssot_extended.py --produce <docs_dir> [<ssot_md_path>]
#       -> runs the production seal path over the 10 real active docs and prints
#          every COMPUTABLE digest as a candidate (values are SEAL_AT_CODEX_RECHECK_8
#          by design; this proves the encoder RUNS and is DETERMINISTIC).
#          exit 0 ONLY iff corpus_ok AND membership_frozen_ok; exit 4 otherwise
#          (all candidate digests suppressed on any corpus problem).
# ============================================================================
import hashlib, re, sys, os

def sha(b: bytes) -> str: return hashlib.sha256(b).hexdigest()

LOAD_BEARING_FORBIDS_SELF_REVISION = True

# ---- recheck-6 A: field rejection -----------------------------------------
FORBIDDEN_BYTES = {0x09,0x0A,0x0D,0x00,0x5C}            # TAB LF CR NUL backslash
RESERVED_TOKENS = ["<!-- ENVELOPE:EXCLUDE-BEGIN -->","<!-- ENVELOPE:EXCLUDE-END -->",
 "<!-- SUPERSEDED_NON_AUTHORITY BEGIN","<!-- SUPERSEDED_NON_AUTHORITY END -->",
 "FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1","FIX7_ACTIVE_AUTHORITY_CORPUS_V1","FIX7_MARKER_FENCE_REGISTRY_V1",
 "FIX7_SUPERSEDED_BOUNDARY_V1","FIX7_GUARD_SET_V1","FIX7_DOC_NORMALIZED_CONTENT_V1",
 "FIX7_ACTIVE_AUTHORITY_ENVELOPE_MANIFEST_V1","FIX7_CODEX_DETACHED_SEAL_V1"]

class Reject(Exception):
    def __init__(self,status,detail=""): super().__init__(f"{status}: {detail}"); self.status=status

# ---- recheck-7 D: canonical document_id == exact MCP id, no alias ----------
KB_ROOT = "knowledge/dev/reports/architecture/"
_SEG = re.compile(r"^[A-Za-z0-9._-]+$")
def canonical_document_id(value, mcp_id=None, require_root=True):
    if value is None or value == "": raise Reject("DOCUMENT_ID_ALIAS_REJECTED","empty")
    for ch in value:
        if ord(ch) in FORBIDDEN_BYTES: raise Reject("DOCUMENT_ID_ALIAS_REJECTED",f"ctrl/backslash 0x{ord(ch):02x}")
        if ord(ch) > 0x7F: raise Reject("DOCUMENT_ID_ALIAS_REJECTED","non-ASCII (homoglyph?)")
    if "%" in value: raise Reject("DOCUMENT_ID_ALIAS_REJECTED","url-encoded")
    if "\\" in value: raise Reject("DOCUMENT_ID_ALIAS_REJECTED","backslash")
    if "//" in value: raise Reject("DOCUMENT_ID_ALIAS_REJECTED","empty segment //")
    if value.startswith("/"): raise Reject("DOCUMENT_ID_ALIAS_REJECTED","leading slash (ids are relative)")
    if value.endswith("/"): raise Reject("DOCUMENT_ID_ALIAS_REJECTED","trailing slash")
    for s in value.split("/"):
        if s in (".",".."): raise Reject("DOCUMENT_ID_ALIAS_REJECTED",f"dot segment {s!r}")
        if s == "":        raise Reject("DOCUMENT_ID_ALIAS_REJECTED","empty segment")
        if not _SEG.match(s): raise Reject("DOCUMENT_ID_ALIAS_REJECTED",f"bad segment {s!r}")
    if not value.endswith(".md"): raise Reject("DOCUMENT_ID_ALIAS_REJECTED","not .md")
    if require_root and not value.startswith(KB_ROOT): raise Reject("DOCUMENT_ID_SCOPE_MISMATCH",value)
    if mcp_id is not None and value != mcp_id:
        raise Reject("DOCUMENT_ID_NOT_MCP_CANONICAL",f"{value!r} != mcp {mcp_id!r}")
    return value

# ---- recheck-7 E: marker_kind <-> marker_literal closed contract ----------
MARKER_KINDS = {"DOC_STATUS","SUPERSEDED_BEGIN","SUPERSEDED_END",
                "ENVELOPE_EXCLUDE_BEGIN","ENVELOPE_EXCLUDE_END","AUTHORITY_BOUNDARY"}
MARKER_GRAMMAR = {
 "DOC_STATUS":            re.compile(r"^<!-- DOC_STATUS: (ACTIVE_AUTHORITY|SUPERSEDED_NON_AUTHORITY) -->$"),
 "ENVELOPE_EXCLUDE_BEGIN":re.compile(r"^<!-- ENVELOPE:EXCLUDE-BEGIN -->$"),
 "ENVELOPE_EXCLUDE_END":  re.compile(r"^<!-- ENVELOPE:EXCLUDE-END -->$"),
 "SUPERSEDED_BEGIN":      re.compile(r"^<!-- SUPERSEDED_NON_AUTHORITY BEGIN(: [^\r\n]*)? -->$"),
 "SUPERSEDED_END":        re.compile(r"^<!-- SUPERSEDED_NON_AUTHORITY END -->$"),
 "AUTHORITY_BOUNDARY":    re.compile(r"^<!-- AUTHORITY_BOUNDARY[^\r\n]*-->$"),
}
def check_marker(kind, literal):
    if kind not in MARKER_KINDS: raise Reject("MARKER_KIND_UNKNOWN",kind)
    for ch in literal:
        if ord(ch) in (0x09,0x0A,0x0D,0x00): raise Reject("MARKER_LITERAL_MISMATCH","ctrl byte")
    if not MARKER_GRAMMAR[kind].match(literal):
        for k2,g in MARKER_GRAMMAR.items():
            if k2!=kind and g.match(literal):
                raise Reject("MARKER_KIND_LITERAL_INCONSISTENT",f"{kind} vs literal of {k2}")
        raise Reject("MARKER_LITERAL_NOT_ALLOWED",f"{kind}:{literal!r}")
    return (kind, literal)

# classify a whole line -> (kind, literal) or None (used by the extractor)
def classify_line(line):
    hits=[k for k,g in MARKER_GRAMMAR.items() if g.match(line)]
    if not hits: return None
    if len(hits)>1: raise Reject("MARKER_REGISTRY_MISMATCH",f"line matches {hits}")
    return (hits[0], line)

# ---- recheck-6: field encode ----------------------------------------------
GRAMMARS = {"sha256_hex":re.compile(r"^[0-9a-f]{64}$"),
 "kb_revision":re.compile(r"^([1-9][0-9]*|SELF_HOST_PIN_BY_EXCLUDE_REGION_HASH)$"),
 "doc_status":re.compile(r"^(ACTIVE_AUTHORITY|SUPERSEDED_NON_AUTHORITY)$"),
 "boolean":re.compile(r"^(true|false)$"),
 "section":re.compile(r"^(WHOLE_DOCUMENT|WHOLE_DOCUMENT_MINUS_SUPERSEDED_FENCES|WHOLE_DOCUMENT_MINUS_EXCLUDE_AND_SUPERSEDED)$")}
SENTINEL_OK = {"NOT_APPLICABLE","NON_AUTHORITY_DIAGNOSTIC","SEAL_AT_CODEX_RECHECK_8"}
def vfield(field,value,grammar=None,allow_sentinel=True):
    if value is None: raise Reject("CANONICAL_FIELD_NULL_REJECTED",field)
    if value=="": raise Reject("CANONICAL_FIELD_EMPTY_REJECTED",field)
    for ch in value:
        if ord(ch) in FORBIDDEN_BYTES: raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED",f"{field} 0x{ord(ch):02x}")
    if field!="marker_literal":
        for t in RESERVED_TOKENS:
            if t in value: raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED",f"{field} token")
    if allow_sentinel and value in SENTINEL_OK: return value
    if grammar and not GRAMMARS[grammar].match(value): raise Reject("CANONICAL_FIELD_VALUE_GRAMMAR_REJECTED",f"{field}={value!r}")
    return value
def rec(*f):
    for x in f:
        if "\t" in x or "\n" in x: raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED","sep in value")
    return ("\t".join(f)+"\n").encode()
def digest(tag,records): return sha((tag+"\n").encode()+b"".join(records))

# ---- recheck-6 D / recheck-7 A: seal hash DAG -----------------------------
EDGES={"N1":[],"N2":[],"N3":[],"N4":[],"N5":[],"N6":["N1"],
 "N7":["N2","N3","N4","N5","N6","N1"],"N8":["N2","N5","N6","N7"],"N9_DIAG":[]}
LOAD_BEARING={"N1","N2","N3","N4","N5","N6","N7","N8"}
SELF_REVISION_INPUTS=set()
def has_cycle(e):
    c={k:0 for k in e}
    def dfs(u):
        c[u]=1
        for v in e[u]:
            if c[v]==1 or (c[v]==0 and dfs(v)): return True
        c[u]=2; return False
    return any(c[k]==0 and dfs(k) for k in e)

# ===========================================================================
# PRODUCTION SEAL PATH  (the part the recheck-8 fence was missing)
# ===========================================================================
PREFIX=KB_ROOT+"t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/"
DOCS=["00-readme-first.md","01-live-existing-system-inventory.md","02-design-to-live-mapping.md",
"03-gap-classification.md","04-dependency-safe-construction-order.md","05-rollback-blueprint.md",
"06-test-guard-blueprint.md","07-implementation-package-split.md","08-hard-blocks-do-not-touch-list.md",
"12-final-verdict.md"]
MEMBERSHIP_EXPECT="f2bda8effc7be19b54722828126b82d7d2d48bee5e5e5dc0c8f347ce210fe251"
SELF_HOST_DOC="00-readme-first.md"   # the only doc with an ENVELOPE:EXCLUDE region

def normalize_lines(text):
    """CRLF/CR -> LF, then 1-based line model. A trailing LF is a terminator,
    not a content line, so it does not create a spurious empty final line."""
    norm=text.replace("\r\n","\n").replace("\r","\n")
    lines=norm.split("\n")
    if lines and lines[-1]=="":
        lines=lines[:-1]
    return lines  # lines[i-1] == 1-based line i

def pair_fences(lines, begin_kind, end_kind, unbalanced_status, nested_status):
    """Flat (non-nesting) single-stack pairing -> list of (begin_line, end_line) 1-based inclusive."""
    ranges=[]; open_at=None
    for i,ln in enumerate(lines, start=1):
        c=classify_line(ln)
        if c is None: continue
        k=c[0]
        if k==begin_kind:
            if open_at is not None: raise Reject(nested_status,f"{begin_kind} at L{i} inside open L{open_at}")
            open_at=i
        elif k==end_kind:
            if open_at is None: raise Reject(unbalanced_status,f"{end_kind} at L{i} without begin")
            ranges.append((open_at,i)); open_at=None
    if open_at is not None: raise Reject(unbalanced_status,f"{begin_kind} at L{open_at} never closed")
    return ranges

def extract(document_id, raw_text, is_self_host):
    """Deterministic active-scope / fence / section extractor (doc 00 spec).
    Returns dict: normalized_active_content(bytes), markers[(kind,literal)],
    superseded_ranges[(b,e)], exclude_ranges[(b,e)], doc_status."""
    lines=normalize_lines(raw_text)
    # 1) enumerate markers (whole-line grammar)
    markers=[]
    doc_status_lines=[]
    for i,ln in enumerate(lines, start=1):
        c=classify_line(ln)
        if c is None: continue
        markers.append((i,c[0],c[1]))
        if c[0]=="DOC_STATUS": doc_status_lines.append((i,c[1]))
    # 2) DOC_STATUS cardinality: exactly one
    if len(doc_status_lines)==0: raise Reject("ACTIVE_SCOPE_MARKER_MISSING",f"{document_id}: no DOC_STATUS")
    if len(doc_status_lines)>1: raise Reject("ACTIVE_SCOPE_MARKER_DUPLICATE",f"{document_id}: {len(doc_status_lines)} DOC_STATUS")
    doc_status=MARKER_GRAMMAR["DOC_STATUS"].match(doc_status_lines[0][1]).group(1)
    # 3) superseded fences (flat) + exclude fences (flat, self-host only meaningful)
    sup=pair_fences(lines,"SUPERSEDED_BEGIN","SUPERSEDED_END","FENCE_UNBALANCED","FENCE_NESTED_UNSUPPORTED")
    exc=pair_fences(lines,"ENVELOPE_EXCLUDE_BEGIN","ENVELOPE_EXCLUDE_END","EXCLUDE_REGION_UNBALANCED","FENCE_NESTED_UNSUPPORTED")
    # 4) removal set = superseded inclusive  (+ exclude inclusive iff self-host)
    removal=set()
    for b,e in sup: removal.update(range(b,e+1))
    if is_self_host:
        for b,e in exc: removal.update(range(b,e+1))
    else:
        if exc: raise Reject("EXCLUDE_REGION_UNBALANCED",f"{document_id}: exclude region in non-self-host doc")
    # 5) overlap assertion (superseded vs exclude must not overlap)
    sup_set=set()
    for b,e in sup: sup_set.update(range(b,e+1))
    exc_set=set()
    for b,e in exc: exc_set.update(range(b,e+1))
    if sup_set & exc_set: raise Reject("ACTIVE_SUPERSEDED_OVERLAP",f"{document_id}: superseded/exclude overlap")
    # 6) normalized active content = retained line + LF, ascending line order
    active=b""
    for i,ln in enumerate(lines, start=1):
        if i in removal: continue
        active += (ln+"\n").encode("utf-8")
    if not active: raise Reject("ACTIVE_CONTENT_EMPTY",f"{document_id}: no active content after removal")
    return {"normalized_active_content":active,"markers":[(m[1],m[2]) for m in markers],
            "superseded_ranges":sup,"exclude_ranges":exc,"doc_status":doc_status}

def per_doc_content_digest(document_id, normalized_active_content):
    # tag uniquely uses TAB between tag and document_id
    tag=("FIX7_DOC_NORMALIZED_CONTENT_V1\t"+document_id+"\n").encode("utf-8")
    return sha(tag+normalized_active_content)

# active_section_id_or_range per the live envelope
SECTION_BY_DOC={d:"WHOLE_DOCUMENT" for d in DOCS}
SECTION_BY_DOC["00-readme-first.md"]="WHOLE_DOCUMENT_MINUS_EXCLUDE_AND_SUPERSEDED"
SECTION_BY_DOC["12-final-verdict.md"]="WHOLE_DOCUMENT_MINUS_SUPERSEDED_FENCES"

def membership():
    """The FROZEN membership pin definition: digest over the 10 frozen doc_ids."""
    ids=sorted(canonical_document_id(PREFIX+d, mcp_id=PREFIX+d) for d in DOCS)
    return digest("FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1",[rec(i) for i in ids])

# ---- R9-B2: FAIL-CLOSED corpus gate ---------------------------------------
SUPPRESSED="SUPPRESSED_CORPUS_NOT_OK"
SUPPRESSIBLE_DIGEST_KEYS=("membership_sha256","active_corpus_sha256","marker_fence_registry_sha256",
 "superseded_boundary_sha256","guard_set_sha256","canonicalizer_sha256_candidate",
 "envelope_manifest_sha256","detached_seal_sha256")

def validate_corpus_listing(actual_names, expected=None):
    """Pure fail-closed check of a docs-dir listing against the frozen membership.
    Detects MISSING, EXTRA, and DUPLICATE (exact or case-variant) members.
    Returns {"missing":[...], "extra":[...], "duplicate":[...], "ok":bool}."""
    expected = DOCS if expected is None else expected
    exp=set(expected)
    missing=[]; extra=[]; duplicate=[]
    seen={}
    for n in sorted(actual_names):
        key=n.lower()
        if key in seen: duplicate.append(seen[key])
        else: seen[key]=n
        if n not in exp: extra.append(n)
    present=set(actual_names)
    for e in expected:
        if e not in present: missing.append(e)
    return {"missing":missing,"extra":extra,"duplicate":duplicate,
            "ok": not (missing or extra or duplicate)}

def corpus_gate(listing_ok, errors, n_extracted, n_expected):
    """Pure gate: the corpus is OK only if the listing is exact, no per-doc
    extract error occurred, and every expected member extracted successfully."""
    return bool(listing_ok) and not errors and n_extracted==n_expected

def gate_and_suppress(out):
    """R9-B2 enforcement: any corpus problem suppresses EVERY candidate digest
    and forces corpus_ok / membership_frozen_ok to False. Returns out."""
    out["corpus_ok"]=corpus_gate(out["listing"]["ok"], out["errors"],
                                 len(out["per_doc"]), len(DOCS))
    out["membership_frozen_ok"]=bool(out["corpus_ok"] and
                                     out.get("membership_sha256")==MEMBERSHIP_EXPECT)
    if not (out["corpus_ok"] and out["membership_frozen_ok"]):
        for k in SUPPRESSIBLE_DIGEST_KEYS:
            if k in out: out[k]=SUPPRESSED
        out["membership_frozen_ok"]=False
    return out

def produce(docs_dir, ssot_md_path=None, kb_revisions=None):
    """Run the production seal path over the real active docs. FAIL-CLOSED (R9-B2):
    any missing/extra/duplicate/extract-error/invalid member suppresses every
    candidate digest and the CLI exits nonzero. Returns a dict of every COMPUTABLE
    digest (candidate; values are SEAL_AT_CODEX_RECHECK_8 by design), plus the
    rehearsal manifest and the N8 Codex-only marker."""
    kb_revisions = kb_revisions or {}
    out={"per_doc":{}, "errors":{}, "markers":[], "superseded":[]}
    if os.path.isdir(docs_dir):
        listed=sorted(f for f in os.listdir(docs_dir) if f.lower().endswith(".md"))
    else:
        listed=[]; out["errors"]["__docs_dir__"]="DOCS_DIR_MISSING"
    out["listing"]=validate_corpus_listing(listed)
    corpus_records=[]; reg_records=[]; boundary_records=[]
    guard_set_digest=None
    for d in DOCS:
        path=os.path.join(docs_dir,d)
        if not os.path.exists(path):
            out["errors"][d]="LOCAL_FILE_MISSING"; continue
        # context manager so the file handle is closed even if read() raises
        with open(path,"r",encoding="utf-8") as fh:
            raw=fh.read()
        doc_id=PREFIX+d
        try:
            ex=extract(doc_id, raw, is_self_host=(d==SELF_HOST_DOC))
        except Reject as e:
            out["errors"][d]=e.status; continue
        cdig=per_doc_content_digest(doc_id, ex["normalized_active_content"])
        out["per_doc"][d]={"normalized_active_content_sha256":cdig,
                           "active_bytes":len(ex["normalized_active_content"]),
                           "doc_status":ex["doc_status"],
                           "n_markers":len(ex["markers"]),
                           "n_superseded":len(ex["superseded_ranges"])}
        if d==SELF_HOST_DOC: kbrev="SELF_HOST_PIN_BY_EXCLUDE_REGION_HASH"
        else: kbrev=str(kb_revisions.get(d,"SEAL_AT_CODEX_RECHECK_8"))
        # active_corpus record
        corpus_records.append(rec(doc_id, ex["doc_status"], SECTION_BY_DOC[d], kbrev, cdig))
        # marker/fence registry records
        for kind,lit in ex["markers"]:
            reg_records.append(rec(doc_id, kind, lit))
            out["markers"].append((d,kind,lit))
        # superseded boundary records
        for k,(b,e) in enumerate(sorted(ex["superseded_ranges"]), start=1):
            sid=f"{doc_id}#S{k}"; rng=f"L{b}-L{e}"
            boundary_records.append(rec(sid, rng))
            out["superseded"].append((sid,rng))
        if d=="06-test-guard-blueprint.md":
            guard_set_digest=cdig  # guard_set_sha256 := N1 of doc 06
    # membership over the ACTUALLY PRESENT AND VALID members (R9-B2): a missing or
    # invalid member yields a different digest than the frozen pin -> frozen_ok False.
    valid_ids=sorted(PREFIX+d for d in DOCS if d in out["per_doc"])
    out["membership_sha256"]=digest("FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1",[rec(i) for i in valid_ids])
    out["active_corpus_sha256"]=digest("FIX7_ACTIVE_AUTHORITY_CORPUS_V1", sorted(corpus_records))
    out["marker_fence_registry_sha256"]=digest("FIX7_MARKER_FENCE_REGISTRY_V1", sorted(reg_records))
    out["superseded_boundary_sha256"]=digest("FIX7_SUPERSEDED_BOUNDARY_V1", sorted(boundary_records))
    if guard_set_digest is None:
        out["errors"].setdefault("06-test-guard-blueprint.md","GUARD_SET_SOURCE_MISSING")
        out["guard_set_sha256"]="GUARD_SET_SOURCE_MISSING"
    else:
        out["guard_set_sha256"]=guard_set_digest
    if ssot_md_path and os.path.exists(ssot_md_path):
        # context manager so the file handle is closed even if read() raises
        with open(ssot_md_path,"r",encoding="utf-8") as fh:
            md=fh.read()
        md=md.replace("\r\n","\n").replace("\r","\n")
        out["canonicalizer_sha256_candidate"]=sha(md.encode("utf-8"))
    # N7 envelope_manifest: REHEARSAL ONLY (binds SEALED sub-digests + approval-event
    # fields that only Codex sets). We refuse to emit a "real" value -> not self-fabricated.
    out["envelope_manifest_sha256"]="REHEARSAL_ONLY_NEEDS_SEALED_INPUTS (N7)"
    # N8 detached_seal: Codex authors (sealed_by/at, signature, parent_checkpoint, report_documents)
    out["detached_seal_sha256"]="CODEX_ONLY_NOT_SELF_COMPUTABLE (N8)"
    return gate_and_suppress(out)

def duplicate_authority_inventory(candidate_paths):
    """P4: a runnable inventory proving exactly ONE canonical canonicalizer identity.
    Each candidate is (label, identity_string). Pass iff exactly one distinct identity."""
    ids={}
    for label,identity in candidate_paths:
        ids.setdefault(identity,[]).append(label)
    return {"distinct_identities":len(ids), "ok":len(ids)==1, "map":ids}

# ===========================================================================
# SELFTEST  (recheck-8 unit vectors + recheck-9 production-path fixtures
#            + R9-B2 fail-closed corpus-gate fixtures)
# ===========================================================================
def selftest():
    out=[]; ok=True
    def chk(label, cond):
        nonlocal ok; ok = ok and cond; out.append(f"  [{'PASS' if cond else 'FAIL'}] {label}")
    # ---- recheck-8 unit vectors (preserved) ----
    chk("membership == f2bda8...fe251", membership()==MEMBERSHIP_EXPECT)
    chk("DAG acyclic", not has_cycle(EDGES))
    chk("no self-revision input in load-bearing", len(SELF_REVISION_INPUTS)==0 and LOAD_BEARING_FORBIDS_SELF_REVISION)
    chk("valid doc id accepted", canonical_document_id(PREFIX+"00-readme-first.md", mcp_id=PREFIX+"00-readme-first.md")==PREFIX+"00-readme-first.md")
    chk("valid marker accepted", check_marker("DOC_STATUS","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->")[0]=="DOC_STATUS")
    def expect(label,status,fn):
        nonlocal ok
        try: fn(); out.append(f"  [FAIL] {label} (not rejected)"); ok=False
        except Reject as e:
            good=e.status==status; ok=ok and good
            out.append(f"  [{'PASS' if good else 'FAIL'}] {label} -> {e.status}")
    expect("doc_id '.' segment","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"./x.md"))
    expect("doc_id '..' segment","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"a/../x.md"))
    expect("doc_id '//'","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"a//x.md"))
    expect("doc_id empty seg(trailing)","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"x.md/"))
    expect("doc_id backslash","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"a\\x.md"))
    expect("doc_id url-encoded","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"a%2e/x.md"))
    expect("doc_id homoglyph slash","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id(KB_ROOT+"a⁄x.md"))
    expect("doc_id leading slash","DOCUMENT_ID_ALIAS_REJECTED",lambda:canonical_document_id("/"+KB_ROOT+"x.md"))
    expect("doc_id scope mismatch","DOCUMENT_ID_SCOPE_MISMATCH",lambda:canonical_document_id("other/dir/x.md"))
    expect("doc_id != mcp (case)","DOCUMENT_ID_NOT_MCP_CANONICAL",lambda:canonical_document_id(PREFIX+"00-Readme-First.md", mcp_id=PREFIX+"00-readme-first.md"))
    expect("marker unknown kind","MARKER_KIND_UNKNOWN",lambda:check_marker("FOO","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->"))
    expect("marker kind/literal inconsistent","MARKER_KIND_LITERAL_INCONSISTENT",lambda:check_marker("DOC_STATUS","<!-- ENVELOPE:EXCLUDE-BEGIN -->"))
    expect("marker literal typo","MARKER_LITERAL_NOT_ALLOWED",lambda:check_marker("DOC_STATUS","<!-- DOC_STATUS: ACTIVE -->"))
    expect("field TAB rejected","CANONICAL_FIELD_RESERVED_TOKEN_REJECTED",lambda:vfield("x","a\tb"))
    expect("field null rejected","CANONICAL_FIELD_NULL_REJECTED",lambda:vfield("x",None))
    expect("field empty rejected","CANONICAL_FIELD_EMPTY_REJECTED",lambda:vfield("x",""))
    e2={k:list(v) for k,v in EDGES.items()}; e2["N8"]=e2["N8"]+["N8"]
    chk("seal self-revision/self-hash edge -> cycle detected", has_cycle(e2))

    # ---- recheck-9 PRODUCTION-PATH fixtures (the part L2 said was missing) ----
    # worked normalized-content vector on a tiny fixture (deterministic pin)
    tiny="<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nalpha\nbeta\n"
    ex=extract("knowledge/dev/reports/architecture/x/y.md", tiny, is_self_host=False)
    expect_active=b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nalpha\nbeta\n"
    chk("extractor: active content (no fences) == input", ex["normalized_active_content"]==expect_active)
    chk("extractor: per-doc digest deterministic", per_doc_content_digest("a/b.md",expect_active)==per_doc_content_digest("a/b.md",expect_active))
    # superseded fence removed inclusive
    sup="<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep1\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\ndrop\n<!-- SUPERSEDED_NON_AUTHORITY END -->\nkeep2\n"
    exs=extract("k/s.md", sup, is_self_host=False)
    chk("extractor: superseded fence removed inclusive", exs["normalized_active_content"]==b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep1\nkeep2\n")
    chk("extractor: 1 superseded range recorded", exs["superseded_ranges"]==[(3,5)])
    # CRLF normalization
    crlf="<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\r\nx\r\n"
    exc=extract("k/c.md", crlf, is_self_host=False)
    chk("extractor: CRLF normalized to LF", exc["normalized_active_content"]==b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nx\n")
    # fail-closed: missing DOC_STATUS
    expect("extractor: missing DOC_STATUS","ACTIVE_SCOPE_MARKER_MISSING",lambda:extract("k/m.md","no marker here\n",False))
    # fail-closed: duplicate DOC_STATUS
    expect("extractor: duplicate DOC_STATUS","ACTIVE_SCOPE_MARKER_DUPLICATE",
           lambda:extract("k/d.md","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n",False))
    # fail-closed: unbalanced superseded fence
    expect("extractor: unbalanced superseded fence","FENCE_UNBALANCED",
           lambda:extract("k/u.md","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\nx\n",False))
    # fail-closed: nested superseded fence
    expect("extractor: nested superseded fence","FENCE_NESTED_UNSUPPORTED",
           lambda:extract("k/n.md","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\n<!-- SUPERSEDED_NON_AUTHORITY END -->\n<!-- SUPERSEDED_NON_AUTHORITY END -->\n",False))
    # fail-closed: exclude region in non-self-host doc
    expect("extractor: exclude region forbidden off self-host","EXCLUDE_REGION_UNBALANCED",
           lambda:extract("k/e.md","<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- ENVELOPE:EXCLUDE-BEGIN -->\nx\n<!-- ENVELOPE:EXCLUDE-END -->\n",False))
    # self-host exclude region removed inclusive
    sh="<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep\n<!-- ENVELOPE:EXCLUDE-BEGIN -->\nsecret\n<!-- ENVELOPE:EXCLUDE-END -->\ntail\n"
    exsh=extract("k/sh.md", sh, is_self_host=True)
    chk("extractor: self-host exclude removed inclusive", exsh["normalized_active_content"]==b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep\ntail\n")
    # dup-authority inventory: one identity passes, two fails
    di_ok=duplicate_authority_inventory([("md","ID-A"),("py","ID-A")])
    di_bad=duplicate_authority_inventory([("md","ID-A"),("fork","ID-B")])
    chk("dup-authority inventory: one identity -> ok", di_ok["ok"] and di_ok["distinct_identities"]==1)
    chk("dup-authority inventory: two identities -> not ok", (not di_bad["ok"]) and di_bad["distinct_identities"]==2)
    # guard_set definition identity (encoder wiring, not a value)
    chk("guard_set_sha256 := N1(doc06) wiring present", "06-test-guard-blueprint.md" in DOCS)

    # ---- R9-B2 FAIL-CLOSED corpus-gate fixtures (pure, no filesystem) ----
    chk("R9-B2 corpus listing: exact 10 members -> ok", validate_corpus_listing(list(DOCS))["ok"])
    miss=validate_corpus_listing([d for d in DOCS if d!="05-rollback-blueprint.md"])
    chk("R9-B2 corpus listing: missing member detected -> not ok", (not miss["ok"]) and miss["missing"]==["05-rollback-blueprint.md"])
    extra=validate_corpus_listing(list(DOCS)+["99-extra-doc.md"])
    chk("R9-B2 corpus listing: extra member detected -> not ok", (not extra["ok"]) and extra["extra"]==["99-extra-doc.md"])
    dup=validate_corpus_listing(list(DOCS)+["05-Rollback-Blueprint.md"])
    chk("R9-B2 corpus listing: duplicate (case-variant) detected -> not ok", (not dup["ok"]) and len(dup["duplicate"])==1)
    # fail-closed: a self-host exclude region that covers the whole document leaves no active content
    expect("extractor: empty active content rejected","ACTIVE_CONTENT_EMPTY",
           lambda:extract("k/z.md","<!-- ENVELOPE:EXCLUDE-BEGIN -->\n<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- ENVELOPE:EXCLUDE-END -->\n",True))
    chk("R9-B2 corpus gate: clean corpus -> ok", corpus_gate(True, {}, len(DOCS), len(DOCS)))
    chk("R9-B2 corpus gate: extract error forces not ok", not corpus_gate(True, {"05-rollback-blueprint.md":"LOCAL_FILE_MISSING"}, 9, len(DOCS)))
    chk("R9-B2 corpus gate: bad listing forces not ok", not corpus_gate(False, {}, len(DOCS), len(DOCS)))
    fake={"listing":{"ok":False,"missing":["05-rollback-blueprint.md"],"extra":[],"duplicate":[]},
          "errors":{"05-rollback-blueprint.md":"LOCAL_FILE_MISSING"},
          "per_doc":{d:None for d in DOCS if d!="05-rollback-blueprint.md"},
          "membership_sha256":"0"*64,"active_corpus_sha256":"0"*64,
          "marker_fence_registry_sha256":"0"*64,"superseded_boundary_sha256":"0"*64,
          "guard_set_sha256":"0"*64,"canonicalizer_sha256_candidate":"0"*64,
          "envelope_manifest_sha256":"x","detached_seal_sha256":"x"}
    g=gate_and_suppress(fake)
    chk("R9-B2 corpus gate: problem suppresses every candidate digest",
        (not g["corpus_ok"]) and (not g["membership_frozen_ok"]) and
        all(g[k]==SUPPRESSED for k in SUPPRESSIBLE_DIGEST_KEYS))
    return ok, out

if __name__=="__main__":
    args=sys.argv[1:]
    if args and args[0]=="--produce":
        docs_dir=args[1] if len(args)>1 else "docs"
        ssot=args[2] if len(args)>2 else None
        res=produce(docs_dir, ssot)
        print("FIX7-CANON-V1 PRODUCTION SEAL PATH (candidate; values SEAL_AT_CODEX_RECHECK_8; FAIL-CLOSED R9-B2)")
        print("membership_sha256            :", res["membership_sha256"], "(over PRESENT+VALID members; frozen pin f2bda8...fe251)")
        print("active_corpus_sha256         :", res["active_corpus_sha256"])
        print("marker_fence_registry_sha256 :", res["marker_fence_registry_sha256"])
        print("superseded_boundary_sha256   :", res["superseded_boundary_sha256"])
        print("guard_set_sha256             :", res["guard_set_sha256"])
        print("canonicalizer_sha256_cand    :", res.get("canonicalizer_sha256_candidate"))
        print("envelope_manifest_sha256     :", res["envelope_manifest_sha256"])
        print("detached_seal_sha256         :", res["detached_seal_sha256"])
        print("per-doc normalized_active_content_sha256:")
        for d in DOCS:
            if d in res["per_doc"]:
                pd=res["per_doc"][d]
                print(f"   {d:42s} {pd['normalized_active_content_sha256']}  bytes={pd['active_bytes']} status={pd['doc_status']} markers={pd['n_markers']} sup={pd['n_superseded']}")
            else:
                print(f"   {d:42s} EXTRACT_ERROR={res['errors'].get(d)}")
        L=res["listing"]
        if not L["ok"]:
            print(f"corpus_listing_problems: missing={L['missing']} extra={L['extra']} duplicate={L['duplicate']}")
        if res["errors"]:
            print("corpus_errors:", {k: res["errors"][k] for k in sorted(res["errors"])})
        print("corpus_ok:", res["corpus_ok"])
        print("membership_frozen_ok:", res["membership_frozen_ok"])
        if not (res["corpus_ok"] and res["membership_frozen_ok"]):
            print("ALL CANDIDATE DIGESTS SUPPRESSED (fail-closed: corpus not OK) -> exit 4")
            sys.exit(4)
        sys.exit(0)
    ok,out=selftest()
    print("FIX7-CANON-V1 CANONICALIZER SSOT SELFTEST (extended: unit + production-path + fail-closed corpus gate)")
    print("\n".join(out))
    print("ALL PASS:", ok, f"({sum('[PASS]' in line for line in out)}/{len(out)} checks)")
    sys.exit(0 if ok else 1)

Conformance evidence (this pass)

python3 canonicalizer-fix7-canon-v1-ssot.py --selftest → 45/45 PASS, exit 0 (unit + production-path + R9-B2 fail-closed corpus-gate fixtures, run by T1 this pass): membership cross-tool shasum==hashlib==f2bda8…fe251; DAG acyclic; no self-revision input; every document_id alias class rejected with the named status; marker kind/literal unknown/inconsistent/typo rejected; field TAB/null/empty rejected; a seal self-revision/self-hash edge is detected as a cycle; the active-scope/fence/section extractor (N1/N3/N4/N5/N6) is exercised on fixtures and fails closed on every malformed input; --produce over the 10 active docs is deterministic AND fail-closed (R9-B2: missing/extra/duplicate/invalid member → every candidate digest SUPPRESSED_CORPUS_NOT_OK, exit 4). This is the executable proof that the contract is finitely checkable without agent improvisation (Constitution Article 14). Full reproducible evidence: knowledge/dev/laws/tool-kiem-thu/packets/fix7-codex-recheck-9-2026-06-10/.
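
The R9-B2 behaviour summarized above (a missing, extra, or duplicate member forces every candidate digest to be suppressed) can be pictured with a minimal sketch. It is an illustration only: it is not part of the authoritative canonicalizer and does not redefine it. The function names `sketch_validate_listing` and `sketch_gate_and_suppress`, the two-key digest tuple, the placeholder file names, and the assumption that the sentinel value equals the string `SUPPRESSED_CORPUS_NOT_OK` are all hypothetical choices made for the example.

```python
from collections import defaultdict

# Hypothetical names for illustration; the real constants live in the SSOT code.
SKETCH_SUPPRESSED = "SUPPRESSED_CORPUS_NOT_OK"
SKETCH_DIGEST_KEYS = ("membership_sha256", "active_corpus_sha256")


def sketch_validate_listing(expected, found):
    """Compare a found file listing against the expected membership list."""
    found_set = set(found)
    expected_folds = {name.casefold() for name in expected}

    # Group found names case-insensitively so that case variants collide.
    by_fold = defaultdict(list)
    for name in found:
        by_fold[name.casefold()].append(name)

    missing = sorted(name for name in expected if name not in found_set)
    extra = sorted(name for name in found if name.casefold() not in expected_folds)
    duplicate = sorted(fold for fold, names in by_fold.items() if len(names) > 1)
    ok = not (missing or extra or duplicate)
    return {"ok": ok, "missing": missing, "extra": extra, "duplicate": duplicate}


def sketch_gate_and_suppress(result):
    """Return a copy of result with digests withheld unless the corpus is clean."""
    corpus_ok = result["listing"]["ok"] and not result["errors"]
    gated = dict(result, corpus_ok=corpus_ok)
    if not corpus_ok:
        # Fail closed: no partial digest may leak out of a bad corpus.
        for key in SKETCH_DIGEST_KEYS:
            gated[key] = SKETCH_SUPPRESSED
    return gated


if __name__ == "__main__":
    expected = ["a.md", "b.md"]  # placeholder membership list
    listing = sketch_validate_listing(expected, ["a.md"])
    result = {
        "listing": listing,
        "errors": {"b.md": "LOCAL_FILE_MISSING"},
        "membership_sha256": "0" * 64,
        "active_corpus_sha256": "0" * 64,
    }
    print(sketch_gate_and_suppress(result))
```

The design point the sketch tries to capture is that suppression is all-or-nothing: once the listing or any per-document extraction is bad, no digest is reported, so a caller cannot mistake a partial result for a sealed one.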
