KB-5726

dryrun_validator.py

15 min read Revision 1

#!/usr/bin/env python3
"""FIX7 P0 dry-run + execution-readiness packet validator (fail-closed).

Reads the machine-form JSON in this directory and enforces the dry-run gates.

Exit 0 => every gate passes AND the packet still declares production mutation,
          implementation execution, and REAL_RUN/QT001/cutover all blocked,
          with a real (non-fabricated) staging dry-run + rollback proof
          behind it.
Exit 1 => a fail-closed condition fired (gate violated / overclaim /
          fabrication).
Exit 2 => a required file is missing or unreadable.

Authorizes nothing. It only proves the dry-run packet is internally consistent,
that the staging dry-run actually ran, that rollback was proven in staging, and
that nothing claims execution-readiness, production readiness, or semantic
Text-as-Code.
"""
import json
import os
import sys

# Directory that contains this script; used as the default base directory
# when the packet's JSON files are loaded.
HERE = os.path.dirname(os.path.abspath(__file__))

# Pinned values the packet must report verbatim. The seal and planning checks
# compare the corresponding JSON fields against these with plain equality.
EXPECTED_N7 = "efb0c5747ae5f56c0e8b5d99c35438a3d6231253570e6ab4d2658ab9e1853d32"
EXPECTED_N8 = "daa70c39a91a64696fb1a24ca2d3f620d04049583c9047689902ebf26117e1a1"
EXPECTED_P7 = "9ddb27c35a06ca11ee616e3b0399c705d0d97f5b8f284c12045c396f7c034550"
EXPECTED_SEAL_TREE = "3890cd3467720b3f2c105625e2e1dc627fd2e7fdfb74d9059bad95bece12a234"
EXPECTED_PLANNING_TREE = "f470d0d019f9af63794ce943a64ea08ff31a17906a7857f4124d0b1e14a0fe8f"
# NOTE(review): EXPECTED_CANON is not referenced by any check visible in this
# file -- confirm whether a consumer exists elsewhere before removing it.
EXPECTED_CANON = "49c386a9b9666c09786fc4f89bc79776b6046eaee6f4da6d8537d2c753b734d0"
EXPECTED_OWNER_DECISION = (
    "OPT3_AUTHORIZE_CODEX_AUTHORITY_SEAL_AND_POST_SEAL_IMPLEMENTATION_PLANNING_ONLY"
)
FORBIDDEN_OVERCLAIMS = [
    "IU_TRACEABILITY_PASS",
    "SEMANTIC_TEXT_AS_CODE_PASS",
    "RELEASE_BUNDLE_PASS",
]

# tokens that must never appear as "requested"/"authorized"/"executed" in this lane
FORBIDDEN_ACTIONS = [
    "REAL_RUN",
    "QT001",
    "permit",
    "activation",
    "repoint",
    "cutover",
]
VALID_REVIEW_VERDICTS = [
    "READY_FOR_OWNER_EXECUTION_REVIEW",
    "TRUE_ENVIRONMENT_BLOCKER",
    "ENGINEERING_REMAINS",
]

def _fail(r, code): r.append(code)

