KB-472C

FIX7 P0 Final Pre-Real-Data — bad_input_probes.py

9 min read Revision 1
tool-kiem-thufix7p0final-pre-real-datareadiness-packet2026-06-12

#!/usr/bin/env python3

-- coding: ascii --

"""FIX7 P0 final pre-real-data lane - bad-input probes (must ALL fail closed).

Each probe feeds a defective input to a guard in this lane and PASSES only if the guard REFUSES (fail-closed). A probe FAILS (fail-open) if the defective input is accepted. The suite also scans every captured rejection output for forbidden authorization tokens - a rejection must never leak a token that could be read as a production/seal/cutover authorization.

No production contact. Everything runs in throwaway temp dirs. """ import copy import json import os import sqlite3 import subprocess import sys import tempfile

sys.dont_write_bytecode = True HERE = os.path.dirname(os.path.abspath(file)) sys.path.insert(0, HERE)

import generate_production_shaped_surrogate as G # noqa: E402 import surrogate_rehearsal as R # noqa: E402 import hardened_dryrun_validator as V # noqa: E402

FORBIDDEN_TOKENS = ( "PRODUCTION_PASS", "PRODUCTION_OK", "REAL_RUN_AUTHORIZED", "REAL_RUN_OK", "QT001_AUTHORIZED", "CUTOVER_AUTHORIZED", "PERMIT_GRANTED", "ACTIVATION_AUTHORIZED", "REPOINT_AUTHORIZED", "SEAL_AUTHORED", )

results = [] captured_outputs = []

def record(pid, desc, fail_closed, detail): detail = str(detail) captured_outputs.append(detail) results.append({"probe": pid, "description": desc, "fail_closed_as_expected": bool(fail_closed), "rejection_detail": detail[:300]}) print("[%s] %s :: %s" % ("PASS" if fail_closed else "FAIL", pid, desc))

def load_rb(): with open(os.path.join(HERE, "rollback-evidence.json"), encoding="utf-8") as fh: return json.load(fh)

def main(): tmp = tempfile.mkdtemp(prefix="fix7-final-bad-input.", dir="/private/tmp")

# P1: plain sqlite db without surrogate provenance -> rehearsal must refuse
p1 = os.path.join(tmp, "plain.db")
conn = sqlite3.connect(p1)
conn.execute("CREATE TABLE t (x)")
conn.commit()
err = R.check_surrogate_marker(conn)
conn.close()
record("P1", "db without surrogate_provenance is refused",
       err is not None and err.startswith("REJECT_NOT_A_SURROGATE"), err)

# P2: provenance claims REAL production data -> refused
p2 = os.path.join(tmp, "claims-real.db")
conn = sqlite3.connect(p2)
conn.execute("CREATE TABLE surrogate_provenance (marker TEXT, "
             "real_production_data TEXT, contains_secrets TEXT, contains_pii TEXT)")
conn.execute("INSERT INTO surrogate_provenance VALUES (?, 'YES', 'NO', 'NO')",
             (R.MARKER,))
conn.commit()
err = R.check_surrogate_marker(conn)
conn.close()
record("P2", "provenance claiming real_production_data=YES is refused",
       err is not None and "PROVENANCE_INVALID" in err, err)

# P3: wrong marker string -> refused
p3 = os.path.join(tmp, "wrong-marker.db")
conn = sqlite3.connect(p3)
conn.execute("CREATE TABLE surrogate_provenance (marker TEXT, "
             "real_production_data TEXT, contains_secrets TEXT, contains_pii TEXT)")
conn.execute("INSERT INTO surrogate_provenance VALUES "
             "('TOTALLY_REAL_PRODUCTION_DUMP', 'NO', 'NO', 'NO')")
conn.commit()
err = R.check_surrogate_marker(conn)
conn.close()
record("P3", "wrong provenance marker string is refused",
       err is not None and "PROVENANCE_INVALID" in err, err)

# P4: birth attempt with UNAPPROVED proposal -> Tier-0 gate refuses
p4 = os.path.join(tmp, "gate-unapproved.db")
G.generate(p4)
conn = sqlite3.connect(p4)
try:
    R.apply_birth(conn, "SURR-PROPOSAL-UNAPPROVED")
    out4, closed4 = "ACCEPTED (fail-open)", False
except SystemExit as e:
    out4 = str(e)
    closed4 = "REJECT_TIER0_GATE_NOT_SATISFIED" in out4
conn.close()
record("P4", "birth with approvals=0 proposal is refused", closed4, out4)

# P5: birth attempt with db_target != directus -> Tier-0 gate refuses
conn = sqlite3.connect(p4)
try:
    R.apply_birth(conn, "SURR-PROPOSAL-WRONG-DB")
    out5, closed5 = "ACCEPTED (fail-open)", False
except SystemExit as e:
    out5 = str(e)
    closed5 = "REJECT_TIER0_GATE_NOT_SATISFIED" in out5
