KB-2A22 rev 2

DOT_R2_B2_STAGING_SCHEMA_SHELL — reference fail-closed validator (Macro-9B / 9B2 rev2)

15 min read Revision 2
dot-managemacro9bmacro9b2validatorcodefail-closedremediation2026-06-20

… reference implementation — Macro-9B2 rev2 remediation. Pure decision function; NO DB I/O. Proven locally 64/64 PASS, 0 fail-open (52 bad-input + 12 real_run simulation rows + 8 meta-assertions). This rev2 closes ALL 7 Codex HOLD findings against rev1 and supersedes the prior rev1 37/37 evidence. See dot-r2-b2-validator-test-run-v2.txt (superseding evidence; the rev1 dot-r2-b2-validator-test-run.txt is retained only as the historical pre-remediation record) and dot-schema-write-guards.contract.md. Source below.

"""
dot_r2_b2_staging_schema_shell_validator.py  (Macro-9B2 remediation, rev2)
=========================================================================
Reference validator / fail-closed guard logic for the DOT path:

    DOT_R2_B2_STAGING_SCHEMA_SHELL  (+ 4 separable guard components)

Mission: R2-B2-MACRO-9B2-DOT-VALIDATOR-CONTRACT-REMEDIATION-2026-06-20
Status : ENGINEERING ARTIFACT ONLY. Not registered in dot_tools. Not wired to
         any runtime. Performs NO database I/O of any kind.

rev2 closes the seven Codex HOLD findings against rev1:
  1. missing `channel` now rejects                       -> MISSING_CHANNEL
  2. missing/blank `actor` now rejects                   -> MISSING_ACTOR
  3. strict full-string validation (re.fullmatch) replaces match(...$)
  4. whitespace / control chars in target_schema|run_id  -> reject
     (MALFORMED_SCHEMA_CHARS / BAD_RUN_ID)
  5. gate-open must be EXACTLY boolean True; any non-bool ('true'/'false'/1/None)
     rejects with INVALID_GATE_TYPE -- no truthiness
  6/8. real_run / teardown_real_run require Guard 3 verdict == PASS before any
     write intent; FAIL/UNKNOWN/missing reject
     (PROD_UNTOUCHED_FAIL / PROD_UNTOUCHED_UNKNOWN)
  7. Guard 3 consumes explicit before/after evidence and emits PASS/FAIL/UNKNOWN
     (not just a static plan)
  9. Guard 4 no longer calls Guard 1; both call a shared pure helper
     (_validate_target) so separability is preserved.

BOUNDARY DISCIPLINE:
  - Pure decision function. Imports no DB driver, opens no socket, runs no
    DDL/DML, returns plans/verdicts/write-intent *as data only*.
  - The Owner real-run gate is a separate authority argument, NOT a field of the
    untrusted request dict. In this macro the runtime gate is CLOSED; the local
    test may pass owner_real_run_gate_open=True ONLY to prove Guard 3 is enforced.
    Even with the gate open this module returns write-INTENT strings, never a DB
    call. The write-enabled DOT implementation is NOT authored here
    (HOLD_FOR_OWNER_REAL_RUN).
"""

import re

DOT_CODE = "DOT_R2_B2_STAGING_SCHEMA_SHELL"

# Modes (DOT 18 row 1)
MODES = (
    "validate_only",
    "dry_run_plan",
    "real_run",
    "verify",
    "teardown_plan",
    "teardown_real_run",
)
REAL_RUN_MODES = ("real_run", "teardown_real_run")
NO_WRITE_MODES = ("validate_only", "dry_run_plan", "verify", "teardown_plan")

# Strict full-string patterns. Always used with re.fullmatch -- never re.match --
# and only AFTER _has_forbidden_chars has rejected any whitespace/control char.
ALLOWLIST_RE = re.compile(r"r2_b2_wb_[a-z0-9]+(_[a-z0-9]+)*")
RUN_ID_RE = re.compile(r"[0-9]{8}T[0-9]{6}Z")