def check_seal(seal):
    """Validate the seal-consumption record.

    Args:
        seal: mapping parsed from the seal-consumption JSON file.

    Returns:
        List of failure-code strings, in gate order; empty when every gate
        passes.
    """
    # Each gate is (violated?, failure code). All lookups are plain dict reads,
    # so evaluating them up front is equivalent to testing them one by one.
    gates = (
        (not seal.get("p7_present", False), "MISSING_P7"),
        (seal.get("p7_digest") != EXPECTED_P7, "P7_DIGEST_MISMATCH"),
        (not seal.get("n8_present", False), "MISSING_N8"),
        (seal.get("n8_digest") != EXPECTED_N8, "N8_DIGEST_MISMATCH"),
        (seal.get("n7_digest") != EXPECTED_N7, "N7_DIGEST_MISMATCH"),
        (seal.get("seal_packet_tree_sha256") != EXPECTED_SEAL_TREE,
         "SEAL_TREE_MISMATCH"),
        # The "allowed now" flags must be an explicit false; absent counts as a claim.
        (seal.get("implementation_execution_allowed_now") is not False,
         "IMPLEMENTATION_EXECUTION_CLAIMED_NOW"),
        (seal.get("production_mutation_allowed_now") is not False,
         "PRODUCTION_MUTATION_ALLOWED_NOW"),
        (seal.get("owner_decision_consumed") != "AUTHORIZE_DRYRUN_ONLY",
         "OWNER_DECISION_NOT_DRYRUN_ONLY"),
        (seal.get("original_seal_owner_decision") != EXPECTED_OWNER_DECISION,
         "ORIGINAL_OWNER_DECISION_NOT_OPT3"),
        (seal.get("implementation_authorized_by_p7_alone") is not False,
         "P7_ALONE_AUTHORIZES_IMPL"),
    )
    problems = []
    for violated, code in gates:
        if violated:
            _fail(problems, code)
    return problems

def check_planning_consumption(pl):
    """Validate the planning-consumption record.

    Args:
        pl: mapping parsed from the planning-consumption JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    found = []

    def gate(violated, code):
        # Record ``code`` only when its condition is violated.
        if violated:
            _fail(found, code)

    gate(pl.get("planning_packet_tree_sha256") != EXPECTED_PLANNING_TREE,
         "STALE_PLANNING_PACKET_TREE")
    gate(pl.get("planning_validator_exit") != 0, "PLANNING_VALIDATOR_NOT_PASS")
    # Must be an explicit false: a missing flag is treated as fail-open.
    gate(pl.get("planning_probes_any_fail_open") is not False,
         "PLANNING_PROBES_FAIL_OPEN")
    gate(pl.get("planning_rerun_result") != "PASS", "PLANNING_RERUN_NOT_PASS")
    gate(pl.get("reconstruct_byte_exact") is not True,
         "PLANNING_RECONSTRUCT_NOT_BYTE_EXACT")
    return found

def check_precondition_recheck(pc):
    """Validate the precondition re-check record.

    Args:
        pc: mapping parsed from the precondition-recheck JSON file. ``items``
            must be a non-empty list of mappings.

    Returns:
        List of failure-code strings; empty when every gate passes. When
        ``items`` is missing, not a list, or empty, the only code returned is
        ``PRECONDITION_RECHECK_EMPTY``.
    """
    r = []
    items = pc.get("items")
    valid = {"PASS", "FAIL", "UNKNOWN", "DRYRUN_ALLOWED_WITH_LIMIT",
             "BLOCKS_DRYRUN", "BLOCKS_EXECUTION_ONLY"}
    if not isinstance(items, list) or not items:
        r.append("PRECONDITION_RECHECK_EMPTY")
        return r
    for it in items:
        item_id = str(it.get("id"))
        classification = it.get("recheck_classification")
        if classification not in valid:
            r.append("PRECONDITION_BAD_CLASSIFICATION:" + item_id)
        # JSON null arrives as None and str(None) is "None"; test for None
        # explicitly so a null evidence field is not accepted as evidence.
        evidence = it.get("evidence")
        if evidence is None or not str(evidence).strip():
            r.append("PRECONDITION_NO_EVIDENCE:" + item_id)
        # anything that blocks the dry-run means the dry-run must not be declared safe
        if classification == "BLOCKS_DRYRUN" and pc.get("dryrun_can_proceed") is True:
            r.append("DRYRUN_PROCEEDS_DESPITE_BLOCKER:" + item_id)
    if pc.get("execution_ready") is True:
        r.append("EXECUTION_READY_CLAIMED")
    return r

def check_isolation(iso):
    """Validate the staging-isolation proof.

    Args:
        iso: mapping parsed from the staging-isolation-proof JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    # The accepted roots are literal POSIX paths, so normalise with posixpath
    # regardless of the platform the validator runs on.
    import posixpath

    r = []
    if iso.get("is_production") is not False:
        r.append("WORKSPACE_IS_PRODUCTION")
    for k in ("connects_to_production_pg", "connects_to_production_directus",
              "connects_to_system_issues", "connects_to_registry_row"):
        if iso.get(k) is not False:
            r.append("ISOLATION_PROD_CONNECTION:" + k)
    if iso.get("disposable") is not True:
        r.append("WORKSPACE_NOT_DISPOSABLE")
    # JSON null arrives as None and str(None) is "None"; test for None
    # explicitly so a null cleanup method is not accepted.
    cleanup = iso.get("cleanup_method")
    if cleanup is None or not str(cleanup).strip():
        r.append("ISOLATION_NO_CLEANUP")
    if iso.get("isolation_proven") is not True:
        r.append("ISOLATION_NOT_PROVEN")
    wp = str(iso.get("workspace_path", ""))
    temp_roots = ("/tmp/", "/private/var/folders/")
    # A bare prefix test accepts "/tmp/../srv/prod". Require the path to stay
    # under a temp root after ".." segments are collapsed as well. The trailing
    # "/" keeps a path equal to the root itself accepted, as before.
    collapsed = posixpath.normpath(wp) + "/"
    if not (wp.startswith(temp_roots) and collapsed.startswith(temp_roots)):
        r.append("WORKSPACE_NOT_TEMP_PATH")
    return r

