# FIX7 P0 Shaped-Clone CI-Gate — bad_input_probes.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" FIX7 P0 - Production-Shaped Clone Rehearsal + CI Gate :: bad-input probe harness.
Local/static ONLY. No clone, no production, no CI trigger. Each probe feeds a deliberately invalid input to the relevant acceptance check and asserts the check FAILS CLOSED (rejects), emitting NO production/seal/cutover token.
A probe "fails closed" when the guard rejects the bad input. A probe that would have ACCEPTED a bad input is a fail-OPEN and makes the whole harness exit 1.
Ten invalid classes are required by the macro:
- clone not isolated
- clone points to production
- clone contains production secrets
- schema mismatch ignored
- after_apply_hash == before_hash
- rollback does not restore
- CI seal mismatch still passes
- Unicode/escaping byte drift still passes
- production PASS emitted from clone rehearsal
- REAL_RUN/QT001/cutover token emitted

Plus one positive control (a well-formed isolated-clone acceptance) that is allowed.
"""
import json
import hashlib
import os
import sys
# Directory containing this script; the JSON report is written next to it.
# `__file__` (not the bare name `file`, which is undefined) is the module path.
HERE = os.path.dirname(os.path.abspath(__file__))
OUT = os.path.join(HERE, "bad-input-probes.json")
# Tokens that must NEVER be emitted by any acceptance of a clone/rehearsal input.
# Order matters to consumers that report the first matching token, so the
# sequence is kept exactly as declared.
FORBIDDEN_TOKENS = (
    "PRODUCTION_PASS",
    "PRODUCTION_EXECUTION",
    "REAL_RUN_PASS",
    "REAL_RUN",
    "QT001_APPLY",
    "QT001",
    "CUTOVER_PASS",
    "CUTOVER",
    "PERMIT_GRANTED",
    "ACTIVATION",
    "REPOINT",
    "SEAL_OK_PRODUCTION",
)
def sha256_bytes(b):
    """Return the lowercase hex SHA-256 digest of the bytes-like object *b*."""
    digest = hashlib.sha256()
    digest.update(b)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# The guards under test. Each returns (accepted: bool, token: str|None, reason).
# "accepted" True means the input was let through; for a bad input that is a
# fail-OPEN. token is any status token the guard would emit on acceptance.
# ---------------------------------------------------------------------------
def guard_clone_isolation(clone):
    """Accept a clone only if provably isolated and non-production.

    Args:
        clone: mapping describing the clone. Keys read (all optional):
            ``connects_to_production``, ``points_to_production_host``,
            ``isolated``, ``contains_secrets`` (truthiness is used) and
            ``realpath`` (a string; defaults to ``""`` when absent).

    Returns:
        ``(accepted, token, reason)``. On rejection ``accepted`` is False and
        ``token`` is None; the checks run in a fixed order and the first one
        that fails determines ``reason``.
    """
    if clone.get("connects_to_production"):
        return (False, None, "REJECT_CLONE_CONNECTS_TO_PRODUCTION")
    if clone.get("points_to_production_host"):
        return (False, None, "REJECT_CLONE_POINTS_TO_PRODUCTION")
    if not clone.get("isolated"):
        return (False, None, "REJECT_CLONE_NOT_ISOLATED")
    if clone.get("contains_secrets"):
        return (False, None, "REJECT_CLONE_CONTAINS_SECRETS")

    realpath = clone.get("realpath", "")
    # Fail closed: a realpath that is present but not a string (None, bytes,
    # ...) cannot be prefix-checked, so reject it instead of raising from
    # ``.startswith`` and aborting the harness.
    if not isinstance(realpath, str):
        return (False, None, "REJECT_CLONE_REALPATH_NOT_A_STRING")
    if realpath.startswith(("/opt/incomex", "prod://", "postgres://")):
        return (False, None, "REJECT_CLONE_REALPATH_LOOKS_PRODUCTION")
    return (True, "CLONE_ISOLATED_OK", "ACCEPT_ISOLATED_NON_PRODUCTION_CLONE")
def guard_schema_compat(check):
    """Accept rehearsal only if no unresolved schema mismatch is ignored.

    Args:
        check: mapping with optional ``mismatch`` and ``resolved`` flags
            (truthiness is used).

    Returns:
        ``(accepted, token, reason)``; rejected only when ``mismatch`` is
        truthy while ``resolved`` is falsy.
    """
    has_mismatch = check.get("mismatch")
    if not has_mismatch or check.get("resolved"):
        return (True, "SCHEMA_COMPAT_OK", "ACCEPT_SCHEMA_COMPATIBLE")
    return (False, None, "REJECT_SCHEMA_MISMATCH_UNRESOLVED")
def guard_rollback_proof(entry):
    """Accept a rollback entry only if apply really mutated and rollback restored.

    Args:
        entry: mapping carrying ``before_hash``, ``after_apply_hash`` and
            ``after_rollback_hash``.

    Returns:
        ``(accepted, token, reason)``. Rejections, in order of precedence:
        any of the three fields missing/None, apply hash equal to the
        before hash, rollback hash different from the before hash.
    """
    field_names = ("before_hash", "after_apply_hash", "after_rollback_hash")
    original_hash, applied_hash, restored_hash = (entry.get(k) for k in field_names)

    if any(value is None for value in (original_hash, applied_hash, restored_hash)):
        return (False, None, "REJECT_ROLLBACK_FIELDS_MISSING")
    # An apply that leaves the hash untouched proves nothing about rollback.
    if applied_hash == original_hash:
        return (False, None, "REJECT_APPLY_DID_NOT_MUTATE")
    if restored_hash != original_hash:
        return (False, None, "REJECT_ROLLBACK_DID_NOT_RESTORE")
    return (True, "ROLLBACK_OK", "ACCEPT_ROLLBACK_PROVEN")
def guard_ci_seal(file_bytes, sealed_sha, sealed_len):
    """The CI seal-vs-bytes gate: hash RAW bytes, compare to sealed sha + length.

    Never normalizes/re-encodes before hashing (that is the drift hole).

    Args:
        file_bytes: the raw bytes to verify.
        sealed_sha: the sealed hex SHA-256 the bytes must hash to.
        sealed_len: the sealed byte length the bytes must have.

    Returns:
        ``(accepted, token, reason)``. The length is compared before the
        digest, so a length mismatch is reported even when the digest also
        differs.
    """
    observed_sha = sha256_bytes(file_bytes)
    observed_len = len(file_bytes)

    if observed_len != sealed_len:
        verdict = (False, None, "REJECT_CI_SEAL_BYTE_LENGTH_MISMATCH")
    elif observed_sha != sealed_sha:
        verdict = (False, None, "REJECT_CI_SEAL_SHA_MISMATCH")
    else:
        verdict = (True, "CI_SEAL_OK", "ACCEPT_CI_SEAL_BYTE_EXACT")
    return verdict
def guard_no_production_token(claimed_status):
    """A clone-rehearsal result may never carry a production/realrun/cutover token.

    Args:
        claimed_status: the status value to screen; it is converted with
            ``str()`` and upper-cased before matching.

    Returns:
        ``(accepted, token, reason)``. Rejected when any entry of
        ``FORBIDDEN_TOKENS`` occurs as a substring; the reason names the
        first matching entry in ``FORBIDDEN_TOKENS`` order.
    """
    normalized = str(claimed_status).upper()
    offending = next((tok for tok in FORBIDDEN_TOKENS if tok in normalized), None)
    if offending is not None:
        return (False, None, "REJECT_FORBIDDEN_PRODUCTION_TOKEN:" + offending)
    return (True, "CLONE_REHEARSAL_STATUS_OK", "ACCEPT_NON_PRODUCTION_STATUS")
# ---------------------------------------------------------------------------
# Probe definitions: (id, klass, guard call -> (accepted, token, reason), expect_reject)
# ---------------------------------------------------------------------------
def run_probes():
    """Run the ten bad-input probes plus one positive control.

    Each probe feeds one input to a guard and records the outcome.

    Returns:
        A list of dicts, one per probe, with the keys ``id``, ``class``,
        ``expectation`` ("REJECT" or "ALLOW"), ``guard_accepted``,
        ``guard_token``, ``reason``, ``fail_closed`` and
        ``forbidden_token_leaked``.
    """
    probes = []

    def record(pid, klass, accepted, token, reason, expect_reject=True):
        """Append one probe outcome to ``probes``."""
        # fail_closed == the guard did what safety requires: reject a bad
        # input, or allow the positive control.
        fail_closed = (accepted is False) if expect_reject else (accepted is True)
        # Only an accepted result can leak a token; scan what it emitted.
        leaked = []
        if accepted and token:
            up = str(token).upper()
            leaked = [t for t in FORBIDDEN_TOKENS if t in up]
        probes.append({
            "id": pid,
            "class": klass,
            "expectation": "REJECT" if expect_reject else "ALLOW",
            "guard_accepted": bool(accepted),
            "guard_token": token,
            "reason": reason,
            "fail_closed": bool(fail_closed),
            "forbidden_token_leaked": leaked,
        })

    # 1. clone not isolated
    a, t, r = guard_clone_isolation({"isolated": False, "realpath": "/private/tmp/x"})
    record("P1", "clone_not_isolated", a, t, r)

    # 2. clone points to production
    a, t, r = guard_clone_isolation({"isolated": True, "points_to_production_host": True,
                                     "realpath": "postgres://prod-db/directus"})
    record("P2", "clone_points_to_production", a, t, r)

    # 3. clone contains production secrets
    a, t, r = guard_clone_isolation({"isolated": True, "contains_secrets": True,
                                     "realpath": "/private/tmp/x"})
    record("P3", "clone_contains_secrets", a, t, r)

    # 4. schema mismatch ignored
    a, t, r = guard_schema_compat({"mismatch": True, "resolved": False})
    record("P4", "schema_mismatch_ignored", a, t, r)

    # 5. after_apply_hash == before_hash (no mutation)
    a, t, r = guard_rollback_proof({"before_hash": "a" * 64, "after_apply_hash": "a" * 64,
                                    "after_rollback_hash": "a" * 64})
    record("P5", "apply_did_not_mutate", a, t, r)

    # 6. rollback does not restore
    a, t, r = guard_rollback_proof({"before_hash": "a" * 64, "after_apply_hash": "b" * 64,
                                    "after_rollback_hash": "c" * 64})
    record("P6", "rollback_did_not_restore", a, t, r)

    # 7. CI seal mismatch still passes -> gate must reject (different bytes).
    # The edit keeps the byte length identical on purpose: probe 8 already
    # differs in length, so a same-length edit is what forces the SHA
    # comparison (not just the length comparison) to do the rejecting.
    sealed = b"canonical sealed body\n"
    sealed_sha = sha256_bytes(sealed)
    drifted = b"canonical sealed b0dy\n"
    a, t, r = guard_ci_seal(drifted, sealed_sha, len(sealed))
    record("P7", "ci_seal_mismatch_passes", a, t, r)

    # 8. Unicode/escaping byte drift still passes -> same logical JSON, different bytes.
    # en dash U+2013 serialized with ensure_ascii True vs False. The literal is
    # written as an escape so this source file is itself immune to re-encoding.
    obj = {"note": "range 442\u2013461 sealed"}
    bytes_ascii = json.dumps(obj, ensure_ascii=True).encode("utf-8")  # U+2013 as a \uXXXX escape
    bytes_raw = json.dumps(obj, ensure_ascii=False).encode("utf-8")   # raw UTF-8 en dash bytes
    sealed_sha2 = sha256_bytes(bytes_ascii)
    a, t, r = guard_ci_seal(bytes_raw, sealed_sha2, len(bytes_ascii))
    record("P8", "unicode_byte_drift_passes", a, t, r)

    # 9. production PASS emitted from clone rehearsal
    a, t, r = guard_no_production_token("CLONE_REHEARSAL_PRODUCTION_PASS")
    record("P9", "production_pass_from_clone", a, t, r)

    # 10. REAL_RUN/QT001/cutover token emitted
    a, t, r = guard_no_production_token("REHEARSAL_THEN_CUTOVER_QT001_REAL_RUN")
    record("P10", "realrun_qt001_cutover_token", a, t, r)

    # Positive control: a well-formed isolated clone acceptance IS allowed.
    a, t, r = guard_clone_isolation({"isolated": True, "connects_to_production": False,
                                     "contains_secrets": False,
                                     "realpath": "/private/tmp/fix7-shaped-clone.XXXX"})
    record("CONTROL", "valid_isolated_clone_allowed", a, t, r, expect_reject=False)

    return probes
def main():
    """Run the probes, write the JSON report to ``OUT``, print a summary, exit.

    The process exits with status 0 only when every invalid-input probe
    failed closed, every control probe was allowed and no forbidden token
    leaked; otherwise it exits with status 1.
    """
    probes = run_probes()

    # Split the records by what the probe expected of the guard.
    invalid = []
    control = []
    for probe in probes:
        if probe["expectation"] == "REJECT":
            invalid.append(probe)
        elif probe["expectation"] == "ALLOW":
            control.append(probe)

    fail_closed_count = len([p for p in invalid if p["fail_closed"]])
    any_fail_open = fail_closed_count < len(invalid)
    control_allowed = all(p["fail_closed"] for p in control)
    any_leak = any(p["forbidden_token_leaked"] for p in probes)
    clean = (not any_fail_open) and control_allowed and (not any_leak)

    result = {
        "doc": "fix7-p0-shaped-clone-ci-gate-bad-input-probes",
        "date": "2026-06-12",
        "scope": "LOCAL_STATIC_ONLY_NO_CLONE_NO_PRODUCTION_NO_CI_TRIGGER",
        "probe_count": len(invalid),
        "fail_closed_count": fail_closed_count,
        "any_fail_open": any_fail_open,
        "control_count": len(control),
        "control_allowed": control_allowed,
        "forbidden_token_leaked_anywhere": any_leak,
        "verdict": "ALL_FAIL_CLOSED" if clean else "FAIL_OPEN_DETECTED",
        "probes": probes,
    }

    # sort_keys + ensure_ascii keep the written report byte-stable and ASCII-only.
    with open(OUT, "w", encoding="utf-8") as fh:
        json.dump(result, fh, indent=2, sort_keys=True, ensure_ascii=True)
        fh.write("\n")

    print("PROBES: %d/%d invalid fail-closed; control_allowed=%s; any_fail_open=%s; leak=%s"
          % (fail_closed_count, len(invalid), control_allowed, any_fail_open, any_leak))

    ok = clean and fail_closed_count == len(invalid)
    print("BAD_INPUT_PROBES_RESULT: %s" % ("PASS" if ok else "FAIL"))
    sys.exit(int(not ok))
# Run the harness only when executed as a script, not when imported.
if __name__ == "__main__":
    main()