bad_input_probes.py
#!/usr/bin/env python3
"""Adversarial bad-input probes for the FIX7 P0 dry-run packet validator.

Each probe takes the real (good) packet data, injects exactly one defect, and
asserts the matching validator gate FAILS closed (returns a non-empty fail list
in which at least one code starts with an expected prefix). Positive controls
assert the unmodified data passes every gate.

Exit 0 => every probe fails closed (any_fail_open == False) AND controls pass.
Exit 1 => any probe failed open (a defect slipped through) or a control broke.

Covers the 20 required defect classes (>= 18):
   1 missing P7
   2 wrong P7 digest
   3 missing N8
   4 stale planning tree
   5 production target
   6 production PG URL
   7 REAL_RUN requested
   8 QT001/apply requested
   9 permit requested
  10 activation/cutover
  11 rollback w/o run
  12 exit-code mismatch
  13 missing log
  14 missing hash
  15 missing no-vector pointer
  16 orphan object
  17 semantic Text-as-Code
  18 impl-exec authorized
  19 duplicate object id
  20 report PASS no evidence
"""
import copy
import sys
import dryrun_validator as V
def load():
    """Load the packet data that the controls and probes operate on.

    Delegates to ``V.load_all`` with the validator's own ``V.HERE`` location.
    The result is returned untouched; ``main`` indexes it by string keys such
    as ``"seal"``, ``"dr"`` and ``"rv"``.
    """
    packet_location = V.HERE
    return V.load_all(packet_location)
def expect_fail(name, fn, data, code_prefixes):
    """Run one validator gate on defective data and report the outcome.

    Args:
        name: Human-readable probe label used in the printed report line.
        fn: Gate callable; invoked as ``fn(data)`` and expected to return an
            iterable of failure-code strings (empty/falsy when it passes).
        data: The (already corrupted) packet section handed to the gate.
        code_prefixes: Prefixes, at least one of which must start at least
            one returned failure code for the probe to count as fail-closed.

    Returns:
        True when the gate returned a non-empty fail list containing a code
        with an expected prefix (fail-closed); False otherwise (fail-open).

    Side effects:
        Prints one ``[FAIL-CLOSED]`` / ``[FAIL-OPEN]`` line, followed by the
        returned failure codes in parentheses when there are any.
    """
    findings = fn(data)

    fail_closed = False
    if findings:
        # str.startswith accepts a tuple and matches if any element matches.
        wanted = tuple(code_prefixes)
        fail_closed = any(code.startswith(wanted) for code in findings)

    verdict = "FAIL-CLOSED" if fail_closed else "FAIL-OPEN"
    detail = (" (" + "; ".join(findings) + ")") if findings else ""
    print("[%s] %s%s" % (verdict, name, detail))
    return fail_closed
def _run_controls(base):
    """Run every gate in ``V.CHECKS`` against the unmodified packet data.

    Each gate receives a deep copy of its section so that a gate which
    mutates its input cannot contaminate the baseline the probes copy from.

    Returns:
        True when every gate returned an empty/falsy fail list, else False.
        Prints one ``[CONTROL-BROKEN]`` line per failing gate and a final
        ``[CONTROL]`` summary line.
    """
    controls_ok = True
    for label, key, fn in V.CHECKS:
        fails = fn(copy.deepcopy(base[key]))
        if fails:
            controls_ok = False
            print("[CONTROL-BROKEN] %s -> %s" % (label, "; ".join(fails)))
    print("[CONTROL] good packet passes all gates: %s" % ("YES" if controls_ok else "NO"))
    return controls_ok


def _forbidden_action_probe(base, probe_name, action):
    """Mark one forbidden action as REQUESTED in the review data and probe it.

    Returns the ``expect_fail`` result for ``V.check_review``, which must
    report a ``FORBIDDEN_ACTION_ENABLED:<action>`` code to fail closed.
    """
    d = copy.deepcopy(base["rv"])
    # dict() re-wrap kept from the original probes; after the deepcopy it only
    # matters if the stored mapping is not a plain dict -- TODO confirm.
    d["forbidden_actions_status"] = dict(d["forbidden_actions_status"])
    d["forbidden_actions_status"][action] = "REQUESTED"
    return expect_fail(probe_name, V.check_review, d, ["FORBIDDEN_ACTION_ENABLED:" + action])


