KB-1DAA

FIX7 P0 Hardening Packet — hardened_dryrun_validator.py

19 min read Revision 1

#!/usr/bin/env python3 """FIX7 P0 dry-run + execution-readiness packet validator -- HARDENED (fail-closed).

This is a strict superset of the original dry-run packet validator (dryrun_validator.py, sha256 7fb2f11e8baec9703faefbb86c23b161366b75340d98860c5c87977ee7bcb297, inside fix7-p0-dryrun-and-execution-readiness-packet-2026-06-11, packet tree 02b200e5a3c7a21c2e620b293fbf28ccc81731a983430c3e5a202798c05e94e6).

It ADDS rollback-proof hardening per T2-REC-ROLLBACK-HARDENING-1 and removes no existing gate, so every defect class the original caught is still caught.

Hardening (T2-REC-ROLLBACK-HARDENING-1):
When a staging mutation / rollback proof is claimed PROVEN_IN_STAGING, every
rollback entry must show a real applied mutation:
  - before_hash present
  - after_apply_hash present
  - after_rollback_hash present
  - after_apply_hash != before_hash (apply actually changed the artifact)
  - after_rollback_hash == before_hash (and == expected_restored_hash if pinned)
  - production_rollback_status stays NOT_APPLICABLE (production mutation forbidden)
This prevents a fabricated rollback proof in which nothing was ever mutated
(apply hash equal to before hash) from passing as "rollback proven".

Authorizes nothing. Replaces dryrun_validator.py in a future separately authorized implementation-execution-no-production macro; here it is exercised only against frozen staging evidence.

