FIX7 authority-input packet — authority_input_validator.py
#!/usr/bin/env python3
============================================================================
FIX7 N7/N8/P7 AUTHORITY-INPUT VALIDATOR (fail-closed)
Validates the authority-input packet that prepares the next owner/Codex
authoring phase. It PREPARES inputs; it NEVER authors N7/N8/P7, never seals,
never approves, never promotes. exit 0 iff EVERY rule holds for the supplied
packet directory. A single broken rule -> nonzero + a stable reject code.
Ratified facts this validator enforces against (Codex, 2026-06-11):
* N-number table : RATIFIED_FOR_BINDING_USE
* N6 d777e87c... : RATIFIED_ENGINEERING_VERIFIED_CANDIDATE (NOT a seal/pin)
Usage:
python3 authority_input_validator.py --selftest
python3 authority_input_validator.py --validate <packet_dir>
============================================================================
import json, os, sys
RATIFIED_N6 = "d777e87c73d3b62d36789d9343f346102e98dbf301f2c93f7608470b876b258c" RATIFIED_N6_CERT_BINDING = "055828db8303aaaad0ba22adfff54eecc7d31b1fabc90d021be5503bdf746b96" ALLOWED_N6_STATUS = {"RATIFIED_ENGINEERING_VERIFIED_CANDIDATE"} FORBIDDEN_N6_STATUS = {"OFFICIAL_SEAL", "OFFICIAL_PIN", "SEALED", "PROMOTED", "SEAL_REAL_N6_NOT_AVAILABLE", "REHEARSAL", "NOT_AVAILABLE"} ALLOWED_NNUMBER_STATUS = {"RATIFIED_FOR_BINDING_USE"} AUTHORITY_FIELDS = ["A1_approval_event_id", "A2_approver_identity", "A3_approval_event_timestamp", "A5_owner_blueprint_decision"]
Tokens that, if they appear as a concrete authored seal value, mean a seal was
illegally authored inside this input packet.
SEAL_OUTPUT_FIELDS = ["envelope_manifest_sha256", "detached_seal_sha256", "authority_seal_pin_sha256"] MISSING_TOKENS = {"MISSING_AUTHORITY_INPUT", "MISSING", "", None, "DEPENDS_ON_REAL_N7", "DEPENDS_ON_REAL_N8"}
class AuthInputReject(Exception): def init(self, status, detail=""): self.status = status self.detail = detail super().init(f"{status}: {detail}")
def _load(packet_dir, name): p = os.path.join(packet_dir, name) if not os.path.exists(p): raise AuthInputReject("AUTHINPUT_FILE_MISSING", name) with open(p, "r", encoding="utf-8") as f: try: return json.load(f) except json.JSONDecodeError as e: raise AuthInputReject("AUTHINPUT_FILE_MALFORMED", f"{name}: {e}")
def _is_hex64(s): return isinstance(s, str) and len(s) == 64 and all(c in "0123456789abcdef" for c in s)
def validate_packet(packet_dir): """Return list of (rule, ok, note). Raises AuthInputReject on the first structural failure; soft rule failures are returned with ok=False.""" rules = []
def rule(name, ok, note=""):
rules.append((name, bool(ok), note))
return bool(ok)
roster = _load(packet_dir, "report-set-candidate.json")
owner = _load(packet_dir, "owner-decision-template.json")
p7 = _load(packet_dir, "p7-id-proposal.json")
n6status = _load(packet_dir, "n7-envelope-n6-status.json")
# ---- R1: N6 is the ratified candidate digest, never anything else --------
rule("R1 N6 digest == ratified candidate",
n6status.get("n6_value") == RATIFIED_N6,
n6status.get("n6_value", "<none>"))
if n6status.get("n6_value") != RATIFIED_N6:
raise AuthInputReject("AUTHINPUT_N6_DIGEST_MISMATCH", str(n6status.get("n6_value")))
# ---- R2: N6 current status is the allowed ratified-candidate status ------
st = n6status.get("n6_current_status")
if st in FORBIDDEN_N6_STATUS:
raise AuthInputReject("AUTHINPUT_N6_FALSELY_OFFICIAL", str(st))
rule("R2 N6 status is ratified engineering candidate", st in ALLOWED_N6_STATUS, str(st))
if st not in ALLOWED_N6_STATUS:
raise AuthInputReject("AUTHINPUT_N6_STATUS_INVALID", str(st))
# ---- R3: N6 NOT marked official seal / pin -------------------------------
if n6status.get("n6_is_official_seal") or n6status.get("n6_is_official_pin"):
raise AuthInputReject("AUTHINPUT_N6_FALSELY_OFFICIAL", "is_official flag set")
rule("R3 N6 not official seal/pin", True)
# ---- R4: N6 does not by itself authorize N7/N8/P7 ------------------------
if n6status.get("n6_authorizes_n7_n8_p7_alone") is not False:
raise AuthInputReject("AUTHINPUT_N6_OVERREACH", "n6 claims to authorize seals alone")
rule("R4 N6 alone does NOT authorize N7/N8/P7", True)
# ---- R5: N6 certificate present + bound ----------------------------------
rule("R5 N6 certificate binding present",
n6status.get("n6_certificate_binding_sha256") == RATIFIED_N6_CERT_BINDING)
if n6status.get("n6_certificate_binding_sha256") != RATIFIED_N6_CERT_BINDING:
raise AuthInputReject("AUTHINPUT_N6_CERT_MISSING",
str(n6status.get("n6_certificate_binding_sha256")))
# ---- R6: N-number table ratified for binding use -------------------------
nn = n6status.get("n_number_table_status")
rule("R6 N-number table ratified for binding", nn in ALLOWED_NNUMBER_STATUS, str(nn))
if nn not in ALLOWED_NNUMBER_STATUS:
raise AuthInputReject("AUTHINPUT_NNUMBER_NOT_RATIFIED", str(nn))
# ---- R7: membership label is un-numbered (no stale N1 membership) --------
ml = str(n6status.get("membership_label", ""))
if ml.strip().upper() in {"N1", "(N1)", "MEMBERSHIP_SHA256 (N1)"} or ml.strip() == "N1":
raise AuthInputReject("AUTHINPUT_STALE_N1_MEMBERSHIP_LABEL", ml)
rule("R7 membership label un-numbered (not N1)", "membership" in ml.lower(), ml)
# ---- R8: owner approval not fabricated; default safe HOLD ----------------
dec = owner.get("current_decision")
rule("R8 owner default decision is NOT_APPROVED_HOLD",
dec == "NOT_APPROVED_HOLD" or (owner.get("owner_signed") is True and
owner.get("owner_identity")),
str(dec))
# An APPROVED decision is only valid if owner_signed AND owner_identity present.
if dec not in (None, "NOT_APPROVED_HOLD"):
if not (owner.get("owner_signed") is True and owner.get("owner_identity")):
raise AuthInputReject("AUTHINPUT_OWNER_APPROVAL_FABRICATED", str(dec))
if owner.get("owner_signed") and not owner.get("owner_identity"):
raise AuthInputReject("AUTHINPUT_OWNER_APPROVAL_FABRICATED", "signed w/o identity")
# ---- R9: A1/A2/A3/A5 declared MISSING, not fabricated; not ready --------
# The roster lives in the authority-input-roster.json (machine roster).
rosterj = _load(packet_dir, "authority-input-roster.json")
inputs = {r["input"]: r for r in rosterj.get("inputs", [])}
for f in AUTHORITY_FIELDS:
rec = inputs.get(f)
if rec is None:
raise AuthInputReject("AUTHINPUT_ROSTER_INCOMPLETE", f)
present = rec.get("present")
val = rec.get("value")
if present is False and val not in MISSING_TOKENS:
raise AuthInputReject("AUTHINPUT_FABRICATED_AUTHORITY_VALUE", f"{f}={val}")
rule(f"R9 {f} declared MISSING (not fabricated)",
present is False and val in MISSING_TOKENS)
# ---- R10: ready_to_author must be False while inputs missing -------------
ready = rosterj.get("ready_to_author")
any_missing = any(inputs[f].get("present") is False for f in AUTHORITY_FIELDS)
if ready is True and any_missing:
raise AuthInputReject("AUTHINPUT_READY_WITH_MISSING_INPUTS",
"ready_to_author True with A1/A2/A3/A5 missing")
rule("R10 not ready_to_author while inputs missing", ready is False, str(ready))
# ---- R11: N8 signer not fabricated --------------------------------------
if roster.get("is_codex_authored_n8") is not False:
raise AuthInputReject("AUTHINPUT_N8_SIGNER_FABRICATED", "is_codex_authored_n8 set")
for f in ("sealed_by", "sealed_at"):
if roster.get(f) not in (None,):
raise AuthInputReject("AUTHINPUT_N8_SIGNER_FABRICATED", f"{f} present")
rule("R11 N8 signer not fabricated (CODEX_ONLY)", True)
# ---- R12: P7 not marked official ----------------------------------------
if p7.get("is_official_p7") or p7.get("is_official_pin") or p7.get("p7_authored"):
raise AuthInputReject("AUTHINPUT_P7_OFFICIAL_FABRICATED", "p7 official flag set")
rule("R12 P7 ids proposed only (not official)", True)
# ---- R13: no seal authored inside this input packet ---------------------
for blob, label in ((roster, "report-set"), (owner, "owner"), (p7, "p7"),
(n6status, "n6status")):
for sf in SEAL_OUTPUT_FIELDS:
v = blob.get(sf)
if _is_hex64(v):
raise AuthInputReject("AUTHINPUT_SEAL_AUTHORED_IN_INPUT_PACKET",
f"{label}.{sf}")
rule("R13 no N7/N8/P7 seal authored in input packet", True)
return rules
----------------------------------------------------------------------------
Selftest: build a known-good packet in a temp dir + a battery of bad mutations
and assert good->exit0, each bad->specific reject.
----------------------------------------------------------------------------
def _good_fixture(d): os.makedirs(d, exist_ok=True) json.dump({ "n6_value": RATIFIED_N6, "n6_current_status": "RATIFIED_ENGINEERING_VERIFIED_CANDIDATE", "n6_is_official_seal": False, "n6_is_official_pin": False, "n6_authorizes_n7_n8_p7_alone": False, "n6_certificate_binding_sha256": RATIFIED_N6_CERT_BINDING, "membership_label": "membership (un-numbered)", "n_number_table_status": "RATIFIED_FOR_BINDING_USE", }, open(os.path.join(d, "n7-envelope-n6-status.json"), "w")) json.dump({"is_codex_authored_n8": False, "is_official": False, "report_documents": [{"document_id": "x", "revision": 1}]}, open(os.path.join(d, "report-set-candidate.json"), "w")) json.dump({"current_decision": "NOT_APPROVED_HOLD", "owner_signed": False, "owner_identity": None, "default_safe_decision": "NOT_APPROVED_HOLD"}, open(os.path.join(d, "owner-decision-template.json"), "w")) json.dump({"is_official_p7": False, "is_official_pin": False, "p7_authored": False}, open(os.path.join(d, "p7-id-proposal.json"), "w")) json.dump({"ready_to_author": False, "inputs": [ {"input": f, "present": False, "value": "MISSING_AUTHORITY_INPUT"} for f in AUTHORITY_FIELDS]}, open(os.path.join(d, "authority-input-roster.json"), "w"))
def _selftest(): import tempfile, shutil ok = True base = tempfile.mkdtemp(prefix="authinput-selftest-") try: good = os.path.join(base, "good") _good_fixture(good) try: validate_packet(good) print(" [PASS] good fixture validates (exit 0 path)") except AuthInputReject as e: ok = False print(f" [FAIL] good fixture rejected: {e.status}")
# (mutator, expected reject code)
cases = [
("N6 marked official seal", "AUTHINPUT_N6_FALSELY_OFFICIAL",
lambda j: j["n6"].__setitem__("n6_current_status", "OFFICIAL_SEAL")),
("N6 official_seal flag", "AUTHINPUT_N6_FALSELY_OFFICIAL",
lambda j: j["n6"].__setitem__("n6_is_official_seal", True)),
("N6 digest changed", "AUTHINPUT_N6_DIGEST_MISMATCH",
lambda j: j["n6"].__setitem__("n6_value", "0" * 64)),
("N6 cert removed", "AUTHINPUT_N6_CERT_MISSING",
lambda j: j["n6"].__setitem__("n6_certificate_binding_sha256", None)),
("stale SEAL_REAL_N6_NOT_AVAILABLE status", "AUTHINPUT_N6_FALSELY_OFFICIAL",
lambda j: j["n6"].__setitem__("n6_current_status", "SEAL_REAL_N6_NOT_AVAILABLE")),
("N-number unratified", "AUTHINPUT_NNUMBER_NOT_RATIFIED",
lambda j: j["n6"].__setitem__("n_number_table_status", "PROPOSED")),
("stale N1 membership label", "AUTHINPUT_STALE_N1_MEMBERSHIP_LABEL",
lambda j: j["n6"].__setitem__("membership_label", "N1")),
("owner approval fabricated", "AUTHINPUT_OWNER_APPROVAL_FABRICATED",
lambda j: j["owner"].__setitem__("current_decision", "APPROVED")),
("A1 fabricated value", "AUTHINPUT_FABRICATED_AUTHORITY_VALUE",
lambda j: j["roster"]["inputs"].__setitem__(0, {"input": "A1_approval_event_id", "present": False, "value": "APPROVAL-EVENT-2026-FAKE"})),
("ready_to_author with missing", "AUTHINPUT_READY_WITH_MISSING_INPUTS",
lambda j: j["roster"].__setitem__("ready_to_author", True)),
("N8 signer fabricated", "AUTHINPUT_N8_SIGNER_FABRICATED",
lambda j: j["rs"].__setitem__("sealed_by", "codex-signer-7")),
("N8 codex-authored flag", "AUTHINPUT_N8_SIGNER_FABRICATED",
lambda j: j["rs"].__setitem__("is_codex_authored_n8", True)),
("P7 official", "AUTHINPUT_P7_OFFICIAL_FABRICATED",
lambda j: j["p7"].__setitem__("is_official_p7", True)),
("seal authored in packet", "AUTHINPUT_SEAL_AUTHORED_IN_INPUT_PACKET",
lambda j: j["p7"].__setitem__("authority_seal_pin_sha256", "a" * 64)),
]
for label, expect, mut in cases:
d = os.path.join(base, "bad-" + label.replace(" ", "_")[:24])
_good_fixture(d)
j = {
"n6": json.load(open(os.path.join(d, "n7-envelope-n6-status.json"))),
"owner": json.load(open(os.path.join(d, "owner-decision-template.json"))),
"p7": json.load(open(os.path.join(d, "p7-id-proposal.json"))),
"rs": json.load(open(os.path.join(d, "report-set-candidate.json"))),
"roster": json.load(open(os.path.join(d, "authority-input-roster.json"))),
}
mut(j)
json.dump(j["n6"], open(os.path.join(d, "n7-envelope-n6-status.json"), "w"))
json.dump(j["owner"], open(os.path.join(d, "owner-decision-template.json"), "w"))
json.dump(j["p7"], open(os.path.join(d, "p7-id-proposal.json"), "w"))
json.dump(j["rs"], open(os.path.join(d, "report-set-candidate.json"), "w"))
json.dump(j["roster"], open(os.path.join(d, "authority-input-roster.json"), "w"))
try:
validate_packet(d)
ok = False
print(f" [FAIL] '{label}' was ACCEPTED (expected {expect})")
except AuthInputReject as e:
if e.status == expect:
print(f" [PASS] '{label}' -> {e.status}")
else:
ok = False
print(f" [FAIL] '{label}' -> {e.status} (expected {expect})")
return ok
finally:
shutil.rmtree(base, ignore_errors=True)
def main(argv): if "--selftest" in argv: ok = _selftest() print(f"AUTHORITY-INPUT-VALIDATOR SELFTEST: {'PASS' if ok else 'FAIL'}") return 0 if ok else 1 if "--validate" in argv: d = argv[argv.index("--validate") + 1] try: rules = validate_packet(d) except AuthInputReject as e: print(f" [REJECT] {e.status}: {e.detail}") print("AUTHORITY-INPUT-VALIDATOR: FAIL (fail-closed)") return 2 npass = sum(1 for _, o, _ in rules if o) for name, o, note in rules: print(f" [{'PASS' if o else 'FAIL'}] {name}" + (f" ({note})" if note else "")) allok = all(o for _, o, _ in rules) print(f"AUTHORITY-INPUT-VALIDATOR: {npass}/{len(rules)} rules hold -> " f"{'PASS' if allok else 'FAIL'}") return 0 if allok else 1 print(doc) return 64
if name == "main": sys.exit(main(sys.argv[1:]))