def check_dryrun_evidence(dr):
    """Validate the dry-run execution evidence.

    Args:
        dr: mapping parsed from the dryrun-execution-evidence JSON file.
            ``steps`` must be a non-empty list of mappings.

    Returns:
        List of failure-code strings; empty when every gate passes. When
        ``steps`` is missing, not a list, or empty, per-step checks are skipped
        and ``DRYRUN_NO_STEPS`` is the last code returned.
    """

    def _blank(value):
        # JSON null arrives as None and str(None) is "None", which must not
        # count as content; whitespace-only text does not count either.
        return value is None or not str(value).strip()

    r = []
    if dr.get("production_target") is not False:
        r.append("DRYRUN_TARGETS_PRODUCTION")
    # no-vector raw-evidence pointer is required at packet level
    if _blank(dr.get("no_vector_raw_evidence_pointer")):
        r.append("MISSING_NO_VECTOR_POINTER")
    steps = dr.get("steps")
    if not isinstance(steps, list) or not steps:
        r.append("DRYRUN_NO_STEPS")
        return r
    for st in steps:
        sid = str(st.get("id"))
        for field in ("command", "cwd", "expected_exit", "actual_exit", "log", "verdict"):
            # A missing key and an explicit null are both "missing".
            if _blank(st.get(field)):
                r.append("DRYRUN_STEP_FIELD_MISSING:" + sid + ":" + field)
        if st.get("production") is not False:
            r.append("DRYRUN_STEP_PRODUCTION:" + sid)
        # exit code consistency: actual must equal expected, or verdict must not be PASS
        if st.get("expected_exit") != st.get("actual_exit") and st.get("verdict") == "PASS":
            r.append("DRYRUN_EXIT_CODE_MISMATCH:" + sid)
        # readonly steps need no hash; mutating steps must carry an artifact hash.
        # Only an explicit true marks a step read-only, so a truthy string such
        # as "false" cannot skip the hash requirement.
        if st.get("readonly") is not True and _blank(st.get("artifact_hash")):
            r.append("DRYRUN_MUTATING_STEP_NO_HASH:" + sid)
        # report-vs-file: a PASS verdict requires real captured evidence
        if st.get("verdict") == "PASS" and st.get("evidence_present") is not True:
            r.append("REPORT_PASS_BUT_NO_EVIDENCE:" + sid)
    return r