# Hard-reject targets (DOT 18 principle 7). Compared case-insensitively + trimmed.
PROTECTED_SCHEMAS = frozenset({
    "public", "iu_core", "cutter_governance", "sandbox_tac",
    "information_schema", "pg_catalog", "pg_toast", "pg_temp",
})

# Only governed DOT channels may carry a write-enabled DOT (DOT 2.4 / 3).
GOVERNED_CHANNELS = frozenset({"process_dot_runner", "agent_api_executor"})
FORBIDDEN_CHANNELS = frozenset({
    "manual_sql", "psql", "docker_exec_psql", "directus_generic", "host_exec",
})

PROTECTED_SURFACES = {
    "structural_inventory_must_equal": ["public.*", "iu_core.*"],
    "invariants_must_equal": [
        "birth_registry.certified_count", "birth_registry.max_date_certified",
        "governance_object_ownership.count", "universal_edges.count",
        "universal_edges.provenance_count", "dot_tools.count",
        "directus_collections.count", "directus_fields.count", "directus_relations.count",
    ],
    "append_tables_write_set_must_be_empty_for_run": [
        "event_outbox", "system_issues", "directus_activity", "directus_revisions",
        "registry_changelog", "measurement_log",
    ],
}

# The surfaces Guard 3 must see present AND equal in before/after evidence to
# return PASS. Any missing surface -> UNKNOWN; any inequality -> FAIL (drift).
REQUIRED_VERIFY_SURFACES = (
    "public.object_count", "iu_core.object_count",
    "birth_registry.certified_count", "birth_registry.max_date_certified",
    "governance_object_ownership.count",
    "universal_edges.count", "universal_edges.provenance_count",
    "dot_tools.count",
    "directus_collections.count", "directus_fields.count", "directus_relations.count",
)

SEVEN_SHELL_TABLES = (
    "wb_manifest", "wb_object", "wb_object_state", "wb_edge",
    "wb_audit", "wb_drift_check", "wb_teardown_log",
)


def _has_forbidden_chars(s):
    """True if s contains ANY whitespace or control / non-printable character.
    Catches trailing newline, carriage return, tab, vertical tab, form feed,
    leading/trailing spaces, NUL and other C0 control chars + DEL. Pure, no I/O."""
    for ch in s:
        if ch.isspace():
            return True
        o = ord(ch)
        if o < 0x20 or o == 0x7f:
            return True
    return False


def _validate_target(target, run_id):
    """Shared PURE target-schema validation. Returns reject codes (empty == pass).
    Called by BOTH Guard 1 (allowlist) and Guard 4 (delete-fast) so neither guard
    calls the other -- separability preserved (Codex finding 5/6, mission C9)."""
    if target is None or str(target).strip() == "":
        return ["MISSING_TARGET_SCHEMA"]
    raw = str(target)
    rejects = []
    if _has_forbidden_chars(raw):
        rejects.append("MALFORMED_SCHEMA_CHARS")
    if raw.strip().lower() in PROTECTED_SCHEMAS:
        rejects.append("PROTECTED_SCHEMA_TARGET")
    if ALLOWLIST_RE.fullmatch(raw) is None:
        rejects.append("NON_ALLOWLIST_SCHEMA")
    elif run_id is not None and str(run_id).strip() != "":
        token = str(run_id).strip().lower()
        if token not in raw.lower():
            rejects.append("SCHEMA_RUNID_MISMATCH")
    return rejects


def allowlist_guard(req):
    """Guard 1 DOT_SCHEMA_WRITE_ALLOWLIST_GUARD. Pure; delegates ONLY to the shared
    target helper. Returns reject codes (empty == pass)."""
    return _validate_target(req.get("target_schema"), req.get("run_id"))


def delete_fast_guard(req):
    """Guard 4 DOT_STAGING_SCHEMA_DELETE_FAST. Re-checks the SAME allowlist via the
    shared helper before any DROP -- does NOT call Guard 1 (separability)."""
    return _validate_target(req.get("target_schema"), req.get("run_id"))