def main():
    """Run the positive controls and all 20 bad-input probes, then exit.

    Every probe deep-copies one section of the good packet, injects a single
    defect, and records whether the matching gate failed closed. A summary
    line is printed and the process exits via ``sys.exit``: 1 if any probe
    failed open or any control broke, 0 otherwise.
    """
    base = load()
    results = []

    # ---- positive controls ----
    controls_ok = _run_controls(base)

    def fresh(key):
        # Independent copy of one packet section, safe to corrupt.
        return copy.deepcopy(base[key])

    # 1 missing P7
    d = fresh("seal")
    d["p7_present"] = False
    d.pop("p7_digest", None)
    results.append(expect_fail("P1 missing P7", V.check_seal, d, ["MISSING_P7", "P7_DIGEST_MISMATCH"]))

    # 2 wrong P7 digest
    d = fresh("seal")
    d["p7_digest"] = "0" * 64
    results.append(expect_fail("P2 wrong P7 digest", V.check_seal, d, ["P7_DIGEST_MISMATCH"]))

    # 3 missing N8
    d = fresh("seal")
    d["n8_present"] = False
    d.pop("n8_digest", None)
    results.append(expect_fail("P3 missing N8", V.check_seal, d, ["MISSING_N8", "N8_DIGEST_MISMATCH"]))

    # 4 stale planning packet tree
    d = fresh("pl")
    d["planning_packet_tree_sha256"] = "dead" * 16
    results.append(expect_fail("P4 stale planning tree", V.check_planning_consumption, d, ["STALE_PLANNING_PACKET_TREE"]))

    # 5 production target path (dry-run targets production)
    d = fresh("dr")
    d["production_target"] = True
    results.append(expect_fail("P5 production target", V.check_dryrun_evidence, d, ["DRYRUN_TARGETS_PRODUCTION"]))

    # 6 production PG URL / connection
    d = fresh("iso")
    d["connects_to_production_pg"] = True
    results.append(expect_fail("P6 production PG connection", V.check_isolation, d, ["ISOLATION_PROD_CONNECTION"]))

    # 7 REAL_RUN / 8 QT001-apply / 9 permit / 10 activation-repoint-cutover
    # requested: identical injection, differing only in the action key.
    for probe_name, action in (
        ("P7 REAL_RUN requested", "REAL_RUN"),
        ("P8 QT001 requested", "QT001"),
        ("P9 permit requested", "permit"),
        ("P10 cutover requested", "cutover"),
    ):
        results.append(_forbidden_action_probe(base, probe_name, action))

    # 11 rollback proof claimed without rollback (no mutation)
    d = fresh("rb")
    d["rollback_proof_status"] = "PROVEN_IN_STAGING"
    d["staging_mutation_occurred"] = False
    d["entries"] = []
    results.append(expect_fail("P11 rollback proven w/o run", V.check_rollback_proof, d, ["ROLLBACK_PROVEN_BUT_NO_MUTATION", "ROLLBACK_PROVEN_NO_ENTRIES"]))

    # 12 exit code mismatch (verdict still PASS)
    d = fresh("dr")
    d["steps"][0]["actual_exit"] = 7
    d["steps"][0]["verdict"] = "PASS"
    results.append(expect_fail("P12 exit code mismatch", V.check_dryrun_evidence, d, ["DRYRUN_EXIT_CODE_MISMATCH"]))

    # 13 missing log
    d = fresh("dr")
    d["steps"][0]["log"] = ""
    results.append(expect_fail("P13 missing log", V.check_dryrun_evidence, d, ["DRYRUN_STEP_FIELD_MISSING"]))

    # 14 missing hash (mutating step)
    d = fresh("dr")
    mutating_step = next((st for st in d["steps"] if not st.get("readonly", False)), None)
    if mutating_step is not None:
        mutating_step["artifact_hash"] = ""
    # If the packet has no mutating step nothing is injected; the probe is
    # still run and recorded exactly as before.
    results.append(expect_fail("P14 missing hash", V.check_dryrun_evidence, d, ["DRYRUN_MUTATING_STEP_NO_HASH"]))

    # 15 missing no-vector pointer
    d = fresh("dr")
    d["no_vector_raw_evidence_pointer"] = ""
    results.append(expect_fail("P15 missing no-vector pointer", V.check_dryrun_evidence, d, ["MISSING_NO_VECTOR_POINTER"]))

    # 16 orphan object
    d = fresh("gv")
    d["orphan_objects"] = ["TKT-OBJ-999"]
    results.append(expect_fail("P16 orphan object", V.check_governance, d, ["ORPHAN_OBJECT"]))

    # 17 semantic Text-as-Code PASS overclaim
    d = fresh("tk")
    d["l5_semantic_text_as_code"] = "PASS"
    results.append(expect_fail("P17 semantic Text-as-Code overclaim", V.check_tkt_base, d, ["TKT_BASE_OVERCLAIM"]))

    # 18 implementation execution claimed as authorized
    d = fresh("rv")
    d["implementation_execution_authorized"] = True
    results.append(expect_fail("P18 impl-exec authorized", V.check_review, d, ["REVIEW_CLAIMS_IMPL_EXEC_AUTHORIZED"]))

    # 19 duplicate object id
    d = fresh("gv")
    d["object_ids"] = list(d["object_ids"]) + [d["object_ids"][0]]
    results.append(expect_fail("P19 duplicate object id", V.check_governance, d, ["DUPLICATE_OBJECT_ID"]))

    # 20 report PASS but file evidence missing
    d = fresh("dr")
    d["steps"][0]["verdict"] = "PASS"
    d["steps"][0]["evidence_present"] = False
    results.append(expect_fail("P20 report PASS no evidence", V.check_dryrun_evidence, d, ["REPORT_PASS_BUT_NO_EVIDENCE"]))

    total = len(results)
    passed = sum(1 for ok in results if ok)
    # The flag also folds in broken controls, so a control regression is
    # reported (and exits) the same way as a probe that failed open.
    any_fail_open = (not all(results)) or (not controls_ok)
    print("BAD_INPUT_PROBES: %d/%d fail-closed; controls_pass=%s; any_fail_open=%s"
          % (passed, total, controls_ok, any_fail_open))
    sys.exit(1 if any_fail_open else 0)
# Run the probes only when executed as a script, not when imported.
if __name__ == "__main__":
    main()