def check_rollback_proof(rb):
    """Validate the rollback / recovery proof.

    Args:
        rb: mapping parsed from the rollback-recovery-proof JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """

    def _blank(value):
        # JSON null arrives as None and str(None) is "None", which must not
        # count as a real hash or reason.
        return value is None or not str(value).strip()

    r = []
    status = rb.get("rollback_proof_status")
    if status not in ("PROVEN_IN_STAGING", "NOT_APPLICABLE"):
        r.append("ROLLBACK_PROOF_STATUS_INVALID")
    # never claim production rollback proof
    if rb.get("production_rollback_status") not in (None, "NOT_APPLICABLE"):
        r.append("PRODUCTION_ROLLBACK_CLAIMED")
    entries = rb.get("entries")
    if not isinstance(entries, list):
        # A null or malformed "entries" is reported as "no entries" rather
        # than crashing the loop below.
        entries = []
    mutated = rb.get("staging_mutation_occurred")
    if status == "PROVEN_IN_STAGING":
        if mutated is not True:
            r.append("ROLLBACK_PROVEN_BUT_NO_MUTATION")
        if not entries:
            r.append("ROLLBACK_PROVEN_NO_ENTRIES")
        for e in entries:
            eid = str(e.get("id"))
            for field in ("before_hash", "after_apply_hash", "after_rollback_hash"):
                if _blank(e.get(field)):
                    r.append("ROLLBACK_ENTRY_MISSING_HASH:" + eid + ":" + field)
            before = e.get("before_hash")
            applied = e.get("after_apply_hash")
            # rollback must restore original: before == after_rollback, and apply changed it
            if before != e.get("after_rollback_hash"):
                r.append("ROLLBACK_NOT_RESTORED:" + eid)
            # If apply left the hash unchanged, "restoring" it proves nothing.
            # Skipped when a hash is missing, since that is reported above.
            if not _blank(before) and not _blank(applied) and before == applied:
                r.append("ROLLBACK_APPLY_DID_NOT_CHANGE:" + eid)
            if e.get("restored_match") is not True:
                r.append("ROLLBACK_RESTORED_MATCH_FALSE:" + eid)
    elif status == "NOT_APPLICABLE":
        # may only be NA if no staging mutation occurred
        if mutated is True:
            r.append("ROLLBACK_NA_BUT_MUTATION_OCCURRED")
        if _blank(rb.get("not_applicable_reason")):
            r.append("ROLLBACK_NA_NO_REASON")
    return r