def production_untouched_plan():
    """Reference read-only before/after plan: describes the evidence Guard 3 needs."""
    return {
        "kind": "production_untouched_verify", "method": "read_only_before_after",
        "abort_on_drift": True, "surfaces": PROTECTED_SURFACES,
        "required_evidence_surfaces": list(REQUIRED_VERIFY_SURFACES),
        "note": ("Structural object-inventory equality on public/iu_core + frozen "
                 "invariants. Append-only tables checked by run write-set = empty, "
                 "NOT by raw count (they grow from unrelated background flow)."),
    }


def production_untouched_verify(evidence):
    """Guard 3 DOT_PRODUCTION_UNTOUCHED_VERIFY -- PURE verdict over EXPLICIT
    before/after evidence. NO DB I/O. Returns {verdict, drift, reason, plan}.
      - evidence None / not a dict / missing before|after / incomplete -> UNKNOWN
      - any required surface before != after                          -> FAIL
      - all required surfaces present and equal                       -> PASS
    A real_run may proceed only on PASS (enforced by the router)."""
    plan = production_untouched_plan()
    if not isinstance(evidence, dict):
        return {"verdict": "UNKNOWN", "drift": [], "reason": "no_evidence_supplied", "plan": plan}
    before = evidence.get("before")
    after = evidence.get("after")
    if not isinstance(before, dict) or not isinstance(after, dict):
        return {"verdict": "UNKNOWN", "drift": [], "reason": "missing_before_or_after", "plan": plan}
    missing = [s for s in REQUIRED_VERIFY_SURFACES if s not in before or s not in after]
    if missing:
        return {"verdict": "UNKNOWN", "drift": [], "reason": "incomplete_surfaces",
                "missing_surfaces": missing, "plan": plan}
    drift = [s for s in REQUIRED_VERIFY_SURFACES if before[s] != after[s]]
    if drift:
        return {"verdict": "FAIL", "drift": drift, "reason": "production_drift_detected", "plan": plan}
    return {"verdict": "PASS", "drift": [], "reason": "all_surfaces_equal", "plan": plan}


def audit_proof(req, reject_codes, write_intent, prod_untouched_verdict=None):
    """Guard 2 DOT_SCHEMA_WRITE_AUDIT_PROOF. Envelope for EVERY decision."""
    return {
        "dot_code": req.get("dot_code"), "actor": req.get("actor"),
        "run_id": req.get("run_id"), "mode": req.get("mode"),
        "target_schema": req.get("target_schema"),
        "owner_authorization_ref": req.get("owner_authorization_ref"),
        "channel": req.get("channel"), "decided_at": "<runtime-stamp>",
        "reject_codes": list(reject_codes), "write_intent": list(write_intent),
        "production_untouched_verdict": prod_untouched_verdict,
        "before_snapshot_ref": "<runtime>" if write_intent else None,
        "after_snapshot_ref": "<runtime>" if write_intent else None,
    }


