KB-4CE8 rev 2

FIX7 P0 Shaped-Clone CI-Gate — bad_input_probes.py

11 min read Revision 2
tool-kiem-thufix7p0production-shaped-cloneci-gate-packet2026-06-12

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""FIX7 P0 - Production-Shaped Clone Rehearsal + CI Gate :: bad-input probe harness.

Local/static ONLY. No clone, no production, no CI trigger. Each probe feeds a
deliberately invalid input to the relevant acceptance check and asserts the
check FAILS CLOSED (rejects), emitting NO production/seal/cutover token.

A probe "fails closed" when the guard rejects the bad input. A probe that
would have ACCEPTED a bad input is a fail-OPEN and makes the whole harness
exit 1.

Ten invalid classes are required by the macro:

  1. clone not isolated
  2. clone points to production
  3. clone contains production secrets
  4. schema mismatch ignored
  5. after_apply_hash == before_hash
  6. rollback does not restore
  7. CI seal mismatch still passes
  8. Unicode/escaping byte drift still passes
  9. production PASS emitted from clone rehearsal
  10. REAL_RUN/QT001/cutover token emitted

Plus one positive control (a well-formed isolated-clone acceptance) that is
allowed.
"""
import json
import hashlib
import os
import sys

# Directory containing this script; the JSON report is written next to it.
HERE = os.path.dirname(os.path.abspath(__file__))
OUT = os.path.join(HERE, "bad-input-probes.json")

# Tokens that must NEVER be emitted by any acceptance of a clone/rehearsal input.
# Order matters: consumers report the first token that matches, so the more
# specific spellings (e.g. "REAL_RUN_PASS") are listed before their prefixes.
FORBIDDEN_TOKENS = (
    "PRODUCTION_PASS",
    "PRODUCTION_EXECUTION",
    "REAL_RUN_PASS",
    "REAL_RUN",
    "QT001_APPLY",
    "QT001",
    "CUTOVER_PASS",
    "CUTOVER",
    "PERMIT_GRANTED",
    "ACTIVATION",
    "REPOINT",
    "SEAL_OK_PRODUCTION",
)

def sha256_bytes(b):
    """Return the lowercase hex SHA-256 digest of the bytes-like object ``b``."""
    hasher = hashlib.sha256()
    hasher.update(b)
    return hasher.hexdigest()

# ---------------------------------------------------------------------------
# The guards under test. Each returns (accepted: bool, token: str|None, reason).
# "accepted" True means the input was let through; for a bad input that is a
# fail-OPEN. token is any status token the guard would emit on acceptance.
# ---------------------------------------------------------------------------

def guard_clone_isolation(clone):
    """Accept a clone only if provably isolated and non-production.

    Args:
        clone: Mapping describing the clone. The flags
            ``connects_to_production``, ``points_to_production_host``,
            ``isolated`` and ``contains_secrets`` are evaluated for
            truthiness; ``realpath`` is expected to be a string (a missing
            key or an explicit ``None`` is treated as an empty string).

    Returns:
        ``(accepted, token, reason)``. On rejection ``accepted`` is False and
        ``token`` is None; on acceptance the token is ``"CLONE_ISOLATED_OK"``.
        Checks run in the order below and the first failing check decides
        the reason.
    """
    if clone.get("connects_to_production"):
        return (False, None, "REJECT_CLONE_CONNECTS_TO_PRODUCTION")
    if clone.get("points_to_production_host"):
        return (False, None, "REJECT_CLONE_POINTS_TO_PRODUCTION")
    if not clone.get("isolated"):
        return (False, None, "REJECT_CLONE_NOT_ISOLATED")
    if clone.get("contains_secrets"):
        return (False, None, "REJECT_CLONE_CONTAINS_SECRETS")

    realpath = clone.get("realpath")
    if realpath is None:
        # Same outcome as an absent key: nothing to prefix-match against.
        realpath = ""
    if not isinstance(realpath, str):
        # Fail closed: a realpath we cannot prefix-check is rejected instead
        # of raising AttributeError/TypeError out of the guard.
        return (False, None, "REJECT_CLONE_REALPATH_NOT_A_STRING")
    if realpath.startswith(("/opt/incomex", "prod://", "postgres://")):
        return (False, None, "REJECT_CLONE_REALPATH_LOOKS_PRODUCTION")
    return (True, "CLONE_ISOLATED_OK", "ACCEPT_ISOLATED_NON_PRODUCTION_CLONE")

def guard_schema_compat(check):
    """Accept rehearsal only if no unresolved schema mismatch is ignored.

    Args:
        check: Mapping with the flags ``mismatch`` and ``resolved``, both
            evaluated for truthiness (a missing key counts as falsy).

    Returns:
        ``(accepted, token, reason)``. Rejected (``token`` None) when
        ``mismatch`` is truthy and ``resolved`` is not; otherwise accepted
        with token ``"SCHEMA_COMPAT_OK"``.
    """
    if check.get("mismatch") and not check.get("resolved"):
        return (False, None, "REJECT_SCHEMA_MISMATCH_UNRESOLVED")
    return (True, "SCHEMA_COMPAT_OK", "ACCEPT_SCHEMA_COMPATIBLE")

def guard_rollback_proof(entry):
    """Accept a rollback entry only if apply really mutated and rollback restored.

    Args:
        entry: Mapping with ``before_hash``, ``after_apply_hash`` and
            ``after_rollback_hash``. The values are only compared for
            equality; their format is not validated here.

    Returns:
        ``(accepted, token, reason)``. Rejected (``token`` None) when any of
        the three fields is missing/None, when the apply hash equals the
        before hash (no mutation happened), or when the rollback hash differs
        from the before hash (state not restored). Otherwise accepted with
        token ``"ROLLBACK_OK"``.
    """
    before = entry.get("before_hash")
    apply_h = entry.get("after_apply_hash")
    after_rb = entry.get("after_rollback_hash")
    if before is None or apply_h is None or after_rb is None:
        return (False, None, "REJECT_ROLLBACK_FIELDS_MISSING")
    if apply_h == before:
        return (False, None, "REJECT_APPLY_DID_NOT_MUTATE")
    if after_rb != before:
        return (False, None, "REJECT_ROLLBACK_DID_NOT_RESTORE")
    return (True, "ROLLBACK_OK", "ACCEPT_ROLLBACK_PROVEN")

def guard_ci_seal(file_bytes, sealed_sha, sealed_len):
    """The CI seal-vs-bytes gate: hash RAW bytes, compare to sealed sha + length.

    Never normalizes/re-encodes before hashing (that is the drift hole).

    Args:
        file_bytes: The exact bytes under test.
        sealed_sha: Hex SHA-256 digest recorded in the seal.
        sealed_len: Byte length recorded in the seal.

    Returns:
        ``(accepted, token, reason)``. The length is compared first, then the
        digest; either mismatch rejects with ``token`` None. A byte-exact
        match is accepted with token ``"CI_SEAL_OK"``.
    """
    actual_sha = sha256_bytes(file_bytes)
    actual_len = len(file_bytes)
    if actual_len != sealed_len:
        return (False, None, "REJECT_CI_SEAL_BYTE_LENGTH_MISMATCH")
    if actual_sha != sealed_sha:
        return (False, None, "REJECT_CI_SEAL_SHA_MISMATCH")
    return (True, "CI_SEAL_OK", "ACCEPT_CI_SEAL_BYTE_EXACT")

def guard_no_production_token(claimed_status):
    """A clone-rehearsal result may never carry a production/realrun/cutover token.

    Args:
        claimed_status: Status value to screen. It is converted with ``str``
            and upper-cased, so matching is case-insensitive and by substring.

    Returns:
        ``(accepted, token, reason)``. Rejected (``token`` None) as soon as
        one entry of ``FORBIDDEN_TOKENS`` occurs in the status; the reason
        names the first matching entry in tuple order. Otherwise accepted
        with token ``"CLONE_REHEARSAL_STATUS_OK"``.
    """
    up = str(claimed_status).upper()
    for tok in FORBIDDEN_TOKENS:
        if tok in up:
            return (False, None, "REJECT_FORBIDDEN_PRODUCTION_TOKEN:" + tok)
    return (True, "CLONE_REHEARSAL_STATUS_OK", "ACCEPT_NON_PRODUCTION_STATUS")

# ---------------------------------------------------------------------------
# Probe definitions: (id, klass, guard call -> (accepted, token, reason), expect_reject)
# ---------------------------------------------------------------------------

def run_probes():
    """Run the ten bad-input probes plus the positive control.

    Returns:
        A list of dicts, one per probe, in execution order (P1..P10, then
        CONTROL). Each dict has the keys ``id``, ``class``, ``expectation``
        ("REJECT" or "ALLOW"), ``guard_accepted``, ``guard_token``,
        ``reason``, ``fail_closed`` and ``forbidden_token_leaked``.
    """
    probes = []

    def record(pid, klass, accepted, token, reason, expect_reject=True):
        """Append one probe outcome to ``probes``."""
        # fail_closed == the guard did what safety requires: reject for a bad
        # input, accept for the positive control.
        if expect_reject:
            fail_closed = (accepted is False)
        else:
            fail_closed = (accepted is True)
        # A token can only leak if the guard actually accepted and emitted one.
        leaked = []
        if accepted and token:
            up = str(token).upper()
            leaked = [t for t in FORBIDDEN_TOKENS if t in up]
        probes.append({
            "id": pid,
            "class": klass,
            "expectation": "REJECT" if expect_reject else "ALLOW",
            "guard_accepted": bool(accepted),
            "guard_token": token,
            "reason": reason,
            "fail_closed": bool(fail_closed),
            "forbidden_token_leaked": leaked,
        })

    # 1. clone not isolated
    a, t, r = guard_clone_isolation({"isolated": False, "realpath": "/private/tmp/x"})
    record("P1", "clone_not_isolated", a, t, r)

    # 2. clone points to production
    a, t, r = guard_clone_isolation({"isolated": True, "points_to_production_host": True,
                                     "realpath": "postgres://prod-db/directus"})
    record("P2", "clone_points_to_production", a, t, r)

    # 3. clone contains production secrets
    a, t, r = guard_clone_isolation({"isolated": True, "contains_secrets": True,
                                     "realpath": "/private/tmp/x"})
    record("P3", "clone_contains_secrets", a, t, r)

    # 4. schema mismatch ignored
    a, t, r = guard_schema_compat({"mismatch": True, "resolved": False})
    record("P4", "schema_mismatch_ignored", a, t, r)

    # 5. after_apply_hash == before_hash (no mutation)
    a, t, r = guard_rollback_proof({"before_hash": "a" * 64, "after_apply_hash": "a" * 64,
                                    "after_rollback_hash": "a" * 64})
    record("P5", "apply_did_not_mutate", a, t, r)

    # 6. rollback does not restore
    a, t, r = guard_rollback_proof({"before_hash": "a" * 64, "after_apply_hash": "b" * 64,
                                    "after_rollback_hash": "c" * 64})
    record("P6", "rollback_did_not_restore", a, t, r)

    # 7. CI seal mismatch still passes -> gate must reject (different bytes).
    sealed = b"canonical sealed body\n"
    sealed_sha = sha256_bytes(sealed)
    drifted = b"canonical sealed body (edited)\n"
    a, t, r = guard_ci_seal(drifted, sealed_sha, len(sealed))
    record("P7", "ci_seal_mismatch_passes", a, t, r)

    # 8. Unicode/escaping byte drift still passes -> same logical JSON, different bytes.
    #    en-dash U+2013 serialized with ensure_ascii True vs False.
    #    The en-dash is written as an escape so this probe's input cannot
    #    itself drift if the source file is re-encoded.
    obj = {"note": "range 442\u2013461 sealed"}
    bytes_ascii = json.dumps(obj, ensure_ascii=True).encode("utf-8")   # \u2013 escaped (6 ASCII bytes)
    bytes_raw = json.dumps(obj, ensure_ascii=False).encode("utf-8")    # raw en-dash (3 UTF-8 bytes)
    sealed_sha2 = sha256_bytes(bytes_ascii)
    a, t, r = guard_ci_seal(bytes_raw, sealed_sha2, len(bytes_ascii))
    record("P8", "unicode_byte_drift_passes", a, t, r)

    # 9. production PASS emitted from clone rehearsal
    a, t, r = guard_no_production_token("CLONE_REHEARSAL_PRODUCTION_PASS")
    record("P9", "production_pass_from_clone", a, t, r)

    # 10. REAL_RUN/QT001/cutover token emitted
    a, t, r = guard_no_production_token("REHEARSAL_THEN_CUTOVER_QT001_REAL_RUN")
    record("P10", "realrun_qt001_cutover_token", a, t, r)

    # Positive control: a well-formed isolated clone acceptance IS allowed.
    a, t, r = guard_clone_isolation({"isolated": True, "connects_to_production": False,
                                     "contains_secrets": False,
                                     "realpath": "/private/tmp/fix7-shaped-clone.XXXX"})
    record("CONTROL", "valid_isolated_clone_allowed", a, t, r, expect_reject=False)

    return probes

def main():
    """Run all probes, write the JSON report to ``OUT``, and exit.

    Prints a one-line summary and a ``BAD_INPUT_PROBES_RESULT`` line, then
    calls ``sys.exit(0)`` when every invalid probe failed closed, the control
    was allowed and no forbidden token leaked; otherwise ``sys.exit(1)``.
    """
    probes = run_probes()
    invalid = [p for p in probes if p["expectation"] == "REJECT"]
    control = [p for p in probes if p["expectation"] == "ALLOW"]
    fail_closed_count = sum(1 for p in invalid if p["fail_closed"])
    any_fail_open = any(not p["fail_closed"] for p in invalid)
    # For a control probe, "fail_closed" is True when the guard accepted it.
    control_allowed = all(p["fail_closed"] for p in control)
    any_leak = any(p["forbidden_token_leaked"] for p in probes)

    result = {
        "doc": "fix7-p0-shaped-clone-ci-gate-bad-input-probes",
        "date": "2026-06-12",
        "scope": "LOCAL_STATIC_ONLY_NO_CLONE_NO_PRODUCTION_NO_CI_TRIGGER",
        "probe_count": len(invalid),
        "fail_closed_count": fail_closed_count,
        "any_fail_open": any_fail_open,
        "control_count": len(control),
        "control_allowed": control_allowed,
        "forbidden_token_leaked_anywhere": any_leak,
        "verdict": ("ALL_FAIL_CLOSED" if (not any_fail_open and control_allowed and not any_leak)
                    else "FAIL_OPEN_DETECTED"),
        "probes": probes,
    }
    # sort_keys + ensure_ascii keep the report's bytes stable and pure ASCII.
    with open(OUT, "w", encoding="utf-8") as fh:
        json.dump(result, fh, indent=2, sort_keys=True, ensure_ascii=True)
        fh.write("\n")

    print("PROBES: %d/%d invalid fail-closed; control_allowed=%s; any_fail_open=%s; leak=%s"
          % (fail_closed_count, len(invalid), control_allowed, any_fail_open, any_leak))
    ok = (not any_fail_open) and control_allowed and (not any_leak) and fail_closed_count == len(invalid)
    print("BAD_INPUT_PROBES_RESULT: %s" % ("PASS" if ok else "FAIL"))
    sys.exit(0 if ok else 1)

# Run the harness only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Back to Knowledge Hub knowledge/dev/reports/architecture/fix7-p0-production-shaped-clone-rehearsal-ci-gate-packet-2026-06-12/bad_input_probes.py