def check_tkt_base(tk):
    """Validate the TKT base L0-L3 check record.

    Args:
        tk: mapping parsed from the tkt-base-l0-l3-check JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    r = []
    for lvl, key in (("L0", "l0_file"), ("L1", "l1_reconstruction"),
                     ("L2", "l2_fail_closed"), ("L3", "l3_governance")):
        if tk.get(key) != "PASS":
            r.append("TKT_BASE_" + lvl + "_NOT_PASS")
    if tk.get("level_reached") != "L3":
        r.append("TKT_BASE_LEVEL_NOT_L3")
    # ceiling: deferred semantic levels must stay deferred, never claimed PASS
    for key, oc in (("l4_iu_traceability", "IU_TRACEABILITY_PASS"),
                    ("l5_semantic_text_as_code", "SEMANTIC_TEXT_AS_CODE_PASS"),
                    ("l6_release_bundle", "RELEASE_BUNDLE_PASS")):
        # Case-insensitive: "pass" and "Pass" are overclaims too.
        if str(tk.get(key)).upper() in ("PASS", oc):
            r.append("TKT_BASE_OVERCLAIM:" + oc)
    # Must be an explicit false; a missing flag is treated as an overclaim.
    if tk.get("forbidden_overclaims_present") is not False:
        r.append("TKT_BASE_OVERCLAIM_FLAG")
    return r

def check_bad_input(bi):
    """Validate the bad-input probe summary.

    Args:
        bi: mapping parsed from the bad-input-probes JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes. A count
        that cannot be converted to an integer (null, non-numeric text, ...)
        is reported as a gate failure instead of raising.
    """

    def _count(value):
        # Accept whatever int() accepts; anything else becomes None so the
        # caller can fail the gate explicitly.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    r = []
    if bi.get("any_fail_open") is not False:
        r.append("BAD_INPUT_ANY_FAIL_OPEN")
    if bi.get("controls_pass") is not True:
        r.append("BAD_INPUT_CONTROLS_BROKEN")
    total = _count(bi.get("total", 0))
    fail_closed = _count(bi.get("fail_closed", 0))
    # The packet must contain at least 18 probes.
    if total is None or total < 18:
        r.append("BAD_INPUT_TOO_FEW_PROBES")
    # Every probe must have failed closed; an unreadable count cannot prove that.
    if total is None or fail_closed is None or fail_closed != total:
        r.append("BAD_INPUT_NOT_ALL_FAIL_CLOSED")
    return r

def check_review(rv):
    """Validate the execution-readiness review record.

    Args:
        rv: mapping parsed from the execution-readiness-review JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    problems = []
    headline_gates = (
        (rv.get("verdict") not in VALID_REVIEW_VERDICTS, "REVIEW_VERDICT_INVALID"),
        # Each claim below must be an explicit false; absent counts as a claim.
        (rv.get("production_ready") is not False, "REVIEW_CLAIMS_PRODUCTION_READY"),
        (rv.get("implementation_execution_authorized") is not False,
         "REVIEW_CLAIMS_IMPL_EXEC_AUTHORIZED"),
        (rv.get("production_rollback_proven") is not False,
         "REVIEW_CLAIMS_PRODUCTION_ROLLBACK"),
    )
    for violated, code in headline_gates:
        if violated:
            _fail(problems, code)

    # forbidden actions must not be marked requested/authorized/executed
    safe_states = ("BLOCKED", "NOT_REQUESTED", "NOT_AUTHORIZED")
    action_status = rv.get("forbidden_actions_status", {})
    for action in FORBIDDEN_ACTIONS:
        # An action with no entry is treated as "BLOCKED".
        state = str(action_status.get(action, "BLOCKED"))
        if state in safe_states:
            continue
        _fail(problems, "FORBIDDEN_ACTION_ENABLED:" + action)
    return problems