def validate_request(req, owner_real_run_gate_open=False):
    """Primary router. PURE decision function. Collects ALL reject codes. No DB I/O.

    `owner_real_run_gate_open` is the explicit authority-gate argument -- it is NOT
    read from the untrusted `req`. It must be EXACTLY boolean True to permit a
    real-run write intent; any non-bool value rejects (INVALID_GATE_TYPE)."""
    rejects = []
    mode = req.get("mode")

    if req.get("dot_code") != DOT_CODE:
        rejects.append("WRONG_DOT_CODE")
    if mode not in MODES:
        rejects.append("UNKNOWN_MODE")

    # channel: required, governed only
    channel = req.get("channel")
    if channel is None or str(channel).strip() == "":
        rejects.append("MISSING_CHANNEL")
    elif channel in FORBIDDEN_CHANNELS:
        rejects.append("FORBIDDEN_MANUAL_CHANNEL")
    elif channel not in GOVERNED_CHANNELS:
        rejects.append("UNKNOWN_CHANNEL")
    if req.get("use_directus_generic_create"):
        rejects.append("DIRECTUS_GENERIC_FORBIDDEN")

    # actor: required (audit identity)
    actor = req.get("actor")
    if actor is None or str(actor).strip() == "":
        rejects.append("MISSING_ACTOR")

    # run_id: required, strict, no control/whitespace chars
    run_id = req.get("run_id")
    if run_id is None or str(run_id).strip() == "":
        rejects.append("MISSING_RUN_ID")
    elif _has_forbidden_chars(str(run_id)) or RUN_ID_RE.fullmatch(str(run_id)) is None:
        rejects.append("BAD_RUN_ID")

    if req.get("copy_production_data"):
        rejects.append("PROD_DATA_COPY_FORBIDDEN")

    oar = req.get("owner_authorization_ref")
    if oar is None or str(oar).strip() == "":
        rejects.append("MISSING_OWNER_AUTH")

    # target allowlist via shared helper (Guard 1 create / Guard 4 teardown)
    if mode in ("teardown_plan", "teardown_real_run"):
        rejects.extend(delete_fast_guard(req))
    else:
        rejects.extend(allowlist_guard(req))

    # real-run gate (STRICT boolean) + Guard 3 production-untouched (must PASS)
    g3 = None
    if mode in REAL_RUN_MODES:
        if not isinstance(owner_real_run_gate_open, bool):
            rejects.append("INVALID_GATE_TYPE")
        elif owner_real_run_gate_open is not True:
            rejects.append("REAL_RUN_GATE_CLOSED")
        else:
            g3 = production_untouched_verify(req.get("production_untouched_evidence"))
            if g3["verdict"] == "FAIL":
                rejects.append("PROD_UNTOUCHED_FAIL")
            elif g3["verdict"] != "PASS":
                rejects.append("PROD_UNTOUCHED_UNKNOWN")

    # de-dup, preserve order
    seen = []
    for r in rejects:
        if r not in seen:
            seen.append(r)
    rejects = seen

    accepted = (len(rejects) == 0)
    writes = []
    plan = None
    prod_verdict = g3["verdict"] if g3 else None

    if accepted:
        if mode == "validate_only":
            plan = {"kind": "validate_only", "result": "inputs_valid"}
        elif mode == "dry_run_plan":
            tgt = req["target_schema"]
            ddl = [f"CREATE SCHEMA {tgt};"]
            ddl += [f"CREATE TABLE {tgt}.{t} (...);" for t in SEVEN_SHELL_TABLES]
            plan = {"kind": "dry_run_plan", "target_schema": tgt,
                    "seven_shell_tables": list(SEVEN_SHELL_TABLES),
                    "ddl_preview": ddl, "writes_if_executed": len(ddl)}
        elif mode == "verify":
            plan = production_untouched_verify(req.get("production_untouched_evidence"))
        elif mode == "teardown_plan":
            tgt = req["target_schema"]
            plan = {"kind": "teardown_plan", "ddl_preview": [f"DROP SCHEMA {tgt} CASCADE;"]}
        elif mode in REAL_RUN_MODES:
            # Reached ONLY when: gate is EXACTLY True AND Guard 3 == PASS AND every
            # input is valid. Emits write-INTENT strings only; never a DB call.
            tgt = req["target_schema"]
            if mode == "real_run":
                writes = [f"CREATE SCHEMA {tgt};"] + \
                         [f"CREATE TABLE {tgt}.{t} (...);" for t in SEVEN_SHELL_TABLES]
            else:
                writes = [f"DROP SCHEMA {tgt} CASCADE;"]
            plan = {"kind": mode, "target_schema": tgt, "GATE": "OWNER_REAL_RUN_OPEN",
                    "production_untouched_verdict": "PASS",
                    "note": ("write intent only; execution is a separate runtime step; "
                             "NOT executed in this macro (HOLD_FOR_OWNER_REAL_RUN)")}

    audit = audit_proof(req, rejects, writes, prod_verdict)
    return {"accepted": accepted, "mode": mode, "reject_codes": rejects,
            "writes": writes, "plan": plan,
            "production_untouched_verdict": prod_verdict, "audit": audit}