Exit 0 => every gate passes (full-packet mode) / selftest proof holds. Exit 1 => a fail-closed condition fired. Exit 2 => a required file is missing or unreadable. """ import json import os import sys

# Directory that contains this script; used as the default base directory
# when the packet JSON files are located.
# NOTE: the dunder name ``__file__`` is required here -- a bare ``file`` is an
# undefined name in Python 3 and raises NameError at import time.
HERE = os.path.dirname(os.path.abspath(__file__))

# Pinned 64-hex-character digests. The seal and planning gates compare the
# values found in the packet against these constants with strict equality.
EXPECTED_N7 = "efb0c5747ae5f56c0e8b5d99c35438a3d6231253570e6ab4d2658ab9e1853d32"
EXPECTED_N8 = "daa70c39a91a64696fb1a24ca2d3f620d04049583c9047689902ebf26117e1a1"
EXPECTED_P7 = "9ddb27c35a06ca11ee616e3b0399c705d0d97f5b8f284c12045c396f7c034550"
EXPECTED_SEAL_TREE = "3890cd3467720b3f2c105625e2e1dc627fd2e7fdfb74d9059bad95bece12a234"
EXPECTED_PLANNING_TREE = "f470d0d019f9af63794ce943a64ea08ff31a17906a7857f4124d0b1e14a0fe8f"
EXPECTED_CANON = "49c386a9b9666c09786fc4f89bc79776b6046eaee6f4da6d8537d2c753b734d0"

# The only value accepted for ``original_seal_owner_decision`` in the seal record.
EXPECTED_OWNER_DECISION = "OPT3_AUTHORIZE_CODEX_AUTHORITY_SEAL_AND_POST_SEAL_IMPLEMENTATION_PLANNING_ONLY"

# Claim tokens for the deferred levels that this lane must never report as passed.
FORBIDDEN_OVERCLAIMS = [
    "IU_TRACEABILITY_PASS",
    "SEMANTIC_TEXT_AS_CODE_PASS",
    "RELEASE_BUNDLE_PASS",
]

# Tokens that must never appear as "requested"/"authorized"/"executed" in this lane.
FORBIDDEN_ACTIONS = [
    "REAL_RUN",
    "QT001",
    "permit",
    "activation",
    "repoint",
    "cutover",
]

# The complete set of verdict strings the execution-readiness review may carry.
VALID_REVIEW_VERDICTS = [
    "READY_FOR_OWNER_EXECUTION_REVIEW",
    "TRUE_ENVIRONMENT_BLOCKER",
    "ENGINEERING_REMAINS",
]

# Sentinel meaning "artifact did not exist before/after" for additive surfaces.
# It is treated as a present, valid hash value (the file-absent state).
ABSENT = "ABSENT"

def _fail(r, code):
    """Record one fail-closed condition.

    Args:
        r: list of failure codes collected so far; mutated in place.
        code: failure-code string to append.
    """
    r.append(code)

def check_seal(seal):
    """Validate the seal-consumption record.

    Args:
        seal: dict parsed from the seal-consumption JSON file.

    Returns:
        A list of failure codes, in gate order; empty when every gate passes.
    """
    # Each row pairs "did this gate trip?" with the code reported when it did.
    # Flags that must be off are compared with ``is not False`` so that a
    # missing key (None) also trips the gate.
    gates = (
        (not seal.get("p7_present", False), "MISSING_P7"),
        (seal.get("p7_digest") != EXPECTED_P7, "P7_DIGEST_MISMATCH"),
        (not seal.get("n8_present", False), "MISSING_N8"),
        (seal.get("n8_digest") != EXPECTED_N8, "N8_DIGEST_MISMATCH"),
        (seal.get("n7_digest") != EXPECTED_N7, "N7_DIGEST_MISMATCH"),
        (seal.get("seal_packet_tree_sha256") != EXPECTED_SEAL_TREE,
         "SEAL_TREE_MISMATCH"),
        (seal.get("implementation_execution_allowed_now") is not False,
         "IMPLEMENTATION_EXECUTION_CLAIMED_NOW"),
        (seal.get("production_mutation_allowed_now") is not False,
         "PRODUCTION_MUTATION_ALLOWED_NOW"),
        (seal.get("owner_decision_consumed") != "AUTHORIZE_DRYRUN_ONLY",
         "OWNER_DECISION_NOT_DRYRUN_ONLY"),
        (seal.get("original_seal_owner_decision") != EXPECTED_OWNER_DECISION,
         "ORIGINAL_OWNER_DECISION_NOT_OPT3"),
        (seal.get("implementation_authorized_by_p7_alone") is not False,
         "P7_ALONE_AUTHORIZES_IMPL"),
    )
    r = []
    for tripped, code in gates:
        if tripped:
            _fail(r, code)
    return r

def check_planning_consumption(pl):
    """Validate the planning-consumption record.

    Args:
        pl: dict parsed from the planning-consumption JSON file.

    Returns:
        A list of failure codes, in gate order; empty when every gate passes.
    """
    gates = (
        (pl.get("planning_packet_tree_sha256") != EXPECTED_PLANNING_TREE,
         "STALE_PLANNING_PACKET_TREE"),
        (pl.get("planning_validator_exit") != 0, "PLANNING_VALIDATOR_NOT_PASS"),
        (pl.get("planning_probes_any_fail_open") is not False,
         "PLANNING_PROBES_FAIL_OPEN"),
        (pl.get("planning_rerun_result") != "PASS", "PLANNING_RERUN_NOT_PASS"),
        (pl.get("reconstruct_byte_exact") is not True,
         "PLANNING_RECONSTRUCT_NOT_BYTE_EXACT"),
    )
    r = []
    for tripped, code in gates:
        if tripped:
            _fail(r, code)
    return r

def check_precondition_recheck(pc):
    """Validate the precondition re-check record.

    Args:
        pc: dict with an ``items`` list plus the ``dryrun_can_proceed`` and
            ``execution_ready`` flags.

    Returns:
        A list of failure codes; per-item codes carry the item id as a suffix.
    """
    r = []
    allowed_classes = {
        "PASS",
        "FAIL",
        "UNKNOWN",
        "DRYRUN_ALLOWED_WITH_LIMIT",
        "BLOCKS_DRYRUN",
        "BLOCKS_EXECUTION_ONLY",
    }
    items = pc.get("items")
    # An absent, non-list or empty item list short-circuits with one code.
    if not (isinstance(items, list) and items):
        _fail(r, "PRECONDITION_RECHECK_EMPTY")
        return r

    for item in items:
        tag = str(item.get("id"))
        classification = item.get("recheck_classification")
        if classification not in allowed_classes:
            _fail(r, "PRECONDITION_BAD_CLASSIFICATION:" + tag)
        evidence_text = str(item.get("evidence", "")).strip()
        if not evidence_text:
            _fail(r, "PRECONDITION_NO_EVIDENCE:" + tag)
        # Anything that blocks the dry-run means the dry-run must not be
        # declared safe to proceed.
        if classification == "BLOCKS_DRYRUN" and pc.get("dryrun_can_proceed") is True:
            _fail(r, "DRYRUN_PROCEEDS_DESPITE_BLOCKER:" + tag)

    # This record must never claim execution readiness.
    if pc.get("execution_ready") is True:
        _fail(r, "EXECUTION_READY_CLAIMED")
    return r

def check_isolation(iso):
    """Validate the staging-isolation proof.

    Args:
        iso: dict parsed from the staging-isolation-proof JSON file.

    Returns:
        A list of failure codes; empty when the workspace is shown to be an
        isolated, disposable, temp-path workspace with no production links.
    """
    r = []
    if iso.get("is_production") is not False:
        _fail(r, "WORKSPACE_IS_PRODUCTION")
    for k in ("connects_to_production_pg", "connects_to_production_directus",
              "connects_to_system_issues", "connects_to_registry_row"):
        # Only an explicit False counts; a missing flag fails closed.
        if iso.get(k) is not False:
            _fail(r, "ISOLATION_PROD_CONNECTION:" + k)
    if iso.get("disposable") is not True:
        _fail(r, "WORKSPACE_NOT_DISPOSABLE")
    if not str(iso.get("cleanup_method", "")).strip():
        _fail(r, "ISOLATION_NO_CLEANUP")
    if iso.get("isolation_proven") is not True:
        _fail(r, "ISOLATION_NOT_PROVEN")

    wp = str(iso.get("workspace_path", ""))
    under_temp_root = wp.startswith(("/tmp/", "/private/var/folders/"))
    # A prefix test alone accepts "/tmp/../srv/prod"; any parent-directory
    # component could walk out of the temp root, so it fails closed.
    escapes_root = ".." in wp.split("/")
    if not under_temp_root or escapes_root:
        _fail(r, "WORKSPACE_NOT_TEMP_PATH")
    return r

def check_dryrun_evidence(dr):
    """Validate the dry-run execution evidence.

    Args:
        dr: dict with packet-level flags and a ``steps`` list of step dicts.

    Returns:
        A list of failure codes; per-step codes carry the step id as a suffix.
    """
    r = []
    if dr.get("production_target") is not False:
        _fail(r, "DRYRUN_TARGETS_PRODUCTION")
    # The no-vector raw-evidence pointer is required at packet level.
    pointer = str(dr.get("no_vector_raw_evidence_pointer", "")).strip()
    if not pointer:
        _fail(r, "MISSING_NO_VECTOR_POINTER")

    steps = dr.get("steps")
    if not (isinstance(steps, list) and steps):
        _fail(r, "DRYRUN_NO_STEPS")
        return r

    required_fields = ("command", "cwd", "expected_exit", "actual_exit", "log", "verdict")
    for step in steps:
        sid = str(step.get("id"))
        for name in required_fields:
            if name not in step or str(step.get(name)) == "":
                _fail(r, "DRYRUN_STEP_FIELD_MISSING:" + sid + ":" + name)
        if step.get("production") is not False:
            _fail(r, "DRYRUN_STEP_PRODUCTION:" + sid)

        claims_pass = step.get("verdict") == "PASS"
        # Exit-code consistency: actual must equal expected, or the verdict
        # must not be PASS.
        if claims_pass and step.get("expected_exit") != step.get("actual_exit"):
            _fail(r, "DRYRUN_EXIT_CODE_MISMATCH:" + sid)
        # Readonly steps need no hash; mutating steps must carry an artifact hash.
        if not step.get("readonly", False):
            if not str(step.get("artifact_hash", "")).strip():
                _fail(r, "DRYRUN_MUTATING_STEP_NO_HASH:" + sid)
        # Report-vs-file: a PASS verdict requires real captured evidence.
        if claims_pass and step.get("evidence_present") is not True:
            _fail(r, "REPORT_PASS_BUT_NO_EVIDENCE:" + sid)
    return r

def _hash_present(value):
    """Return True only for a non-blank string (the ABSENT sentinel qualifies)."""
    # A JSON null arrives as None; str(None) == "None" would look "present",
    # so anything that is not a real string is treated as missing.
    return isinstance(value, str) and bool(value.strip())


def _canonical_hash(value):
    """Normalise a hash string for the did-it-mutate comparison."""
    # Surrounding whitespace and letter case do not make a different digest.
    return value.strip().casefold()


def check_rollback_proof(rb):
    """Validate the rollback / recovery proof (hardened, fail-closed).

    Args:
        rb: dict with ``rollback_proof_status``, ``production_rollback_status``,
            ``staging_mutation_occurred`` and, when proven, an ``entries`` list.

    Returns:
        A list of failure codes; per-entry codes carry the entry id as a suffix
        (or the list index for an entry that is not a JSON object).
    """
    r = []
    status = rb.get("rollback_proof_status")
    if status not in ("PROVEN_IN_STAGING", "NOT_APPLICABLE"):
        _fail(r, "ROLLBACK_PROOF_STATUS_INVALID")
    # Never claim production rollback proof (production mutation forbidden in this lane).
    if rb.get("production_rollback_status") not in (None, "NOT_APPLICABLE"):
        _fail(r, "PRODUCTION_ROLLBACK_CLAIMED")

    entries = rb.get("entries", [])
    if not isinstance(entries, list):
        # null / wrong-typed "entries" is reported as "no entries", not a crash.
        entries = []
    mutated = rb.get("staging_mutation_occurred")

    if status == "PROVEN_IN_STAGING":
        if mutated is not True:
            _fail(r, "ROLLBACK_PROVEN_BUT_NO_MUTATION")
        if not entries:
            _fail(r, "ROLLBACK_PROVEN_NO_ENTRIES")
        for index, e in enumerate(entries):
            if not isinstance(e, dict):
                _fail(r, "ROLLBACK_ENTRY_MALFORMED:" + str(index))
                continue
            eid = str(e.get("id"))
            for field in ("before_hash", "after_apply_hash", "after_rollback_hash"):
                if not _hash_present(e.get(field)):
                    _fail(r, "ROLLBACK_ENTRY_MISSING_HASH:" + eid + ":" + field)
            before = e.get("before_hash")
            after_apply = e.get("after_apply_hash")
            after_rb = e.get("after_rollback_hash")
            # ---- HARDENING (T2-REC-ROLLBACK-HARDENING-1) ----
            # A claimed staging mutation must actually change the artifact.
            # The comparison is made on normalised values so that a padded or
            # re-cased copy of the before hash cannot pose as a mutation.
            if (_hash_present(before) and _hash_present(after_apply)
                    and _canonical_hash(after_apply) == _canonical_hash(before)):
                _fail(r, "ROLLBACK_APPLY_DID_NOT_MUTATE:" + eid)
            # Rollback must restore the original. This stays an exact
            # comparison so the gate is never looser than before.
            if before != after_rb:
                _fail(r, "ROLLBACK_NOT_RESTORED:" + eid)
            # If an explicit pinned restore target is given, the restored hash
            # must equal it as well.
            exp = str(e.get("expected_restored_hash", "")).strip()
            if exp and after_rb != exp:
                _fail(r, "ROLLBACK_NOT_RESTORED_TO_PIN:" + eid)
            if e.get("restored_match") is not True:
                _fail(r, "ROLLBACK_RESTORED_MATCH_FALSE:" + eid)
    elif status == "NOT_APPLICABLE":
        # May only be NOT_APPLICABLE if no staging mutation occurred.
        if mutated is True:
            _fail(r, "ROLLBACK_NA_BUT_MUTATION_OCCURRED")
        if not str(rb.get("not_applicable_reason", "")).strip():
            _fail(r, "ROLLBACK_NA_NO_REASON")
    return r

def check_tkt_base(tk):
    """Validate the TKT base L0-L3 check record.

    Args:
        tk: dict parsed from the tkt-base-l0-l3-check JSON file.

    Returns:
        A list of failure codes; empty when L0-L3 all pass, the level reached
        is L3, and none of the deferred levels is claimed as passed.
    """
    r = []
    for lvl, key in (("L0", "l0_file"), ("L1", "l1_reconstruction"),
                     ("L2", "l2_fail_closed"), ("L3", "l3_governance")):
        if tk.get(key) != "PASS":
            # Uses the module's _fail helper and the "TKT_BASE_" prefix shared
            # by every other code in this gate (e.g. TKT_BASE_L0_NOT_PASS).
            _fail(r, "TKT_BASE_" + lvl + "_NOT_PASS")
    if tk.get("level_reached") != "L3":
        _fail(r, "TKT_BASE_LEVEL_NOT_L3")
    # Ceiling: deferred semantic levels must stay deferred, never claimed PASS.
    for key, oc in (("l4_iu_traceability", "IU_TRACEABILITY_PASS"),
                    ("l5_semantic_text_as_code", "SEMANTIC_TEXT_AS_CODE_PASS"),
                    ("l6_release_bundle", "RELEASE_BUNDLE_PASS")):
        if str(tk.get(key)).upper() in ("PASS", oc):
            _fail(r, "TKT_BASE_OVERCLAIM:" + oc)
    if tk.get("forbidden_overclaims_present") is not False:
        _fail(r, "TKT_BASE_OVERCLAIM_FLAG")
    return r

def check_bad_input(bi):
    """Validate the bad-input probe summary.

    Args:
        bi: dict with ``any_fail_open``, ``controls_pass``, ``total`` and
            ``fail_closed``.

    Returns:
        A list of failure codes; empty when at least 18 probes ran and every
        one of them failed closed.

    Raises:
        TypeError, ValueError: propagated from ``int()`` when a count is not
            convertible to an integer.
    """
    r = []
    if bi.get("any_fail_open") is not False:
        _fail(r, "BAD_INPUT_ANY_FAIL_OPEN")
    if bi.get("controls_pass") is not True:
        _fail(r, "BAD_INPUT_CONTROLS_BROKEN")

    probe_total = int(bi.get("total", 0))
    if probe_total < 18:
        _fail(r, "BAD_INPUT_TOO_FEW_PROBES")
    closed_total = int(bi.get("fail_closed", 0))
    if closed_total != probe_total:
        _fail(r, "BAD_INPUT_NOT_ALL_FAIL_CLOSED")
    return r

def check_review(rv):
    """Validate the execution-readiness review record.

    Args:
        rv: dict parsed from the execution-readiness-review JSON file.

    Returns:
        A list of failure codes; empty when the verdict is a known value, no
        production/execution claim is made, and no forbidden action is enabled.
    """
    r = []
    if rv.get("verdict") not in VALID_REVIEW_VERDICTS:
        _fail(r, "REVIEW_VERDICT_INVALID")

    # Each of these flags must be an explicit False.
    must_be_false = (
        ("production_ready", "REVIEW_CLAIMS_PRODUCTION_READY"),
        ("implementation_execution_authorized", "REVIEW_CLAIMS_IMPL_EXEC_AUTHORIZED"),
        ("production_rollback_proven", "REVIEW_CLAIMS_PRODUCTION_ROLLBACK"),
    )
    for flag, code in must_be_false:
        if rv.get(flag) is not False:
            _fail(r, code)

    # Forbidden actions must not be marked requested/authorized/executed.
    # An action with no recorded status is read as "BLOCKED".
    inert_states = ("BLOCKED", "NOT_REQUESTED", "NOT_AUTHORIZED")
    action_status = rv.get("forbidden_actions_status", {})
    for act in FORBIDDEN_ACTIONS:
        if str(action_status.get(act, "BLOCKED")) not in inert_states:
            _fail(r, "FORBIDDEN_ACTION_ENABLED:" + act)
    return r

def check_owner(ow):
    """Validate the owner next-decision template.

    Args:
        ow: dict parsed from the owner-next-decision-template JSON file.

    Returns:
        A list of failure codes; empty when the template defaults to HOLD,
        authorizes nothing, and carries no unsigned non-HOLD selection.
    """
    r = []
    if ow.get("default_decision") != "HOLD":
        _fail(r, "DEFAULT_NOT_HOLD")
    if ow.get("execution_authorization_status") != "NOT_AUTHORIZED":
        _fail(r, "EXECUTION_AUTHORIZED_IN_TEMPLATE")

    signature = str(ow.get("owner_signature", "")).strip().upper()
    unsigned = signature in ("", "UNSIGNED")
    option_chosen = ow.get("selected_option") not in (None, "", "HOLD")
    # A selected option other than HOLD with no signature behind it is
    # reported as a fabricated approval.
    if option_chosen and unsigned:
        _fail(r, "OWNER_APPROVAL_FABRICATED")
    return r

def check_governance(gv):
    """Validate the governance-objects record.

    Args:
        gv: dict with ``object_ids``, ``existing_reserved_ranges`` (pairs of
            inclusive bounds), ``canonical_registry_max_id`` and status flags.

    Returns:
        A list of failure codes; the collision code lists the colliding ids.
    """
    r = []
    ids = gv.get("object_ids", [])
    if len(set(ids)) != len(ids):
        _fail(r, "DUPLICATE_OBJECT_ID")

    # Collect every id that is already taken: each reserved range (both ends
    # inclusive) plus 1..canonical_registry_max_id.
    used = set()
    for low, high in gv.get("existing_reserved_ranges", []):
        used.update(range(low, high + 1))
    registry_max = gv.get("canonical_registry_max_id", 0)
    used.update(range(1, registry_max + 1))

    collisions = sorted(set(ids) & used)
    if collisions:
        joined = ",".join(map(str, collisions))
        _fail(r, "OBJECT_ID_COLLISION:" + joined)

    if gv.get("orphan_objects") not in (None, [], False):
        _fail(r, "ORPHAN_OBJECT")
    if gv.get("registered_via_addendum") is not True:
        _fail(r, "OBJECTS_NOT_GOVERNED")
    if gv.get("canonical_fold_applied") is not False:
        _fail(r, "CANONICAL_FOLD_APPLIED")
    return r

# Short data key -> JSON file name of each packet component that gets loaded.
FILES = dict(
    seal="seal-consumption.json",
    pl="planning-consumption.json",
    pc="precondition-recheck.json",
    iso="staging-isolation-proof.json",
    dr="dryrun-execution-evidence.json",
    rb="rollback-recovery-proof.json",
    tk="tkt-base-l0-l3-check.json",
    bi="bad-input-probes.json",
    rv="execution-readiness-review.json",
    ow="owner-next-decision-template.json",
    gv="governance-objects.json",
)

# Gate table, evaluated top to bottom: (label printed in the report,
# key into the loaded data, checker function returning failure codes).
CHECKS = [
    ("seal-consumption", "seal", check_seal),
    ("planning-consumption", "pl", check_planning_consumption),
    ("precondition-recheck", "pc", check_precondition_recheck),
    ("staging-isolation", "iso", check_isolation),
    ("dryrun-execution-evidence", "dr", check_dryrun_evidence),
    ("rollback-recovery-proof", "rb", check_rollback_proof),
    ("tkt-base-l0-l3", "tk", check_tkt_base),
    ("bad-input-probes", "bi", check_bad_input),
    ("execution-readiness-review", "rv", check_review),
    ("owner-next-decision", "ow", check_owner),
    ("governance-objects", "gv", check_governance),
]

def load_all(base=HERE):
    """Load every packet JSON file listed in FILES.

    Args:
        base: directory that holds the JSON files.

    Returns:
        dict mapping each FILES key to the parsed JSON content.

    Exits the process with status 2 (after printing a MISSING_FILE or
    UNREADABLE_FILE line) when a file does not exist or cannot be parsed.
    """
    loaded = {}
    for key, filename in FILES.items():
        full_path = os.path.join(base, filename)
        if not os.path.exists(full_path):
            print("MISSING_FILE:" + filename)
            sys.exit(2)
        try:
            with open(full_path, "r", encoding="utf-8") as handle:
                loaded[key] = json.load(handle)
        except Exception as exc:  # noqa: BLE001
            # Any read or parse problem maps onto the same exit status.
            print("UNREADABLE_FILE:" + filename + ":" + str(exc))
            sys.exit(2)
    return loaded

def run(data):
    """Run every gate in CHECKS and print one report line per gate.

    Args:
        data: dict keyed like FILES, holding the parsed packet components.

    Returns:
        The concatenated list of failure codes from all gates, in gate order.
    """
    all_fail = []
    for label, key, checker in CHECKS:
        codes = checker(data[key])
        if codes:
            status = "FAIL"
            detail = " -> " + "; ".join(codes)
        else:
            status = "PASS"
            detail = ""
        print("[%s] %s%s" % (status, label, detail))
        all_fail += codes
    return all_fail

def _fake_no_mutation_entry(real_entry):
    """Build a rollback entry that claims mutation although apply == before.

    Args:
        real_entry: an existing rollback entry dict; it is copied, not mutated.

    Returns:
        A new dict whose three hash fields all hold the same value.
    """
    # Pick a concrete (non-ABSENT) before hash so apply == before is a real
    # collision; fall back to the applied hash, then to a dummy digest.
    shared = real_entry.get("before_hash")
    if not shared or shared == ABSENT:
        shared = real_entry.get("after_apply_hash") or ("a" * 64)

    forged = dict(real_entry)
    forged.update({
        "id": "FAKE-" + str(real_entry.get("id")),
        "before_hash": shared,
        "after_apply_hash": shared,  # fabricated: apply changed nothing
        "after_rollback_hash": shared,
        "restored_match": True,
    })
    return forged

def selftest(base=HERE):
    """Standalone proof of the hardened rollback gate.

    The frozen rollback evidence must pass, and a fabricated no-mutation
    rollback proof derived from it must fail closed.

    Args:
        base: directory that holds ``valid_evidence_recheck.json``.

    Returns:
        True when both halves of the proof hold, otherwise False.

    Exits the process with status 2 when the evidence file is missing,
    unreadable, not valid JSON, or lacks the ``rollback_evidence`` key.
    """
    import copy

    path = os.path.join(base, "valid_evidence_recheck.json")
    if not os.path.exists(path):
        print("MISSING_FILE:valid_evidence_recheck.json")
        sys.exit(2)
    try:
        with open(path, "r", encoding="utf-8") as fh:
            rec = json.load(fh)
        rb = rec["rollback_evidence"]
    except (OSError, ValueError, KeyError, TypeError) as exc:
        # Keep the module's exit-code contract: an unreadable required file is
        # exit 2, not an uncaught traceback (which would exit 1).
        print("UNREADABLE_FILE:valid_evidence_recheck.json:" + str(exc))
        sys.exit(2)

    good_fails = check_rollback_proof(rb)
    good_ok = not good_fails
    print("[%s] real T1 rollback evidence passes hardened gate%s"
          % ("PASS" if good_ok else "FAIL",
             "" if good_ok else " -> " + "; ".join(good_fails)))

    # Prefer an entry with a concrete before hash. The fallback is resolved
    # lazily so that an empty or missing entry list cannot raise here.
    entries = rb.get("entries") or []
    real_entry = next((x for x in entries
                       if str(x.get("before_hash")) not in ("", ABSENT)), None)
    if real_entry is None:
        real_entry = entries[0] if entries else {}

    fake = copy.deepcopy(rb)
    fake["entries"] = [_fake_no_mutation_entry(real_entry)]
    fake_fails = check_rollback_proof(fake)
    fake_closed = any(f.startswith("ROLLBACK_APPLY_DID_NOT_MUTATE") for f in fake_fails)
    print("[%s] fabricated no-mutation rollback fails closed%s"
          % ("PASS" if fake_closed else "FAIL",
             " (" + "; ".join(fake_fails) + ")" if fake_fails else " (FAIL-OPEN: passed!)"))

    ok = good_ok and fake_closed
    print("HARDENED_VALIDATOR_SELFTEST: %s" % ("PASS" if ok else "FAIL"))
    return ok

def main():
    """Command-line entry point.

    With ``--selftest`` among the arguments, runs the selftest and exits 0 on
    success or 1 on failure. Otherwise loads the packet, runs every gate, and
    exits 1 when any failure code was produced or 0 when none was.
    """
    if "--selftest" in sys.argv:
        passed = selftest()
        sys.exit(0 if passed else 1)

    failures = run(load_all())
    if failures:
        print("HARDENED_DRYRUN_VALIDATOR_RESULT: FAIL (%d fail-closed conditions)"
              % len(failures))
        sys.exit(1)

    print("HARDENED_DRYRUN_VALIDATOR_RESULT: PASS (staging dry-run + rollback proven "
          "with real applied mutation; execution/production/REAL_RUN/QT001/cutover still blocked)")
    sys.exit(0)

# Run main() only when the file is executed as a script. The dunder forms are
# required: a bare ``name`` is undefined and "main" never equals __name__.
if __name__ == "__main__":
    main()

Back to Knowledge Hub knowledge/dev/reports/architecture/fix7-p0-rollback-validator-hardening-packet-2026-06-11/hardened_dryrun_validator.py