def check_owner(ow):
    """Validate the owner next-decision template.

    Args:
        ow: mapping parsed from the owner-next-decision-template JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    r = []
    if ow.get("default_decision") != "HOLD":
        r.append("DEFAULT_NOT_HOLD")
    if ow.get("execution_authorization_status") != "NOT_AUTHORIZED":
        r.append("EXECUTION_AUTHORIZED_IN_TEMPLATE")
    # JSON null arrives as None; str(None).upper() would be "NONE" and look
    # like a real signature, so map None to the empty (unsigned) value first.
    raw_sig = ow.get("owner_signature")
    sig = "" if raw_sig is None else str(raw_sig).strip().upper()
    selected = ow.get("selected_option")
    # A non-HOLD option selected without a signature is a fabricated approval.
    if selected not in (None, "", "HOLD") and sig in ("", "UNSIGNED"):
        r.append("OWNER_APPROVAL_FABRICATED")
    return r

def check_governance(gv):
    """Validate the governance-objects record.

    Args:
        gv: mapping parsed from the governance-objects JSON file.

    Returns:
        List of failure-code strings; empty when every gate passes.
    """
    problems = []
    object_ids = gv.get("object_ids", [])
    distinct_ids = set(object_ids)
    if len(distinct_ids) != len(object_ids):
        _fail(problems, "DUPLICATE_OBJECT_ID")

    # Collect every id that is already taken: each reserved range (both ends
    # inclusive) plus 1..canonical_registry_max_id.
    taken = set()
    for low, high in gv.get("existing_reserved_ranges", []):
        taken.update(range(low, high + 1))
    registry_max = gv.get("canonical_registry_max_id", 0)
    taken.update(range(1, registry_max + 1))

    clashing = sorted(distinct_ids.intersection(taken))
    if clashing:
        _fail(problems, "OBJECT_ID_COLLISION:" + ",".join(str(i) for i in clashing))

    if gv.get("orphan_objects") not in (None, [], False):
        _fail(problems, "ORPHAN_OBJECT")
    if gv.get("registered_via_addendum") is not True:
        _fail(problems, "OBJECTS_NOT_GOVERNED")
    # Must be an explicit false; a missing flag is treated as "fold applied".
    if gv.get("canonical_fold_applied") is not False:
        _fail(problems, "CANONICAL_FOLD_APPLIED")
    return problems

# Short key -> JSON file name, one entry per packet file. Files are loaded in
# this order.
FILES = {
    "seal": "seal-consumption.json",
    "pl": "planning-consumption.json",
    "pc": "precondition-recheck.json",
    "iso": "staging-isolation-proof.json",
    "dr": "dryrun-execution-evidence.json",
    "rb": "rollback-recovery-proof.json",
    "tk": "tkt-base-l0-l3-check.json",
    "bi": "bad-input-probes.json",
    "rv": "execution-readiness-review.json",
    "ow": "owner-next-decision-template.json",
    "gv": "governance-objects.json",
}

# (report label, key into the loaded data, check function). The checks are
# executed and reported in this order.
CHECKS = [
    ("seal-consumption", "seal", check_seal),
    ("planning-consumption", "pl", check_planning_consumption),
    ("precondition-recheck", "pc", check_precondition_recheck),
    ("staging-isolation", "iso", check_isolation),
    ("dryrun-execution-evidence", "dr", check_dryrun_evidence),
    ("rollback-recovery-proof", "rb", check_rollback_proof),
    ("tkt-base-l0-l3", "tk", check_tkt_base),
    ("bad-input-probes", "bi", check_bad_input),
    ("execution-readiness-review", "rv", check_review),
    ("owner-next-decision", "ow", check_owner),
    ("governance-objects", "gv", check_governance),
]

def load_all(base=HERE):
    """Load every packet file listed in ``FILES`` from directory ``base``.

    Args:
        base: directory that holds the JSON files (defaults to ``HERE``).

    Returns:
        Dict mapping each ``FILES`` key to the parsed JSON content.

    Exits the process with status 2, after printing ``MISSING_FILE:<name>`` or
    ``UNREADABLE_FILE:<name>:<error>``, at the first file that does not exist
    or cannot be opened and parsed.
    """
    loaded = {}
    for key in FILES:
        filename = FILES[key]
        full_path = os.path.join(base, filename)
        if not os.path.exists(full_path):
            print("MISSING_FILE:{}".format(filename))
            sys.exit(2)
        try:
            with open(full_path, "r", encoding="utf-8") as handle:
                parsed = json.load(handle)
        except Exception as exc:  # noqa: BLE001
            # Any read or parse problem is reported the same way.
            print("UNREADABLE_FILE:{}:{}".format(filename, str(exc)))
            sys.exit(2)
        loaded[key] = parsed
    return loaded

def run(data):
    """Run every entry of ``CHECKS`` and print one status line per check.

    Args:
        data: dict of parsed packet files, keyed as ``CHECKS`` expects.

    Returns:
        All failure codes from all checks, concatenated in check order.
    """
    collected = []
    for label, key, check in CHECKS:
        found = check(data[key])
        if found:
            line = "[FAIL] " + label + " -> " + "; ".join(found)
        else:
            line = "[PASS] " + label
        print(line)
        collected += found
    return collected

def main():
    """Load the packet, run all checks, print the verdict and exit.

    Exits with status 0 when no check reported a failure, otherwise 1.
    """
    failures = run(load_all())
    if not failures:
        print("DRYRUN_VALIDATOR_RESULT: PASS (staging dry-run + rollback proven; "
              "execution/production/REAL_RUN/QT001/cutover still blocked)")
        sys.exit(0)
    print("DRYRUN_VALIDATOR_RESULT: FAIL (%d fail-closed conditions)" % len(failures))
    sys.exit(1)

# Run the validator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Back to Knowledge Hub knowledge/dev/reports/architecture/fix7-p0-dryrun-and-execution-readiness-packet-2026-06-11/dryrun_validator.py