# FIX7 Recheck-9 Packet V2 — evidence/canonicalizer-fix7-canon-v1-ssot.py
#!/usr/bin/env python3
# ============================================================================
# FIX7-CANON-V1 CANONICALIZER -- SINGLE SOURCE OF TRUTH (executable, EXTENDED)
# canonicalizer_artifact_id: FIX7-CANON-V1-CANONICALIZER
# canonicalizer_version: FIX7-CANON-V1
# This is the PRODUCTION-COMPLETE superset of the recheck-8 SSOT fence. It keeps
# every recheck-8 vector byte-compatible (membership, document_id, marker, field,
# DAG) AND adds the production seal path the recheck-8 fence was missing:
#   - the deterministic active-scope / fence / section extractor (doc 00 spec)
#   - per-doc normalized_active_content_sha256 (N1)
#   - active_corpus_sha256 (N6)
#   - marker_fence_registry_sha256 (N3)
#   - superseded_boundary_sha256 (N4)
#   - guard_set_sha256 (= N1 of doc 06) (N5)
#   - candidate canonicalizer_sha256 (hash of SSOT bytes)
#   - envelope_manifest_sha256 REHEARSAL only (N7) [needs SEALED inputs]
#   - detached_seal_sha256 CODEX-ONLY (N8) [Codex authors at seal]
# R9-B2 (Codex Recheck-9): the production --produce path is FAIL-CLOSED.
# Any missing / extra / duplicate / extract-error / invalid active member:
#   - suppresses EVERY candidate digest (value -> SUPPRESSED_CORPUS_NOT_OK),
#   - forces corpus_ok=False and membership_frozen_ok=False,
#   - forces a NONZERO process exit (4).
# membership_sha256 is computed over the ACTUALLY PRESENT AND VALID members, so a
# missing member can never reproduce the frozen pin.
# Article 14: ONE authority of ONE nature. This file IS the load-bearing canonical
# contract; doc 00 prose / report docs are NON_AUTHORITY_EXPLANATION.
# Invocation:
#   python3 fix7_canon_v1_ssot_extended.py --selftest
#     -> exit 0 iff every embedded vector passes (unit + production-path +
#        fail-closed corpus-gate fixtures); nonzero otherwise
#   python3 fix7_canon_v1_ssot_extended.py --produce <docs_dir> [<ssot_md_path>]
#     -> runs the production seal path over the 10 real active docs and prints
#        every COMPUTABLE digest as a candidate (values are SEAL_AT_CODEX_RECHECK_8
#        by design; this proves the encoder RUNS and is DETERMINISTIC).
#        exit 0 ONLY iff corpus_ok AND membership_frozen_ok; exit 4 otherwise
#        (all candidate digests suppressed on any corpus problem).
# ============================================================================
import hashlib, re, sys, os
def sha(b: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of the byte string *b*."""
    hasher = hashlib.sha256()
    hasher.update(b)
    return hasher.hexdigest()
# Flag asserted by selftest() (together with SELF_REVISION_INPUTS being empty)
# in the "no self-revision input in load-bearing" check.
LOAD_BEARING_FORBIDS_SELF_REVISION = True
# ---- recheck-6 A: field rejection -----------------------------------------
# Code points that must never appear in a canonical field value or document id.
FORBIDDEN_BYTES = {0x09, 0x0A, 0x0D, 0x00, 0x5C}  # TAB LF CR NUL backslash

# Marker literals and digest tags. vfield() rejects any field (other than
# "marker_literal") whose value contains one of these as a substring.
# This assignment must sit on its own line: when it trailed the comment above
# it was part of that comment and the name was never defined.
RESERVED_TOKENS = [
    "<!-- ENVELOPE:EXCLUDE-BEGIN -->",
    "<!-- ENVELOPE:EXCLUDE-END -->",
    "<!-- SUPERSEDED_NON_AUTHORITY BEGIN",
    "<!-- SUPERSEDED_NON_AUTHORITY END -->",
    "FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1",
    "FIX7_ACTIVE_AUTHORITY_CORPUS_V1",
    "FIX7_MARKER_FENCE_REGISTRY_V1",
    "FIX7_SUPERSEDED_BOUNDARY_V1",
    "FIX7_GUARD_SET_V1",
    "FIX7_DOC_NORMALIZED_CONTENT_V1",
    "FIX7_ACTIVE_AUTHORITY_ENVELOPE_MANIFEST_V1",
    "FIX7_CODEX_DETACHED_SEAL_V1",
]
class Reject(Exception):
    """Fail-closed rejection carrying a machine-readable status code.

    The exception message is ``"<status>: <detail>"``; the bare status is also
    exposed as the ``status`` attribute so callers can compare it without
    parsing the message.
    """

    def __init__(self, st, d=""):
        # Must be the real dunder initializer: a method merely named ``init``
        # is never invoked on construction, so ``status`` would never be set.
        super().__init__(f"{st}: {d}")
        self.status = st
# ---- recheck-7 D: canonical document_id == exact MCP id, no alias ----------
# Root prefix that every in-scope document id must start with.
KB_ROOT = "knowledge/dev/reports/architecture/"

# Grammar of a single "/"-separated path segment of a document id.
# The pattern must be bound to `_SEG`, the name canonical_document_id() uses.
# NOTE(review): the leading underscore of this name had been lost; if a "_"
# was also lost from inside the character class, restore it -- confirm against
# the sealed SSOT. The class is kept exactly as found (fail-closed).
_SEG = re.compile(r"^[A-Za-z0-9.-]+$")


def canonical_document_id(value, mcp_id=None, require_root=True):
    """Validate *value* as a canonical document id and return it unchanged.

    Parameters:
        value: candidate document id (relative, "/"-separated, ends in ".md").
        mcp_id: when not None, *value* must equal it exactly (no aliasing).
        require_root: when true, *value* must start with KB_ROOT.

    Raises:
        Reject: with status DOCUMENT_ID_ALIAS_REJECTED for any non-canonical
            spelling, DOCUMENT_ID_SCOPE_MISMATCH when outside KB_ROOT, or
            DOCUMENT_ID_NOT_MCP_CANONICAL when it differs from *mcp_id*.
    """
    if value is None or value == "":
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "empty")
    for ch in value:
        if ord(ch) in FORBIDDEN_BYTES:
            raise Reject("DOCUMENT_ID_ALIAS_REJECTED", f"ctrl/backslash 0x{ord(ch):02x}")
        if ord(ch) > 0x7F:
            raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "non-ASCII (homoglyph?)")
    if "%" in value:
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "url-encoded")
    # Backslash is also in FORBIDDEN_BYTES; kept as an explicit second guard.
    if "\\" in value:
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "backslash")
    if "//" in value:
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "empty segment //")
    if value.startswith("/"):
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "leading slash (ids are relative)")
    if value.endswith("/"):
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "trailing slash")
    for s in value.split("/"):
        if s in (".", ".."):
            raise Reject("DOCUMENT_ID_ALIAS_REJECTED", f"dot segment {s!r}")
        if s == "":
            raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "empty segment")
        if not _SEG.match(s):
            raise Reject("DOCUMENT_ID_ALIAS_REJECTED", f"bad segment {s!r}")
    if not value.endswith(".md"):
        raise Reject("DOCUMENT_ID_ALIAS_REJECTED", "not .md")
    if require_root and not value.startswith(KB_ROOT):
        raise Reject("DOCUMENT_ID_SCOPE_MISMATCH", value)
    if mcp_id is not None and value != mcp_id:
        raise Reject("DOCUMENT_ID_NOT_MCP_CANONICAL", f"{value!r} != mcp {mcp_id!r}")
    return value
# ---- recheck-7 E: marker_kind <-> marker_literal closed contract ----------
# Closed set of marker kinds recognised by the canonicalizer.
MARKER_KINDS = {
    "DOC_STATUS",
    "SUPERSEDED_BEGIN",
    "SUPERSEDED_END",
    "ENVELOPE_EXCLUDE_BEGIN",
    "ENVELOPE_EXCLUDE_END",
    "AUTHORITY_BOUNDARY",
}

# Whole-line grammar for each marker kind (anchored at both ends).
MARKER_GRAMMAR = {
    "DOC_STATUS": re.compile(r"^<!-- DOC_STATUS: (ACTIVE_AUTHORITY|SUPERSEDED_NON_AUTHORITY) -->$"),
    "ENVELOPE_EXCLUDE_BEGIN": re.compile(r"^<!-- ENVELOPE:EXCLUDE-BEGIN -->$"),
    "ENVELOPE_EXCLUDE_END": re.compile(r"^<!-- ENVELOPE:EXCLUDE-END -->$"),
    "SUPERSEDED_BEGIN": re.compile(r"^<!-- SUPERSEDED_NON_AUTHORITY BEGIN(: [^\r\n]*)? -->$"),
    "SUPERSEDED_END": re.compile(r"^<!-- SUPERSEDED_NON_AUTHORITY END -->$"),
    "AUTHORITY_BOUNDARY": re.compile(r"^<!-- AUTHORITY_BOUNDARY[^\r\n]*-->$"),
}


def check_marker(kind, literal):
    """Verify that *literal* is a valid marker line of the declared *kind*.

    Returns:
        The pair ``(kind, literal)`` when the literal matches the kind's grammar.

    Raises:
        Reject: MARKER_KIND_UNKNOWN for a kind outside MARKER_KINDS;
            MARKER_LITERAL_MISMATCH when the literal contains TAB/LF/CR/NUL;
            MARKER_KIND_LITERAL_INCONSISTENT when the literal matches the
            grammar of a different kind; MARKER_LITERAL_NOT_ALLOWED otherwise.
    """
    if kind not in MARKER_KINDS:
        raise Reject("MARKER_KIND_UNKNOWN", kind)
    for ch in literal:
        if ord(ch) in (0x09, 0x0A, 0x0D, 0x00):
            raise Reject("MARKER_LITERAL_MISMATCH", "ctrl byte")
    if not MARKER_GRAMMAR[kind].match(literal):
        # Distinguish "valid literal, wrong kind" from "not a marker at all".
        for k2, g in MARKER_GRAMMAR.items():
            if k2 != kind and g.match(literal):
                raise Reject("MARKER_KIND_LITERAL_INCONSISTENT", f"{kind} vs literal of {k2}")
        raise Reject("MARKER_LITERAL_NOT_ALLOWED", f"{kind}:{literal!r}")
    return (kind, literal)
# classify a whole line -> (kind, literal) or None (used by the extractor)
def classify_line(line):
    """Classify one whole line against every marker grammar.

    Returns:
        ``(kind, line)`` when exactly one grammar in MARKER_GRAMMAR matches,
        or ``None`` when no grammar matches.

    Raises:
        Reject: MARKER_REGISTRY_MISMATCH when more than one grammar matches.
    """
    hits = [k for k, g in MARKER_GRAMMAR.items() if g.match(line)]
    if not hits:
        return None
    if len(hits) > 1:
        raise Reject("MARKER_REGISTRY_MISMATCH", f"line matches {hits}")
    return (hits[0], line)
# ---- recheck-6: field encode ----------------------------------------------
# Named value grammars selectable through vfield(..., grammar=<name>).
GRAMMARS = {
    "sha256_hex": re.compile(r"^[0-9a-f]{64}$"),
    "kb_revision": re.compile(r"^([1-9][0-9]*|SELF_HOST_PIN_BY_EXCLUDE_REGION_HASH)$"),
    "doc_status": re.compile(r"^(ACTIVE_AUTHORITY|SUPERSEDED_NON_AUTHORITY)$"),
    "boolean": re.compile(r"^(true|false)$"),
    "section": re.compile(r"^(WHOLE_DOCUMENT|WHOLE_DOCUMENT_MINUS_SUPERSEDED_FENCES|WHOLE_DOCUMENT_MINUS_EXCLUDE_AND_SUPERSEDED)$"),
}

# Placeholder values accepted in place of a grammar-conforming value.
SENTINEL_OK = {"NOT_APPLICABLE", "NON_AUTHORITY_DIAGNOSTIC", "SEAL_AT_CODEX_RECHECK_8"}


def vfield(field, value, grammar=None, allow_sentinel=True):
    """Validate one canonical field value and return it unchanged.

    Parameters:
        field: field name, used in messages; "marker_literal" is exempt from
            the reserved-token check.
        value: the string value to validate.
        grammar: optional key into GRAMMARS the value must match.
        allow_sentinel: when true, a value in SENTINEL_OK bypasses the grammar.

    Raises:
        Reject: CANONICAL_FIELD_NULL_REJECTED, CANONICAL_FIELD_EMPTY_REJECTED,
            CANONICAL_FIELD_RESERVED_TOKEN_REJECTED (forbidden code point or
            reserved token) or CANONICAL_FIELD_VALUE_GRAMMAR_REJECTED.
    """
    if value is None:
        raise Reject("CANONICAL_FIELD_NULL_REJECTED", field)
    if value == "":
        raise Reject("CANONICAL_FIELD_EMPTY_REJECTED", field)
    for ch in value:
        if ord(ch) in FORBIDDEN_BYTES:
            raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED", f"{field} 0x{ord(ch):02x}")
    if field != "marker_literal":
        for t in RESERVED_TOKENS:
            if t in value:
                raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED", f"{field} token")
    if allow_sentinel and value in SENTINEL_OK:
        return value
    if grammar and not GRAMMARS[grammar].match(value):
        raise Reject("CANONICAL_FIELD_VALUE_GRAMMAR_REJECTED", f"{field}={value!r}")
    return value


def rec(*f):
    """Encode the string fields *f* as one TAB-separated, LF-terminated record.

    Raises:
        Reject: CANONICAL_FIELD_RESERVED_TOKEN_REJECTED when any field contains
            a TAB or LF, which would corrupt the record framing.
    """
    for x in f:
        if "\t" in x or "\n" in x:
            raise Reject("CANONICAL_FIELD_RESERVED_TOKEN_REJECTED", "sep in value")
    return ("\t".join(f) + "\n").encode()


def digest(tag, records):
    """Return the hex SHA-256 over ``tag + LF`` followed by the concatenated *records*."""
    return sha((tag + "\n").encode() + b"".join(records))
# ---- recheck-6 D / recheck-7 A: seal hash DAG -----------------------------
# Seal hash dependency graph: node -> list of nodes whose digests it consumes.
EDGES = {
    "N1": [],
    "N2": [],
    "N3": [],
    "N4": [],
    "N5": [],
    "N6": ["N1"],
    "N7": ["N2", "N3", "N4", "N5", "N6", "N1"],
    "N8": ["N2", "N5", "N6", "N7"],
    "N9_DIAG": [],
}
LOAD_BEARING = {"N1", "N2", "N3", "N4", "N5", "N6", "N7", "N8"}
SELF_REVISION_INPUTS = set()


def has_cycle(e):
    """Return True iff the directed graph *e* (node -> successor list) has a cycle.

    Depth-first search with three colours: 0 = unvisited, 1 = on the current
    DFS path, 2 = fully explored. Reaching a colour-1 node means a back edge.
    Every successor is expected to also be a key of *e*.
    """
    c = {k: 0 for k in e}

    def dfs(u):
        c[u] = 1
        for v in e[u]:
            if c[v] == 1 or (c[v] == 0 and dfs(v)):
                return True
        c[u] = 2
        return False

    return any(c[k] == 0 and dfs(k) for k in e)
# ===========================================================================
# PRODUCTION SEAL PATH (the part the recheck-8 fence was missing)
# ===========================================================================
# Document-id prefix shared by every active doc of this blueprint.
PREFIX = KB_ROOT + "t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/"

# The frozen active membership (file names relative to PREFIX / the docs dir).
DOCS = [
    "00-readme-first.md",
    "01-live-existing-system-inventory.md",
    "02-design-to-live-mapping.md",
    "03-gap-classification.md",
    "04-dependency-safe-construction-order.md",
    "05-rollback-blueprint.md",
    "06-test-guard-blueprint.md",
    "07-implementation-package-split.md",
    "08-hard-blocks-do-not-touch-list.md",
    "12-final-verdict.md",
]

# Frozen pin that membership() must reproduce (checked by selftest()).
MEMBERSHIP_EXPECT = "f2bda8effc7be19b54722828126b82d7d2d48bee5e5e5dc0c8f347ce210fe251"

SELF_HOST_DOC = "00-readme-first.md"  # the only doc with an ENVELOPE:EXCLUDE region
def normalize_lines(text):
    """CRLF/CR -> LF, then 1-based line model.

    A trailing LF is a terminator, not a content line, so it does not create
    a spurious empty final line. Only that single trailing empty element is
    dropped; interior or additional trailing blank lines are kept.

    Returns:
        list of str where ``lines[i-1]`` is 1-based line ``i``.
    """
    norm = text.replace("\r\n", "\n").replace("\r", "\n")
    lines = norm.split("\n")
    if lines and lines[-1] == "":
        lines = lines[:-1]
    return lines  # lines[i-1] == 1-based line i
def pair_fences(lines, begin_kind, end_kind, unbalanced_status, nested_status):
    """Flat (non-nesting) single-stack pairing of begin/end marker lines.

    Parameters:
        lines: normalized lines (index 0 is 1-based line 1).
        begin_kind / end_kind: marker kinds that open / close a fence.
        unbalanced_status: Reject status for an end without a begin, or a
            begin that is never closed.
        nested_status: Reject status for a begin inside an open fence.

    Returns:
        list of ``(begin_line, end_line)`` tuples, 1-based inclusive, in
        document order.
    """
    ranges = []
    open_at = None
    for i, ln in enumerate(lines, start=1):
        c = classify_line(ln)
        if c is None:
            continue
        k = c[0]
        if k == begin_kind:
            if open_at is not None:
                raise Reject(nested_status, f"{begin_kind} at L{i} inside open L{open_at}")
            open_at = i
        elif k == end_kind:
            if open_at is None:
                raise Reject(unbalanced_status, f"{end_kind} at L{i} without begin")
            ranges.append((open_at, i))
            open_at = None
    if open_at is not None:
        raise Reject(unbalanced_status, f"{begin_kind} at L{open_at} never closed")
    return ranges
def extract(document_id, raw_text, is_self_host):
    """Deterministic active-scope / fence / section extractor (doc 00 spec).

    Parameters:
        document_id: id used only in rejection messages.
        raw_text: the document text (any of LF / CRLF / CR line endings).
        is_self_host: when true, ENVELOPE:EXCLUDE regions are removed from the
            active content; when false, any such region is rejected.

    Returns:
        dict with keys ``normalized_active_content`` (bytes), ``markers``
        (list of ``(kind, literal)``), ``superseded_ranges`` and
        ``exclude_ranges`` (lists of 1-based inclusive ``(begin, end)``), and
        ``doc_status``.

    Raises:
        Reject: ACTIVE_SCOPE_MARKER_MISSING / ACTIVE_SCOPE_MARKER_DUPLICATE,
            FENCE_UNBALANCED / FENCE_NESTED_UNSUPPORTED,
            EXCLUDE_REGION_UNBALANCED, ACTIVE_SUPERSEDED_OVERLAP,
            ACTIVE_CONTENT_EMPTY.
    """
    lines = normalize_lines(raw_text)

    # 1) enumerate markers (whole-line grammar)
    markers = []
    doc_status_lines = []
    for i, ln in enumerate(lines, start=1):
        c = classify_line(ln)
        if c is None:
            continue
        markers.append((i, c[0], c[1]))
        if c[0] == "DOC_STATUS":
            doc_status_lines.append((i, c[1]))

    # 2) DOC_STATUS cardinality: exactly one
    if len(doc_status_lines) == 0:
        raise Reject("ACTIVE_SCOPE_MARKER_MISSING", f"{document_id}: no DOC_STATUS")
    if len(doc_status_lines) > 1:
        raise Reject("ACTIVE_SCOPE_MARKER_DUPLICATE", f"{document_id}: {len(doc_status_lines)} DOC_STATUS")
    doc_status = MARKER_GRAMMAR["DOC_STATUS"].match(doc_status_lines[0][1]).group(1)

    # 3) superseded fences (flat) + exclude fences (flat, self-host only meaningful)
    sup = pair_fences(lines, "SUPERSEDED_BEGIN", "SUPERSEDED_END",
                      "FENCE_UNBALANCED", "FENCE_NESTED_UNSUPPORTED")
    exc = pair_fences(lines, "ENVELOPE_EXCLUDE_BEGIN", "ENVELOPE_EXCLUDE_END",
                      "EXCLUDE_REGION_UNBALANCED", "FENCE_NESTED_UNSUPPORTED")

    # Line-number sets of each fence family, computed once and reused below.
    sup_set = set()
    for b, e in sup:
        sup_set.update(range(b, e + 1))
    exc_set = set()
    for b, e in exc:
        exc_set.update(range(b, e + 1))

    # 4) removal set = superseded inclusive (+ exclude inclusive iff self-host)
    removal = set(sup_set)
    if is_self_host:
        removal |= exc_set
    elif exc:
        raise Reject("EXCLUDE_REGION_UNBALANCED", f"{document_id}: exclude region in non-self-host doc")

    # 5) overlap assertion (superseded vs exclude must not overlap)
    if sup_set & exc_set:
        raise Reject("ACTIVE_SUPERSEDED_OVERLAP", f"{document_id}: superseded/exclude overlap")

    # 6) normalized active content = retained line + LF, ascending line order.
    #    Joined in one pass rather than repeated bytes concatenation.
    active = b"".join(
        (ln + "\n").encode("utf-8")
        for i, ln in enumerate(lines, start=1)
        if i not in removal
    )
    if not active:
        raise Reject("ACTIVE_CONTENT_EMPTY", f"{document_id}: no active content after removal")
    return {
        "normalized_active_content": active,
        "markers": [(m[1], m[2]) for m in markers],
        "superseded_ranges": sup,
        "exclude_ranges": exc,
        "doc_status": doc_status,
    }
def per_doc_content_digest(document_id, normalized_active_content):
    """Return the per-document content digest (hex SHA-256).

    The hashed bytes are the header
    ``"FIX7_DOC_NORMALIZED_CONTENT_V1" TAB document_id LF`` followed by
    *normalized_active_content* (bytes).
    """
    # tag uniquely uses TAB between tag and document_id
    tag = ("FIX7_DOC_NORMALIZED_CONTENT_V1\t" + document_id + "\n").encode("utf-8")
    return sha(tag + normalized_active_content)
# active_section_id_or_range per the live envelope
# Section descriptor recorded for each doc in the active-corpus record.
# Every doc defaults to WHOLE_DOCUMENT; the two docs below are overridden.
SECTION_BY_DOC = {d: "WHOLE_DOCUMENT" for d in DOCS}
SECTION_BY_DOC["00-readme-first.md"] = "WHOLE_DOCUMENT_MINUS_EXCLUDE_AND_SUPERSEDED"
SECTION_BY_DOC["12-final-verdict.md"] = "WHOLE_DOCUMENT_MINUS_SUPERSEDED_FENCES"
def membership():
    """The FROZEN membership pin definition: digest over the 10 frozen doc_ids.

    Each id is ``PREFIX + name`` for every name in DOCS, validated through
    canonical_document_id() against itself as the MCP id. The ids are sorted
    and each is encoded as a single-field record.
    """
    ids = sorted(canonical_document_id(PREFIX + d, mcp_id=PREFIX + d) for d in DOCS)
    return digest("FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1", [rec(i) for i in ids])
# ---- R9-B2: FAIL-CLOSED corpus gate ---------------------------------------
# Value written over every candidate digest when the corpus gate fails.
SUPPRESSED = "SUPPRESSED_CORPUS_NOT_OK"

# Keys of the produce() result that gate_and_suppress() overwrites on failure.
SUPPRESSIBLE_DIGEST_KEYS = (
    "membership_sha256",
    "active_corpus_sha256",
    "marker_fence_registry_sha256",
    "superseded_boundary_sha256",
    "guard_set_sha256",
    "canonicalizer_sha256_candidate",
    "envelope_manifest_sha256",
    "detached_seal_sha256",
)
def validate_corpus_listing(actual_names, expected=None):
    """Pure fail-closed check of a docs-dir listing against the frozen membership.

    Detects MISSING, EXTRA, and DUPLICATE (exact or case-variant) members.

    Parameters:
        actual_names: iterable of file names found in the docs directory.
        expected: the allowed names; defaults to DOCS when None.

    Returns:
        ``{"missing": [...], "extra": [...], "duplicate": [...], "ok": bool}``.
        ``duplicate`` holds, for each collision, the first-seen (in sorted
        order) name sharing the same lowercase form; ``ok`` is true only when
        all three lists are empty.
    """
    expected = DOCS if expected is None else expected
    exp = set(expected)
    missing = []
    extra = []
    duplicate = []
    seen = {}  # lowercase name -> first spelling encountered
    for n in sorted(actual_names):
        key = n.lower()
        if key in seen:
            duplicate.append(seen[key])
        else:
            seen[key] = n
        # Checked for every name, including ones that collided above, so an
        # unexpected case-variant is always reported as extra.
        if n not in exp:
            extra.append(n)
    present = set(actual_names)
    for e in expected:
        if e not in present:
            missing.append(e)
    return {
        "missing": missing,
        "extra": extra,
        "duplicate": duplicate,
        "ok": not (missing or extra or duplicate),
    }
def corpus_gate(listing_ok, errors, n_extracted, n_expected):
    """Pure gate deciding whether the corpus is OK.

    The corpus is OK only if the listing is exact (*listing_ok* truthy), no
    per-doc error was recorded (*errors* empty), and every expected member
    extracted successfully (``n_extracted == n_expected``).

    Returns:
        bool.
    """
    return bool(listing_ok) and not errors and n_extracted == n_expected
def gate_and_suppress(out):
    """R9-B2 enforcement: suppress every candidate digest on any corpus problem.

    Sets ``out["corpus_ok"]`` from corpus_gate() and
    ``out["membership_frozen_ok"]`` to true only when the corpus is OK and
    ``out["membership_sha256"]`` equals MEMBERSHIP_EXPECT. Unless both flags
    are true, every key of SUPPRESSIBLE_DIGEST_KEYS present in *out* is
    overwritten with SUPPRESSED and ``membership_frozen_ok`` is forced False.

    Parameters:
        out: result dict with at least ``listing``, ``errors`` and ``per_doc``.

    Returns:
        The same *out* dict, mutated in place.
    """
    out["corpus_ok"] = corpus_gate(out["listing"]["ok"], out["errors"],
                                   len(out["per_doc"]), len(DOCS))
    out["membership_frozen_ok"] = bool(
        out["corpus_ok"] and out.get("membership_sha256") == MEMBERSHIP_EXPECT
    )
    if not (out["corpus_ok"] and out["membership_frozen_ok"]):
        for k in SUPPRESSIBLE_DIGEST_KEYS:
            if k in out:
                out[k] = SUPPRESSED
        out["membership_frozen_ok"] = False
    return out
def produce(docs_dir, ssot_md_path=None, kb_revisions=None):
    """Run the production seal path over the real active docs.

    FAIL-CLOSED (R9-B2): any missing/extra/duplicate/extract-error/invalid
    member suppresses every candidate digest and the CLI exits nonzero.

    Parameters:
        docs_dir: directory expected to contain exactly the DOCS files.
        ssot_md_path: optional path whose LF-normalized UTF-8 text is hashed
            as ``canonicalizer_sha256_candidate``; skipped when absent.
        kb_revisions: optional mapping doc name -> revision; docs without an
            entry use the sentinel "SEAL_AT_CODEX_RECHECK_8".

    Returns:
        dict of every COMPUTABLE digest (candidate; values are
        SEAL_AT_CODEX_RECHECK_8 by design), plus the rehearsal manifest marker
        and the N8 Codex-only marker, after gate_and_suppress().
    """
    kb_revisions = kb_revisions or {}
    out = {"per_doc": {}, "errors": {}, "markers": [], "superseded": []}
    if os.path.isdir(docs_dir):
        listed = sorted(f for f in os.listdir(docs_dir) if f.lower().endswith(".md"))
    else:
        listed = []
        out["errors"]["docs_dir"] = "DOCS_DIR_MISSING"
    out["listing"] = validate_corpus_listing(listed)

    corpus_records = []
    reg_records = []
    boundary_records = []
    guard_set_digest = None
    for d in DOCS:
        path = os.path.join(docs_dir, d)
        if not os.path.exists(path):
            out["errors"][d] = "LOCAL_FILE_MISSING"
            continue
        # An unreadable or undecodable member is an invalid member: record it
        # so the gate fails closed instead of aborting with a traceback.
        try:
            with open(path, "r", encoding="utf-8") as fh:
                raw = fh.read()
        except (OSError, UnicodeDecodeError):
            out["errors"][d] = "LOCAL_FILE_UNREADABLE"
            continue
        doc_id = PREFIX + d
        # Build all of this doc's records before committing any of them, so a
        # Reject from extract() or rec() (e.g. a TAB inside a marker literal)
        # is recorded as a per-doc error and leaves no partial records behind.
        try:
            ex = extract(doc_id, raw, is_self_host=(d == SELF_HOST_DOC))
            cdig = per_doc_content_digest(doc_id, ex["normalized_active_content"])
            if d == SELF_HOST_DOC:
                kbrev = "SELF_HOST_PIN_BY_EXCLUDE_REGION_HASH"
            else:
                kbrev = str(kb_revisions.get(d, "SEAL_AT_CODEX_RECHECK_8"))
            corpus_record = rec(doc_id, ex["doc_status"], SECTION_BY_DOC[d], kbrev, cdig)
            doc_reg_records = [rec(doc_id, kind, lit) for kind, lit in ex["markers"]]
            doc_bounds = [
                (f"{doc_id}#S{k}", f"L{b}-L{end}")
                for k, (b, end) in enumerate(sorted(ex["superseded_ranges"]), start=1)
            ]
            doc_boundary_records = [rec(sid, rng) for sid, rng in doc_bounds]
        except Reject as err:
            out["errors"][d] = err.status
            continue

        out["per_doc"][d] = {
            "normalized_active_content_sha256": cdig,
            "active_bytes": len(ex["normalized_active_content"]),
            "doc_status": ex["doc_status"],
            "n_markers": len(ex["markers"]),
            "n_superseded": len(ex["superseded_ranges"]),
        }
        # active_corpus record
        corpus_records.append(corpus_record)
        # marker/fence registry records
        reg_records.extend(doc_reg_records)
        out["markers"].extend((d, kind, lit) for kind, lit in ex["markers"])
        # superseded boundary records
        boundary_records.extend(doc_boundary_records)
        out["superseded"].extend(doc_bounds)
        if d == "06-test-guard-blueprint.md":
            guard_set_digest = cdig  # guard_set_sha256 := N1 of doc 06

    # membership over the ACTUALLY PRESENT AND VALID members (R9-B2): a missing or
    # invalid member yields a different digest than the frozen pin -> frozen_ok False.
    valid_ids = sorted(PREFIX + d for d in DOCS if d in out["per_doc"])
    out["membership_sha256"] = digest("FIX7_ACTIVE_AUTHORITY_MEMBERSHIP_V1",
                                      [rec(i) for i in valid_ids])
    out["active_corpus_sha256"] = digest("FIX7_ACTIVE_AUTHORITY_CORPUS_V1",
                                         sorted(corpus_records))
    out["marker_fence_registry_sha256"] = digest("FIX7_MARKER_FENCE_REGISTRY_V1",
                                                 sorted(reg_records))
    out["superseded_boundary_sha256"] = digest("FIX7_SUPERSEDED_BOUNDARY_V1",
                                               sorted(boundary_records))
    if guard_set_digest is None:
        out["errors"].setdefault("06-test-guard-blueprint.md", "GUARD_SET_SOURCE_MISSING")
        out["guard_set_sha256"] = "GUARD_SET_SOURCE_MISSING"
    else:
        out["guard_set_sha256"] = guard_set_digest

    if ssot_md_path and os.path.exists(ssot_md_path):
        with open(ssot_md_path, "r", encoding="utf-8") as fh:
            md = fh.read()
        md = md.replace("\r\n", "\n").replace("\r", "\n")
        out["canonicalizer_sha256_candidate"] = sha(md.encode("utf-8"))

    # N7 envelope_manifest: REHEARSAL ONLY (binds SEALED sub-digests + approval-event
    # fields that only Codex sets). We refuse to emit a "real" value -> not self-fabricated.
    out["envelope_manifest_sha256"] = "REHEARSAL_ONLY_NEEDS_SEALED_INPUTS (N7)"
    # N8 detached_seal: Codex authors (sealed_by/at, signature, parent_checkpoint, report_documents)
    out["detached_seal_sha256"] = "CODEX_ONLY_NOT_SELF_COMPUTABLE (N8)"
    return gate_and_suppress(out)
def duplicate_authority_inventory(candidate_paths):
    """P4: a runnable inventory proving exactly ONE canonical canonicalizer identity.

    Parameters:
        candidate_paths: iterable of ``(label, identity_string)`` pairs.

    Returns:
        ``{"distinct_identities": int, "ok": bool, "map": {identity: [labels]}}``
        where ``ok`` is true iff exactly one distinct identity was seen (an
        empty input therefore yields ``ok == False``).
    """
    ids = {}
    for label, identity in candidate_paths:
        ids.setdefault(identity, []).append(label)
    return {"distinct_identities": len(ids), "ok": len(ids) == 1, "map": ids}
# ===========================================================================
# SELFTEST (recheck-8 unit vectors + recheck-9 production-path fixtures
#           + R9-B2 fail-closed corpus-gate fixtures)
# ===========================================================================
def selftest():
    """Run every embedded vector and return ``(ok, out)``.

    ``out`` is a list of report lines, one per check, each containing
    ``[PASS]`` or ``[FAIL]``; ``ok`` is true iff every check passed.
    Covers the recheck-8 unit vectors, the recheck-9 production-path fixtures
    and the R9-B2 fail-closed corpus-gate fixtures. Pure: no filesystem access.
    """
    out = []
    ok = True

    def chk(label, cond):
        # Record a boolean check.
        nonlocal ok
        ok = ok and cond
        out.append(f" [{'PASS' if cond else 'FAIL'}] {label}")

    def expect(label, status, fn):
        # Record a check that fn() raises Reject with exactly `status`.
        nonlocal ok
        try:
            fn()
            out.append(f" [FAIL] {label} (not rejected)")
            ok = False
        except Reject as e:
            good = e.status == status
            ok = ok and good
            out.append(f" [{'PASS' if good else 'FAIL'}] {label} -> {e.status}")

    # ---- recheck-8 unit vectors (preserved) ----
    chk("membership == f2bda8...fe251", membership() == MEMBERSHIP_EXPECT)
    chk("DAG acyclic", not has_cycle(EDGES))
    chk("no self-revision input in load-bearing",
        len(SELF_REVISION_INPUTS) == 0 and LOAD_BEARING_FORBIDS_SELF_REVISION)
    chk("valid doc id accepted",
        canonical_document_id(PREFIX + "00-readme-first.md",
                              mcp_id=PREFIX + "00-readme-first.md") == PREFIX + "00-readme-first.md")
    chk("valid marker accepted",
        check_marker("DOC_STATUS", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->")[0] == "DOC_STATUS")
    expect("doc_id '.' segment", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "./x.md"))
    expect("doc_id '..' segment", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "a/../x.md"))
    expect("doc_id '//'", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "a//x.md"))
    expect("doc_id empty seg(trailing)", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "x.md/"))
    # The literal must contain a real backslash: "\\x", not the invalid escape "\x".
    expect("doc_id backslash", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "a\\x.md"))
    expect("doc_id url-encoded", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "a%2e/x.md"))
    expect("doc_id homoglyph slash", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id(KB_ROOT + "a⁄x.md"))
    expect("doc_id leading slash", "DOCUMENT_ID_ALIAS_REJECTED",
           lambda: canonical_document_id("/" + KB_ROOT + "x.md"))
    expect("doc_id scope mismatch", "DOCUMENT_ID_SCOPE_MISMATCH",
           lambda: canonical_document_id("other/dir/x.md"))
    expect("doc_id != mcp (case)", "DOCUMENT_ID_NOT_MCP_CANONICAL",
           lambda: canonical_document_id(PREFIX + "00-Readme-First.md",
                                         mcp_id=PREFIX + "00-readme-first.md"))
    expect("marker unknown kind", "MARKER_KIND_UNKNOWN",
           lambda: check_marker("FOO", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->"))
    expect("marker kind/literal inconsistent", "MARKER_KIND_LITERAL_INCONSISTENT",
           lambda: check_marker("DOC_STATUS", "<!-- ENVELOPE:EXCLUDE-BEGIN -->"))
    expect("marker literal typo", "MARKER_LITERAL_NOT_ALLOWED",
           lambda: check_marker("DOC_STATUS", "<!-- DOC_STATUS: ACTIVE -->"))
    expect("field TAB rejected", "CANONICAL_FIELD_RESERVED_TOKEN_REJECTED",
           lambda: vfield("x", "a\tb"))
    expect("field null rejected", "CANONICAL_FIELD_NULL_REJECTED",
           lambda: vfield("x", None))
    expect("field empty rejected", "CANONICAL_FIELD_EMPTY_REJECTED",
           lambda: vfield("x", ""))
    e2 = {k: list(v) for k, v in EDGES.items()}
    e2["N8"] = e2["N8"] + ["N8"]
    chk("seal self-revision/self-hash edge -> cycle detected", has_cycle(e2))

    # ---- recheck-9 PRODUCTION-PATH fixtures (the part L2 said was missing) ----
    # worked normalized-content vector on a tiny fixture (deterministic pin)
    tiny = "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nalpha\nbeta\n"
    ex = extract("knowledge/dev/reports/architecture/x/y.md", tiny, is_self_host=False)
    expect_active = b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nalpha\nbeta\n"
    chk("extractor: active content (no fences) == input",
        ex["normalized_active_content"] == expect_active)
    chk("extractor: per-doc digest deterministic",
        per_doc_content_digest("a/b.md", expect_active) == per_doc_content_digest("a/b.md", expect_active))
    # superseded fence removed inclusive
    sup = "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep1\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\ndrop\n<!-- SUPERSEDED_NON_AUTHORITY END -->\nkeep2\n"
    exs = extract("k/s.md", sup, is_self_host=False)
    chk("extractor: superseded fence removed inclusive",
        exs["normalized_active_content"] == b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep1\nkeep2\n")
    chk("extractor: 1 superseded range recorded", exs["superseded_ranges"] == [(3, 5)])
    # CRLF normalization
    crlf = "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\r\nx\r\n"
    exc = extract("k/c.md", crlf, is_self_host=False)
    chk("extractor: CRLF normalized to LF",
        exc["normalized_active_content"] == b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nx\n")
    # fail-closed: missing DOC_STATUS
    expect("extractor: missing DOC_STATUS", "ACTIVE_SCOPE_MARKER_MISSING",
           lambda: extract("k/m.md", "no marker here\n", False))
    # fail-closed: duplicate DOC_STATUS
    expect("extractor: duplicate DOC_STATUS", "ACTIVE_SCOPE_MARKER_DUPLICATE",
           lambda: extract("k/d.md", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n", False))
    # fail-closed: unbalanced superseded fence
    expect("extractor: unbalanced superseded fence", "FENCE_UNBALANCED",
           lambda: extract("k/u.md", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\nx\n", False))
    # fail-closed: nested superseded fence
    expect("extractor: nested superseded fence", "FENCE_NESTED_UNSUPPORTED",
           lambda: extract("k/n.md", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\n<!-- SUPERSEDED_NON_AUTHORITY BEGIN -->\n<!-- SUPERSEDED_NON_AUTHORITY END -->\n<!-- SUPERSEDED_NON_AUTHORITY END -->\n", False))
    # fail-closed: exclude region in non-self-host doc
    expect("extractor: exclude region forbidden off self-host", "EXCLUDE_REGION_UNBALANCED",
           lambda: extract("k/e.md", "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- ENVELOPE:EXCLUDE-BEGIN -->\nx\n<!-- ENVELOPE:EXCLUDE-END -->\n", False))
    # self-host exclude region removed inclusive
    sh = "<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep\n<!-- ENVELOPE:EXCLUDE-BEGIN -->\nsecret\n<!-- ENVELOPE:EXCLUDE-END -->\ntail\n"
    exsh = extract("k/sh.md", sh, is_self_host=True)
    chk("extractor: self-host exclude removed inclusive",
        exsh["normalized_active_content"] == b"<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\nkeep\ntail\n")
    # dup-authority inventory: one identity passes, two fails
    di_ok = duplicate_authority_inventory([("md", "ID-A"), ("py", "ID-A")])
    di_bad = duplicate_authority_inventory([("md", "ID-A"), ("fork", "ID-B")])
    chk("dup-authority inventory: one identity -> ok",
        di_ok["ok"] and di_ok["distinct_identities"] == 1)
    chk("dup-authority inventory: two identities -> not ok",
        (not di_bad["ok"]) and di_bad["distinct_identities"] == 2)
    # guard_set definition identity (encoder wiring, not a value)
    chk("guard_set_sha256 := N1(doc06) wiring present", "06-test-guard-blueprint.md" in DOCS)

    # ---- R9-B2 FAIL-CLOSED corpus-gate fixtures (pure, no filesystem) ----
    chk("R9-B2 corpus listing: exact 10 members -> ok", validate_corpus_listing(list(DOCS))["ok"])
    miss = validate_corpus_listing([d for d in DOCS if d != "05-rollback-blueprint.md"])
    chk("R9-B2 corpus listing: missing member detected -> not ok",
        (not miss["ok"]) and miss["missing"] == ["05-rollback-blueprint.md"])
    extra = validate_corpus_listing(list(DOCS) + ["99-extra-doc.md"])
    chk("R9-B2 corpus listing: extra member detected -> not ok",
        (not extra["ok"]) and extra["extra"] == ["99-extra-doc.md"])
    dup = validate_corpus_listing(list(DOCS) + ["05-Rollback-Blueprint.md"])
    chk("R9-B2 corpus listing: duplicate (case-variant) detected -> not ok",
        (not dup["ok"]) and len(dup["duplicate"]) == 1)
    expect("extractor: empty active content rejected", "ACTIVE_CONTENT_EMPTY",
           lambda: extract("k/z.md", "<!-- ENVELOPE:EXCLUDE-BEGIN -->\n<!-- DOC_STATUS: ACTIVE_AUTHORITY -->\n<!-- ENVELOPE:EXCLUDE-END -->\n", True))
    chk("R9-B2 corpus gate: clean corpus -> ok", corpus_gate(True, {}, len(DOCS), len(DOCS)))
    chk("R9-B2 corpus gate: extract error forces not ok",
        not corpus_gate(True, {"05-rollback-blueprint.md": "LOCAL_FILE_MISSING"}, 9, len(DOCS)))
    chk("R9-B2 corpus gate: bad listing forces not ok",
        not corpus_gate(False, {}, len(DOCS), len(DOCS)))
    fake = {"listing": {"ok": False, "missing": ["05-rollback-blueprint.md"], "extra": [], "duplicate": []},
            "errors": {"05-rollback-blueprint.md": "LOCAL_FILE_MISSING"},
            "per_doc": {d: None for d in DOCS if d != "05-rollback-blueprint.md"},
            "membership_sha256": "0" * 64, "active_corpus_sha256": "0" * 64,
            "marker_fence_registry_sha256": "0" * 64, "superseded_boundary_sha256": "0" * 64,
            "guard_set_sha256": "0" * 64, "canonicalizer_sha256_candidate": "0" * 64,
            "envelope_manifest_sha256": "x", "detached_seal_sha256": "x"}
    g = gate_and_suppress(fake)
    chk("R9-B2 corpus gate: problem suppresses every candidate digest",
        (not g["corpus_ok"]) and (not g["membership_frozen_ok"]) and
        all(g[k] == SUPPRESSED for k in SUPPRESSIBLE_DIGEST_KEYS))
    return ok, out
# Script entry point. `--produce <docs_dir> [<ssot_md_path>]` runs the
# production seal path; any other invocation (including `--selftest`) runs the
# embedded self-test. The guard must compare the real `__name__` dunder.
if __name__ == "__main__":
    args = sys.argv[1:]
    if args and args[0] == "--produce":
        docs_dir = args[1] if len(args) > 1 else "docs"
        ssot = args[2] if len(args) > 2 else None
        res = produce(docs_dir, ssot)
        print("FIX7-CANON-V1 PRODUCTION SEAL PATH (candidate; values SEAL_AT_CODEX_RECHECK_8; FAIL-CLOSED R9-B2)")
        print("membership_sha256 :", res["membership_sha256"], "(over PRESENT+VALID members; frozen pin f2bda8...fe251)")
        print("active_corpus_sha256 :", res["active_corpus_sha256"])
        print("marker_fence_registry_sha256 :", res["marker_fence_registry_sha256"])
        print("superseded_boundary_sha256 :", res["superseded_boundary_sha256"])
        print("guard_set_sha256 :", res["guard_set_sha256"])
        # .get(): this key is only present when an SSOT path was supplied and exists.
        print("canonicalizer_sha256_cand :", res.get("canonicalizer_sha256_candidate"))
        print("envelope_manifest_sha256 :", res["envelope_manifest_sha256"])
        print("detached_seal_sha256 :", res["detached_seal_sha256"])
        print("per-doc normalized_active_content_sha256:")
        # NOTE(review): per-doc digests are printed even when the gate failed;
        # only SUPPRESSIBLE_DIGEST_KEYS are suppressed -- confirm this is intended.
        for d in DOCS:
            if d in res["per_doc"]:
                pd = res["per_doc"][d]
                print(f" {d:42s} {pd['normalized_active_content_sha256']} bytes={pd['active_bytes']} status={pd['doc_status']} markers={pd['n_markers']} sup={pd['n_superseded']}")
            else:
                print(f" {d:42s} EXTRACT_ERROR={res['errors'].get(d)}")
        L = res["listing"]
        if not L["ok"]:
            print(f"corpus_listing_problems: missing={L['missing']} extra={L['extra']} duplicate={L['duplicate']}")
        if res["errors"]:
            print("corpus_errors:", {k: res["errors"][k] for k in sorted(res["errors"])})
        print("corpus_ok:", res["corpus_ok"])
        print("membership_frozen_ok:", res["membership_frozen_ok"])
        if not (res["corpus_ok"] and res["membership_frozen_ok"]):
            print("ALL CANDIDATE DIGESTS SUPPRESSED (fail-closed: corpus not OK) -> exit 4")
            sys.exit(4)
        sys.exit(0)
    ok, out = selftest()
    print("FIX7-CANON-V1 CANONICALIZER SSOT SELFTEST (extended: unit + production-path + fail-closed corpus gate)")
    print("\n".join(out))
    print("ALL PASS:", ok, f"({sum('[PASS]' in l for l in out)}/{len(out)} checks)")
    sys.exit(0 if ok else 1)