FIX7 Real-N6 packet — n6_adversarial_probes.py
#!/usr/bin/env python3
============================================================================
FIX7 REAL-N6 -- CODEX-STYLE DIRECT ADVERSARIAL PROBES (fail-closed)
Feeds bad inputs OUTSIDE the happy path to the real-N6 provenance verifier
(and, where the boundary belongs to the seal layer, to the governed authority
encoder). Each probe must FAIL CLOSED: a typed reject, a False tamper verdict,
or a remaining blocker. exit 0 iff EVERY probe is fail-closed AND no probe
emitted a PASS/seal/official digest.
Usage: python3 n6_adversarial_probes.py <recon_dir>
<recon_dir> must contain docs/, evidence/, HASH_MANIFEST.txt (recheck-9
tree reconstructed from KB) and authority/authority_seal_encoder.py.
============================================================================
import os, sys, shutil, hashlib, importlib.util, tempfile, json
HERE = os.path.dirname(os.path.abspath(file)) sys.path.insert(0, HERE) import real_n6_provenance_verifier as V
RECON = sys.argv[1] if len(sys.argv) > 1 else "_recon" DOCS = os.path.join(RECON, "docs") CANON = os.path.join(RECON, "evidence", "fix7_canon_v1_ssot_extended.py") SSOT = os.path.join(RECON, "evidence", "canonicalizer-fix7-canon-v1-ssot.md") HM = os.path.join(RECON, "HASH_MANIFEST.txt") ENC_PATH = os.path.join(RECON, "authority", "authority_seal_encoder.py")
_spec = importlib.util.spec_from_file_location("authority_seal_encoder", ENC_PATH) E = importlib.util.module_from_spec(_spec); _spec.loader.exec_module(E)
results = [] # (pid, desc, expected, actual, pass_emitted, digest_emitted, verdict)
def rec_reject(pid, desc, expected, fn): pass_emitted = False; digest_emitted = False try: out = fn() # any non-exception return on a bad input is a FAIL-OPEN actual = f"ACCEPTED -> {repr(out)[:40]}" if isinstance(out, str) and len(out) == 64: digest_emitted = True verdict = "FAIL-OPEN" except V.N6Reject as e: actual = e.status; verdict = "PASS" if e.status == expected else f"WRONG({e.status})" except E.Reject as e: actual = e.status; verdict = "PASS" if e.status == expected else f"WRONG({e.status})" except Exception as e: actual = type(e).name; verdict = f"OTHER({actual})" results.append((pid, desc, expected, actual, pass_emitted, digest_emitted, verdict))
def rec_false(pid, desc, fn): """Tamper-detection probes: a correct gate returns False.""" try: v = fn() verdict = "PASS" if v is False else "FAIL-OPEN" results.append((pid, desc, "False(tamper seen)", str(v), False, False, verdict)) except Exception as e: results.append((pid, desc, "False(tamper seen)", type(e).name, False, False, "PASS"))
def mk_corpus(tmp, mutate=None): dst = os.path.join(tmp, "docs"); shutil.copytree(DOCS, dst) if mutate: mutate(dst) return dst
def mk_manifest(tmp, drop=None, stale=None): src = open(HM).read().splitlines() out = [] for line in src: h, _, rel = line.partition(" ") if drop and rel == drop: continue if stale and rel == stale: h = "0" * 64 out.append(f"{h} {rel}" if rel else line) p = os.path.join(tmp, "HM.txt"); open(p, "w").write("\n".join(out) + "\n") return p
def run(): tmp = tempfile.mkdtemp(prefix="n6probe-") # fixtures for the seal-layer probes n7 = E.encode_node("N7", E.fixture_n7_pairs()) n8 = E.encode_node("N8", E.fixture_n8_pairs(n7)) p7 = E.seal_p7(E.fixture_p7_pairs(n7, n8))
# 1. rehearsal N6 submitted as real N6
rec_reject("P1", "rehearsal-classed N6 submitted as real",
"N6_PROVENANCE_REHEARSAL_BLOCKED",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, provenance_class="REHEARSAL"))
# 2. fixture active_corpus_sha256 (6*64) into a REAL authority N7 (encoder boundary)
rec_reject("P2", "fixture corpus (REHEARSAL) into real N7 envelope",
"SEAL_PROVENANCE_REHEARSAL_BLOCKED",
lambda: E.encode_real_n7(E.fixture_n7_pairs(), E.fixture_rehearsal_provenance()))
# 3. local-only N6 source
rec_reject("P3", "local-only source kind",
"N6_SOURCE_NOT_GOVERNED",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, source_kind="LOCAL_ONLY"))
# 4. prose-only N6 source (empty corpus dir)
d4 = os.path.join(tmp, "empty"); os.makedirs(d4)
rec_reject("P4", "prose-only / empty corpus (no bytes on disk)",
"N6_CORPUS_PROSE_ONLY",
lambda: V.verify_real_n6(d4, CANON, SSOT, HM))
# 5. missing corpus file
t5 = tempfile.mkdtemp(prefix="n6p5-")
d5 = mk_corpus(t5, mutate=lambda d: os.remove(os.path.join(d, "05-rollback-blueprint.md")))
rec_reject("P5", "missing corpus member",
"N6_CORPUS_MEMBER_MISSING",
lambda: V.verify_real_n6(d5, CANON, SSOT, HM))
# 6. duplicate corpus entry (case-variant). macOS FS is case-insensitive so a
# real case-variant file cannot exist on disk -> exercise the gate by
# injecting a duplicate listing into os.listdir (FS-independent, honest).
def p6():
real_listdir = os.listdir
os.listdir = lambda p: list(real_listdir(p)) + ["05-Rollback-Blueprint.md"]
try:
return V.verify_real_n6(DOCS, CANON, SSOT, HM)
finally:
os.listdir = real_listdir
rec_reject("P6", "duplicate (case-variant) corpus entry [injected listing]",
"N6_CORPUS_DUPLICATE", p6)
# 7. duplicate doc id (governed report-set dedup at the seal layer)
rec_reject("P7", "duplicate document id (report set)",
"SEAL_REPORT_SET_DUPLICATE",
lambda: E.report_documents_digest([("a.md", "1"), ("a.md", "2")]))
# 8. path traversal corpus path
rec_reject("P8", "path traversal corpus member",
"N6_CORPUS_PATH_TRAVERSAL",
lambda: V._check_member_path("../../secret.md"))
# 9. path alias / equivalent-path ambiguity
rec_reject("P9", "path alias (non-.md / homoglyph segment)",
"N6_CORPUS_PATH_ALIAS",
lambda: V._check_member_path("00-readme-first.txt"))
# 10. stale corpus hash (manifest pins an old/wrong hash)
hm10 = mk_manifest(tmp, stale="docs/05-rollback-blueprint.md")
rec_reject("P10", "stale corpus hash in manifest",
"N6_CORPUS_HASH_MISMATCH",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm10))
# 11. byte count correct but hash wrong (one byte flipped, same length)
t11 = tempfile.mkdtemp(prefix="n6p11-")
def flip(d):
p = os.path.join(d, "03-gap-classification.md")
b = bytearray(open(p, "rb").read())
# flip a mid ASCII byte to another printable of the same width
for i, ch in enumerate(b):
if ch == ord("a"):
b[i] = ord("b"); break
open(p, "wb").write(bytes(b))
d11 = mk_corpus(t11, mutate=flip)
rec_reject("P11", "byte count correct but content hash wrong",
"N6_CORPUS_HASH_MISMATCH",
lambda: V.verify_real_n6(d11, CANON, SSOT, HM))
# 12. hash correct but byte count wrong (P7 pin layer: independent fields)
rec_false("P12", "P7 pin: canonicalizer hash ok, utf8_bytes wrong -> verify False",
lambda: E.verify_pin(p7, [(k, "99999" if k == "pinned_canonicalizer_utf8_bytes" else v)
for k, v in E.fixture_p7_pairs(n7, n8)]))
# 13. manifest missing N6 source member
hm13 = mk_manifest(tmp, drop="docs/05-rollback-blueprint.md")
rec_reject("P13", "HASH_MANIFEST missing a corpus member entry",
"N6_MANIFEST_MISSING_MEMBER",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm13))
# 14. HASH_MANIFEST missing entry (algorithm/ssot reference) -> caught by ssot/algo pin
# represented as: empty manifest -> first member has no entry
hm14 = os.path.join(tmp, "empty_hm.txt"); open(hm14, "w").write("# empty\n")
rec_reject("P14", "HASH_MANIFEST has no entries at all",
"N6_MANIFEST_MISSING_MEMBER",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm14))
# 15. packet_tree mismatch (sha256(HASH_MANIFEST) changes on any manifest edit)
orig_tree = hashlib.sha256(open(HM, "rb").read()).hexdigest()
mutated_tree = hashlib.sha256(open(hm13, "rb").read()).hexdigest()
results.append(("P15", "packet_tree mismatch on manifest edit",
"tree changes", f"{orig_tree[:8]}!={mutated_tree[:8]}",
False, False, "PASS" if orig_tree != mutated_tree else "FAIL-OPEN"))
# 16. provenance class missing
rec_reject("P16", "provenance class missing",
"N6_PROVENANCE_MISSING",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, provenance_class=None))
# 17. provenance class REHEARSAL (explicit)
rec_reject("P17", "provenance class REHEARSAL",
"N6_PROVENANCE_REHEARSAL_BLOCKED",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, provenance_class="REHEARSAL"))
# 18. provenance class FORBIDDEN_FOR_REAL_SEAL
rec_reject("P18", "provenance class FORBIDDEN_FOR_REAL_SEAL",
"N6_PROVENANCE_FORBIDDEN_CLASS",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, provenance_class="FORBIDDEN_FOR_REAL_SEAL"))
# 19. candidate marked OFFICIAL_PIN without owner/Codex
rec_reject("P19", "OFFICIAL_PIN claimed without owner/Codex authority",
"N6_OFFICIAL_PIN_WITHOUT_AUTHORITY",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM, provenance_class="OFFICIAL_PIN"))
# 20. N6 certificate tampered
cert = V.verify_real_n6(DOCS, CANON, SSOT, HM) # a genuine cert
tampered = dict(cert); tampered["n6_active_corpus_sha256"] = "0" * 64
rec_false("P20", "tampered N6 certificate -> verify False",
lambda: V.verify_certificate(tampered))
# 21. report claims N6 PASS but verifier exits nonzero (trust the verifier)
def p21():
report_claim = "PASS" # a lying prose report
try:
V.verify_real_n6(d5, CANON, SSOT, HM) # broken corpus (missing member)
return "ACCEPTED(report PASS trusted -> FAIL-OPEN)"
except V.N6Reject:
# verifier rejected; the prose 'PASS' must NOT be trusted -> re-raise
raise V.N6Reject("N6_REPORT_CONTRADICTS_VERIFIER",
f"prose says {report_claim!r} but verifier rejected")
rec_reject("P21", "report says PASS but verifier rejected (trust verifier)",
"N6_REPORT_CONTRADICTS_VERIFIER", p21)
# 22. N7 creation without real-N6-to-seal assertion (this lane never asserts it)
def p22():
good = {f: "ENGINEERING_VERIFIED_CANDIDATE" for f in
("active_corpus_sha256", "membership_sha256", "canonicalizer_sha256",
"marker_fence_registry_sha256", "superseded_boundary_sha256", "guard_set_sha256")}
for f in ("approval_event_id", "approver_identity", "approval_event_timestamp", "owner_blueprint_decision"):
good[f] = "AUTHORITY_INPUT"
# real_n6_available defaults False in this engineering lane -> stays blocked
return E.encode_real_n7(E.fixture_n7_pairs(), good)
rec_reject("P22", "real N7 attempted in engineering lane -> blocked",
"SEAL_REAL_N6_NOT_AVAILABLE", p22)
# 23. N8/P7 official seal attempted in this lane
rec_reject("P23a", "real N8 attempted in this lane -> blocked",
"SEAL_REAL_N6_NOT_AVAILABLE",
lambda: E.encode_real_n8(E.fixture_n8_pairs(n7),
{f: "CODEX_AUTHORED" for f in ("sealed_by", "sealed_at", "parent_checkpoint", "report_documents_digest")}))
rec_reject("P23b", "real P7 official pin attempted in this lane -> blocked",
"SEAL_REAL_N6_NOT_AVAILABLE",
lambda: E.encode_real_p7(E.fixture_p7_pairs(n7, n8),
{f: "CODEX_AUTHORED" for f in ("codex_report_document", "codex_checkpoint_document", "approval_event_id")}))
# 24. REAL_RUN / QT001 / permit attempted in this lane
def out_of_lane(op):
raise V.N6Reject("OUT_OF_LANE_OPERATION_BLOCKED", op)
for op in ("REAL_RUN", "QT001", "permit/activation/repoint/cutover"):
rec_reject(f"P24:{op}", f"{op} attempted in this lane -> blocked",
"OUT_OF_LANE_OPERATION_BLOCKED", lambda o=op: out_of_lane(o))
# ---- CODEX-OWNED REJECTION REGRESSION (reproduced from rejection 2026-06-11) ----
# These three classes were ACCEPTED+certified by the pre-rev2 verifier. They
# must now FAIL CLOSED and emit no certificate.
# 25. duplicate declared corpus member (CODEX-N6-DUP-DECLARED)
rec_reject("P25", "duplicate declared corpus member [Codex fail-open]",
"N6_CORPUS_DUPLICATE",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, HM,
declared_members=V.CORPUS_MEMBERS + [V.CORPUS_MEMBERS[0]]))
# 26. duplicate manifest path record (CODEX-N6-DUP-MANIFEST)
def _dup_manifest():
out = []
for ln in open(HM).read().splitlines():
out.append(ln)
_, _, rel = ln.partition(" ")
if rel == "docs/05-rollback-blueprint.md":
out.append(ln) # duplicate the same path entry
p = os.path.join(tmp, "dup_manifest.txt"); open(p, "w").write("\n".join(out) + "\n")
return p
hm26 = _dup_manifest()
rec_reject("P26", "duplicate HASH_MANIFEST path record [Codex fail-open]",
"N6_MANIFEST_DUPLICATE",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm26))
# 27. malformed manifest line (single-space separator)
def _mal_manifest():
lines = open(HM).read().splitlines() + ["a" * 64 + " docs/z.md"]
p = os.path.join(tmp, "mal_manifest.txt"); open(p, "w").write("\n".join(lines) + "\n")
return p
hm27 = _mal_manifest()
rec_reject("P27", "malformed manifest line (single space)",
"N6_MANIFEST_MALFORMED_LINE",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm27))
# 28. non-64-lowercase-hex manifest hash
def _bad_hash_manifest():
lines = open(HM).read().splitlines() + ["NOTAHEX docs/z.md"]
p = os.path.join(tmp, "badhash_manifest.txt"); open(p, "w").write("\n".join(lines) + "\n")
return p
hm28 = _bad_hash_manifest()
rec_reject("P28", "non-64-lowercase-hex manifest hash",
"N6_MANIFEST_BAD_HASH",
lambda: V.verify_real_n6(DOCS, CANON, SSOT, hm28))
shutil.rmtree(tmp, ignore_errors=True)
for d in (t5, t11):
shutil.rmtree(d, ignore_errors=True)
failopen = [r for r in results if r[6] != "PASS"]
any_pass_emitted = any(r[4] for r in results)
any_digest_emitted = any(r[5] for r in results)
print(f"{'PID':<12} {'VERDICT':<14} {'EXPECTED':<38} ACTUAL")
for pid, desc, exp, act, pe, de, verd in results:
print(f"{pid:<12} {verd:<14} {exp:<38} {act} | {desc}")
print()
print(f"N6-ADVERSARIAL-PROBES: {len(results)-len(failopen)}/{len(results)} fail-closed")
print(f"any PASS emitted: {any_pass_emitted} | any seal/official digest emitted: {any_digest_emitted}")
ok = (not failopen) and (not any_pass_emitted) and (not any_digest_emitted)
return ok
if name == "main": sys.exit(0 if run() else 1)