conn.close()
record("P5", "birth with db_target!=directus is refused", closed5, out5)

# P6: production-looking target path -> refused
err = R.check_target_path("/private/tmp/prod-directus-vps/clone.db")
record("P6", "production-like token in target path is refused",
       err is not None and "REJECT_TARGET_LOOKS_PRODUCTION_LIKE" in err, err)

# P7: target path outside allowed staging roots -> refused
err = R.check_target_path("/Users/nmhuyen/somewhere/clone.db")
record("P7", "target outside staging roots is refused",
       err is not None and "REJECT_TARGET_NOT_IN_STAGING_ROOT" in err, err)

rb = load_rb()

# P8: fabricated no-mutation rollback -> hardened validator fails closed
fake = copy.deepcopy(rb)
fake["entries"][0]["after_apply_hash"] = fake["entries"][0]["before_hash"]
fails = V.check_rollback_proof(fake)
record("P8", "no-mutation rollback evidence fails closed",
       any(f.startswith("ROLLBACK_APPLY_DID_NOT_MUTATE") for f in fails), fails)

# P9: after_rollback != before (not restored) -> fails closed
fake = copy.deepcopy(rb)
fake["entries"][0]["after_rollback_hash"] = "0" * 64
fails = V.check_rollback_proof(fake)
record("P9", "non-restored rollback evidence fails closed",
       any(f.startswith("ROLLBACK_NOT_RESTORED") for f in fails), fails)

# P10: claiming a PRODUCTION rollback proof -> fails closed
fake = copy.deepcopy(rb)
fake["production_rollback_status"] = "PROVEN_IN_PRODUCTION"
fails = V.check_rollback_proof(fake)
record("P10", "claimed production rollback proof fails closed",
       any(f.startswith("PRODUCTION_ROLLBACK_CLAIMED") for f in fails), fails)

# P11: missing rollback-evidence.json -> wrapper exits nonzero, no PASS verdict
iso = os.path.join(tmp, "iso-missing-evidence")
os.makedirs(iso)
for f in ("hardened_dryrun_validator.py", "valid_evidence_recheck.json",
          "run_hardened_validator.py"):
    with open(os.path.join(HERE, f), "rb") as src, \
            open(os.path.join(iso, f), "wb") as dst:
        dst.write(src.read())
proc = subprocess.run([sys.executable,
                       os.path.join(iso, "run_hardened_validator.py")],
                      capture_output=True, text=True, cwd=iso)
closed11 = (proc.returncode != 0
            and "HARDENED_VALIDATOR_OVERALL: PASS" not in proc.stdout)
record("P11", "missing rollback evidence -> wrapper exits nonzero without PASS",
       closed11, "rc=%d" % proc.returncode)

# P12: expected restored-pin mismatch -> fails closed
fake = copy.deepcopy(rb)
fake["entries"][0]["expected_restored_hash"] = "f" * 64
fake["entries"][0]["after_rollback_hash"] = fake["entries"][0]["before_hash"]
fails = V.check_rollback_proof(fake)
record("P12", "restored hash != explicit pin fails closed",
       any(f.startswith("ROLLBACK_NOT_RESTORED_TO_PIN") for f in fails), fails)

# forbidden-token leak scan across all captured rejection outputs
blob = "\n".join(captured_outputs)
leaked = [t for t in FORBIDDEN_TOKENS if t in blob]
token_clean = (len(leaked) == 0)
print("[%s] forbidden-token leak scan (%d tokens checked)"
      % ("PASS" if token_clean else "FAIL", len(FORBIDDEN_TOKENS)))

any_fail_open = any(not r["fail_closed_as_expected"] for r in results)
ok = (not any_fail_open) and token_clean
out = {
    "doc": "fix7-p0-final-pre-real-data-bad-input-probes",
    "date": "2026-06-12",
    "probe_count": len(results),
    "probes": results,
    "any_fail_open": any_fail_open,
    "forbidden_tokens_checked": list(FORBIDDEN_TOKENS),
    "forbidden_tokens_leaked": leaked,
    "all_fail_closed": ok,
    "production_contact": False,
}
with open(os.path.join(HERE, "bad-input-probes.json"), "w",
          encoding="ascii") as fh:
    json.dump(out, fh, indent=2, sort_keys=True, ensure_ascii=True)
    fh.write("\n")
print("BAD_INPUT_PROBES: %d/%d fail-closed, any_fail_open=%s"
      % (sum(1 for r in results if r["fail_closed_as_expected"]),
         len(results), any_fail_open))
print("BAD_INPUT_PROBES_RESULT: %s" % ("PASS" if ok else "FAIL"))
sys.exit(0 if ok else 1)

if name == "main": main()

Back to Knowledge Hub knowledge/dev/reports/architecture/fix7-p0-final-pre-real-data-readiness-packet-2026-06-12/bad_input_probes.py