FIX7 P0 planning packet — bad_input_probes.py
#!/usr/bin/env python3 """Adversarial bad-input probes for the FIX7 P0 planning packet validator.
Each probe takes the real (good) design data, injects exactly one defect, and asserts the matching validator gate FAILS closed (returns a non-empty fail list). A positive control asserts the unmodified data PASSES every gate.
Exit 0 => every probe fails closed (any_fail_open == False) AND controls pass. Exit 1 => any probe failed open (a defect slipped through) or a control broke. """ import copy import sys
import planning_packet_validator as V
def load():
    """Fetch the known-good design data through the validator module.

    Returns whatever ``V.load_all(V.HERE)`` produces; ``main`` indexes the
    result by section key (for example ``"seal"`` or ``"inv"``).
    """
    loader = V.load_all
    return loader(V.HERE)
def expect_fail(name, fn, data, code_prefixes=None):
    """Run gate ``fn`` on ``data`` and report whether it failed closed.

    The gate fails closed when it returns a non-empty fail list.  When
    ``code_prefixes`` is given (and non-empty), at least one returned
    failure must additionally start with one of those prefixes.

    Prints one ``[FAIL-CLOSED]`` / ``[FAIL-OPEN]`` line for the probe,
    followed by the joined failures (if any), and returns the verdict
    as a bool.
    """
    failures = fn(data)

    if not failures:
        closed = False
    elif not code_prefixes:
        closed = True
    else:
        # A rejection only counts when it carries one of the expected codes.
        closed = any(
            message.startswith(prefix)
            for message in failures
            for prefix in code_prefixes
        )

    if closed:
        label = "FAIL-CLOSED"
    else:
        label = "FAIL-OPEN"
    detail = " (" + "; ".join(failures) + ")" if failures else ""
    print("[%s] %s%s" % (label, name, detail))
    return closed
def main():
    """Run the positive controls and the fifteen bad-input probes, then exit.

    Each probe deep-copies one section of the known-good data, injects a
    single defect, and records whether the matching validator gate rejected
    it with one of the expected failure codes.  The process exits with
    status 1 when any probe failed open or any control broke, else 0.
    """
    good = load()
    outcomes = []

    def clone(section):
        # Fresh deep copy so a probe's defect never leaks into the good data.
        return copy.deepcopy(good[section])

    def record(title, gate, payload, codes):
        # Run one probe through expect_fail and keep its verdict.
        outcomes.append(expect_fail(title, gate, payload, codes))

    # ---- positive controls: the untouched data must clear every gate ----
    controls_pass = True
    for label, section, gate in V.CHECKS:
        problems = gate(good[section])
        if not problems:
            continue
        controls_pass = False
        print("[CONTROL-BROKEN] %s -> %s" % (label, "; ".join(problems)))
    verdict = "YES" if controls_pass else "NO"
    print("[CONTROL] good packet passes all gates: %s" % verdict)

    # ---- P1: P7 flagged absent and its digest removed ----
    seal = clone("seal")
    seal["p7_present"] = False
    seal.pop("p7_digest", None)
    record("P1 missing P7", V.check_seal, seal,
           ["MISSING_P7", "P7_DIGEST_MISMATCH"])

    # ---- P2: P7 digest replaced by an all-zero digest ----
    seal = clone("seal")
    seal["p7_digest"] = "0" * 64
    record("P2 P7 digest mismatch", V.check_seal, seal,
           ["P7_DIGEST_MISMATCH"])

    # ---- P3: implementation execution claimed now ----
    seal = clone("seal")
    seal["implementation_execution_allowed_now"] = True
    record("P3 exec claimed now", V.check_seal, seal,
           ["IMPLEMENTATION_EXECUTION_CLAIMED_NOW"])

    # ---- P4: production mutation allowed now ----
    seal = clone("seal")
    seal["production_mutation_allowed_now"] = True
    record("P4 prod mutation allowed", V.check_seal, seal,
           ["PRODUCTION_MUTATION_ALLOWED_NOW"])

    # ---- P5: rollback proof fabricated ----
    rollback = clone("rb")
    rollback["rollback_proof_status"] = "PROVEN"
    record("P5 rollback proof fabricated", V.check_rollback, rollback,
           ["ROLLBACK_PROOF_FABRICATED"])

    # ---- P6: evidence path blanked on the first required precondition ----
    precond = clone("pc")
    required_item = next(
        (item for item in precond["items"] if item.get("required")), None
    )
    if required_item is not None:
        required_item["evidence_path"] = ""
    record("P6 missing precondition evidence", V.check_precondition, precond,
           ["MISSING_PRECONDITION_EVIDENCE"])

    # ---- P7: target surface not inventoried (every PG surface dropped) ----
    def is_pg(surface):
        surface_name = surface.get("surface", "").lower()
        return "pg" in surface_name or "postgres" in surface_name

    inventory = clone("inv")
    inventory["surfaces"] = [
        surface for surface in inventory["surfaces"] if not is_pg(surface)
    ]
    record("P7 surface not inventoried", V.check_mutation_inventory, inventory,
           ["SURFACE_NOT_INVENTORIED"])

    # ---- P8: rollback reference blanked on the third dry-run step ----
    dryrun = clone("dr")
    dryrun["steps"][2]["rollback_ref"] = ""
    record("P8 dryrun cmd without rollback", V.check_dryrun, dryrun,
           ["DRYRUN_CMD_WITHOUT_ROLLBACK"])

    # ---- P9: first registry/index surface loses its mutation class ----
    def is_registry(surface):
        surface_name = surface.get("surface", "").lower()
        return "registry" in surface_name or "index" in surface_name

    inventory = clone("inv")
    registry_surface = next(
        (surface for surface in inventory["surfaces"] if is_registry(surface)),
        None,
    )
    if registry_surface is not None:
        registry_surface["mutation_class"] = ""
    record("P9 registry mutation not classified", V.check_mutation_inventory,
           inventory, ["REGISTRY_MUTATION_NOT_CLASSIFIED"])

    # ---- P10: authorising option selected while the signature is UNSIGNED ----
    owner = clone("ow")
    owner["selected_option"] = "AUTHORIZE_IMPLEMENTATION_EXECUTION_NO_PRODUCTION"
    owner["owner_signature"] = "UNSIGNED"
    record("P10 owner approval fabricated", V.check_owner, owner,
           ["OWNER_APPROVAL_FABRICATED"])

    # ---- P11: default decision not HOLD ----
    owner = clone("ow")
    owner["default_decision"] = "AUTHORIZE_DRYRUN_ONLY"
    record("P11 default not HOLD", V.check_owner, owner,
           ["DEFAULT_NOT_HOLD"])

    # ---- P12: first inventoried surface marked allowed_now ----
    inventory = clone("inv")
    inventory["surfaces"][0]["allowed_now"] = True
    record("P12 mutation allowed_now", V.check_mutation_inventory, inventory,
           ["MUTATION_ALLOWED_NOW"])

    # ---- P13: execution_ready overclaim ----
    precond = clone("pc")
    precond["execution_ready"] = True
    record("P13 execution_ready overclaim", V.check_precondition, precond,
           ["EXECUTION_READY_OVERCLAIM", "EXECUTION_READY_CLAIMED_IN_PLANNING"])

    # ---- P14: owner decision not OPT3 (wrong/forged token) ----
    seal = clone("seal")
    seal["owner_decision"] = "OPT4_AUTHORIZE_EXECUTION"
    record("P14 owner decision not OPT3", V.check_seal, seal,
           ["OWNER_DECISION_NOT_OPT3"])

    # ---- P15: dry-run targets production ----
    dryrun = clone("dr")
    dryrun["production_target"] = True
    record("P15 dryrun targets production", V.check_dryrun, dryrun,
           ["DRYRUN_TARGETS_PRODUCTION"])

    # ---- summary and exit status ----
    total = len(outcomes)
    closed = len([hit for hit in outcomes if hit])
    # A broken control counts as a failure alongside any fail-open probe.
    any_fail_open = not (all(outcomes) and controls_pass)
    print("BAD_INPUT_PROBES: %d/%d fail-closed; controls_pass=%s; any_fail_open=%s"
          % (closed, total, controls_pass, any_fail_open))
    sys.exit(1 if any_fail_open else 0)
# Run the probes only when this file is executed as a script, not when it
# is imported.  The guard must compare the module's ``__name__`` against
# "__main__"; a bare ``name`` is undefined and raises NameError.
if __name__ == "__main__":
    main()