KB-2A22 rev 2
DOT_R2_B2_STAGING_SCHEMA_SHELL — reference fail-closed validator (Macro-9B / 9B2 rev2)
15 min read Revision 2
Tags: dot-manage · macro9b · macro9b2 · validator · code · fail-closed · remediation · 2026-06-20
… reference implementation — Macro-9B2 rev2 remediation. Pure decision function; NO DB I/O. Proven locally 64/64 PASS, 0 fail-open (52 bad-input + 12 real_run simulation rows + 8 meta-assertions). This rev2 closes ALL 7 Codex HOLD findings against rev1 and supersedes the prior rev1 37/37 evidence. See dot-r2-b2-validator-test-run-v2.txt (superseding evidence; the rev1 dot-r2-b2-validator-test-run.txt is retained only as the historical pre-remediation record) and dot-schema-write-guards.contract.md. Source below.
"""
dot_r2_b2_staging_schema_shell_validator.py (Macro-9B2 remediation, rev2)
=========================================================================
Reference validator / fail-closed guard logic for the DOT path:
DOT_R2_B2_STAGING_SCHEMA_SHELL (+ 4 separable guard components)
Mission: R2-B2-MACRO-9B2-DOT-VALIDATOR-CONTRACT-REMEDIATION-2026-06-20
Status : ENGINEERING ARTIFACT ONLY. Not registered in dot_tools. Not wired to
any runtime. Performs NO database I/O of any kind.
rev2 closes the seven Codex HOLD findings against rev1:
1. missing `channel` now rejects -> MISSING_CHANNEL
2. missing/blank `actor` now rejects -> MISSING_ACTOR
3. strict full-string validation (re.fullmatch) replaces match(...$)
4. whitespace / control chars in target_schema|run_id -> reject
(MALFORMED_SCHEMA_CHARS / BAD_RUN_ID)
5. gate-open must be EXACTLY boolean True; any non-bool ('true'/'false'/1/None)
rejects with INVALID_GATE_TYPE -- no truthiness
6/8. real_run / teardown_real_run require Guard 3 verdict == PASS before any
write intent; FAIL/UNKNOWN/missing reject
(PROD_UNTOUCHED_FAIL / PROD_UNTOUCHED_UNKNOWN)
7. Guard 3 consumes explicit before/after evidence and emits PASS/FAIL/UNKNOWN
(not just a static plan)
9. Guard 4 no longer calls Guard 1; both call a shared pure helper
(_validate_target) so separability is preserved.
BOUNDARY DISCIPLINE:
- Pure decision function. Imports no DB driver, opens no socket, runs no
DDL/DML, returns plans/verdicts/write-intent *as data only*.
- The Owner real-run gate is a separate authority argument, NOT a field of the
untrusted request dict. In this macro the runtime gate is CLOSED; the local
test may pass owner_real_run_gate_open=True ONLY to prove Guard 3 is enforced.
Even with the gate open this module returns write-INTENT strings, never a DB
call. The write-enabled DOT implementation is NOT authored here
(HOLD_FOR_OWNER_REAL_RUN).
"""
import re
DOT_CODE = "DOT_R2_B2_STAGING_SCHEMA_SHELL"

# Request modes understood by the router (DOT 18 row 1).
MODES = (
    "validate_only",
    "dry_run_plan",
    "real_run",
    "verify",
    "teardown_plan",
    "teardown_real_run",
)
# The two modes that may produce write intent, and the four that never do.
REAL_RUN_MODES = ("real_run", "teardown_real_run")
NO_WRITE_MODES = ("validate_only", "dry_run_plan", "verify", "teardown_plan")

# Strict full-string patterns: apply them with re.fullmatch, never re.match.
# Both are ASCII-only, so any whitespace/control character also fails the match.
ALLOWLIST_RE = re.compile(r"r2_b2_wb_[a-z0-9]+(_[a-z0-9]+)*")
RUN_ID_RE = re.compile(r"[0-9]{8}T[0-9]{6}Z")

# Hard-reject targets (DOT 18 principle 7). Candidates are trimmed and
# lower-cased before being looked up in this set.
PROTECTED_SCHEMAS = frozenset((
    "public",
    "iu_core",
    "cutter_governance",
    "sandbox_tac",
    "information_schema",
    "pg_catalog",
    "pg_toast",
    "pg_temp",
))

# Only governed DOT channels may carry a write-enabled DOT (DOT 2.4 / 3).
GOVERNED_CHANNELS = frozenset(("process_dot_runner", "agent_api_executor"))
FORBIDDEN_CHANNELS = frozenset((
    "manual_sql",
    "psql",
    "docker_exec_psql",
    "directus_generic",
    "host_exec",
))

# Descriptive inventory of the production surfaces, embedded in the Guard 3 plan.
PROTECTED_SURFACES = {
    "structural_inventory_must_equal": ["public.*", "iu_core.*"],
    "invariants_must_equal": [
        "birth_registry.certified_count",
        "birth_registry.max_date_certified",
        "governance_object_ownership.count",
        "universal_edges.count",
        "universal_edges.provenance_count",
        "dot_tools.count",
        "directus_collections.count",
        "directus_fields.count",
        "directus_relations.count",
    ],
    "append_tables_write_set_must_be_empty_for_run": [
        "event_outbox",
        "system_issues",
        "directus_activity",
        "directus_revisions",
        "registry_changelog",
        "measurement_log",
    ],
}

# Surfaces Guard 3 must find present AND equal in the before/after evidence to
# return PASS. Any missing surface -> UNKNOWN; any inequality -> FAIL (drift).
REQUIRED_VERIFY_SURFACES = (
    "public.object_count",
    "iu_core.object_count",
    "birth_registry.certified_count",
    "birth_registry.max_date_certified",
    "governance_object_ownership.count",
    "universal_edges.count",
    "universal_edges.provenance_count",
    "dot_tools.count",
    "directus_collections.count",
    "directus_fields.count",
    "directus_relations.count",
)

# Table names emitted in the CREATE TABLE previews / write intent.
SEVEN_SHELL_TABLES = (
    "wb_manifest",
    "wb_object",
    "wb_object_state",
    "wb_edge",
    "wb_audit",
    "wb_drift_check",
    "wb_teardown_log",
)
def _has_forbidden_chars(s):
    """Report whether ``s`` holds any whitespace or ASCII control character.

    A character counts as forbidden when ``str.isspace()`` is true for it, or
    when its code point is below 0x20 (the C0 controls, NUL included) or is
    exactly 0x7F (DEL). That covers newline, carriage return, tab, vertical
    tab, form feed and leading/trailing spaces. Pure; performs no I/O.

    Returns True on the first forbidden character, False otherwise (including
    for the empty string).
    """
    return any(
        ch.isspace() or ord(ch) < 0x20 or ord(ch) == 0x7F
        for ch in s
    )
def _validate_target(target, run_id):
    """Shared PURE validation of a target schema name.

    Used by both Guard 1 (allowlist) and Guard 4 (delete-fast) so that neither
    guard has to call the other -- separability is preserved.

    Args:
        target: candidate schema name; it is stringified before checking.
        run_id: run identifier; when non-blank, its trimmed lower-cased form
            must occur inside an allowlisted schema name.

    Returns:
        A list of reject codes; an empty list means the target passed.
    """
    schema = None if target is None else str(target)
    if schema is None or schema.strip() == "":
        return ["MISSING_TARGET_SCHEMA"]

    codes = []
    if _has_forbidden_chars(schema):
        codes.append("MALFORMED_SCHEMA_CHARS")
    # Protected names are compared trimmed + lower-cased so that padding or
    # casing cannot disguise them.
    if schema.strip().lower() in PROTECTED_SCHEMAS:
        codes.append("PROTECTED_SCHEMA_TARGET")

    if ALLOWLIST_RE.fullmatch(schema) is None:
        codes.append("NON_ALLOWLIST_SCHEMA")
        return codes

    # The run-id binding is only checked for names that are already allowlisted.
    run_token = "" if run_id is None else str(run_id).strip()
    if run_token != "" and run_token.lower() not in schema.lower():
        codes.append("SCHEMA_RUNID_MISMATCH")
    return codes
def allowlist_guard(req):
    """Guard 1 DOT_SCHEMA_WRITE_ALLOWLIST_GUARD.

    Pure. Reads ``target_schema`` and ``run_id`` from ``req`` and hands them to
    the shared target helper -- nothing else is consulted.

    Returns:
        The helper's list of reject codes (empty == pass).
    """
    schema = req.get("target_schema")
    run_id = req.get("run_id")
    return _validate_target(schema, run_id)
def delete_fast_guard(req):
    """Guard 4 DOT_STAGING_SCHEMA_DELETE_FAST.

    Re-runs the SAME target validation as the allowlist guard before any DROP,
    by calling the shared helper directly rather than Guard 1 (separability).

    Returns:
        The helper's list of reject codes (empty == pass).
    """
    schema = req.get("target_schema")
    run_id = req.get("run_id")
    return _validate_target(schema, run_id)
def production_untouched_plan():
    """Reference read-only before/after plan: describes the evidence Guard 3 needs.

    Returns:
        A freshly built dict. ``surfaces`` is a copy of ``PROTECTED_SURFACES``
        (new dict, new lists) and ``required_evidence_surfaces`` is a new list,
        so a caller that mutates the returned plan cannot alter the
        module-level guard configuration.
    """
    # Copy instead of aliasing: the plan is embedded in every Guard 3 verdict
    # that is handed back to callers, and the module constant must stay intact.
    surfaces = {name: list(items) for name, items in PROTECTED_SURFACES.items()}
    return {
        "kind": "production_untouched_verify",
        "method": "read_only_before_after",
        "abort_on_drift": True,
        "surfaces": surfaces,
        "required_evidence_surfaces": list(REQUIRED_VERIFY_SURFACES),
        "note": ("Structural object-inventory equality on public/iu_core + frozen "
                 "invariants. Append-only tables checked by run write-set = empty, "
                 "NOT by raw count (they grow from unrelated background flow)."),
    }
def production_untouched_verify(evidence):
    """Guard 3 DOT_PRODUCTION_UNTOUCHED_VERIFY -- PURE verdict over EXPLICIT
    before/after evidence. NO DB I/O.

    Args:
        evidence: expected to be a dict holding ``before`` and ``after`` dicts
            keyed by the names in ``REQUIRED_VERIFY_SURFACES``.

    Returns:
        A dict with ``verdict``, ``drift``, ``reason`` and ``plan`` (plus
        ``missing_surfaces`` when the evidence is incomplete):
          - evidence not a dict, or before/after not dicts     -> UNKNOWN
          - a required surface absent OR None on either side   -> UNKNOWN
          - any required surface with before != after          -> FAIL
          - every required surface present, non-None and equal -> PASS
        A real_run may proceed only on PASS (enforced by the router).
    """
    plan = production_untouched_plan()
    if not isinstance(evidence, dict):
        return {"verdict": "UNKNOWN", "drift": [], "reason": "no_evidence_supplied", "plan": plan}

    before = evidence.get("before")
    after = evidence.get("after")
    if not isinstance(before, dict) or not isinstance(after, dict):
        return {"verdict": "UNKNOWN", "drift": [], "reason": "missing_before_or_after", "plan": plan}

    # Fail closed on null readings: a surface recorded as None was never
    # actually measured, and None == None must not be allowed to count as
    # "equal" and produce PASS. Treat it exactly like an absent surface.
    missing = [
        s for s in REQUIRED_VERIFY_SURFACES
        if before.get(s) is None or after.get(s) is None
    ]
    if missing:
        return {"verdict": "UNKNOWN", "drift": [], "reason": "incomplete_surfaces",
                "missing_surfaces": missing, "plan": plan}

    drift = [s for s in REQUIRED_VERIFY_SURFACES if before[s] != after[s]]
    if drift:
        return {"verdict": "FAIL", "drift": drift, "reason": "production_drift_detected", "plan": plan}
    return {"verdict": "PASS", "drift": [], "reason": "all_surfaces_equal", "plan": plan}
def audit_proof(req, reject_codes, write_intent, prod_untouched_verdict=None):
    """Guard 2 DOT_SCHEMA_WRITE_AUDIT_PROOF. Envelope for EVERY decision.

    Args:
        req: the request; identity fields are copied from it via ``req.get``.
        reject_codes: iterable of reject codes, copied into a new list.
        write_intent: iterable of write-intent strings, copied into a new list.
        prod_untouched_verdict: Guard 3 verdict to record, or None.

    Returns:
        The audit envelope dict. ``decided_at`` and the snapshot refs are
        placeholder strings; the snapshot refs are None when ``write_intent``
        is falsy.
    """
    snapshot_ref = "<runtime>" if write_intent else None

    envelope = {
        field: req.get(field)
        for field in ("dot_code", "actor", "run_id", "mode", "target_schema",
                      "owner_authorization_ref", "channel")
    }
    envelope["decided_at"] = "<runtime-stamp>"
    envelope["reject_codes"] = list(reject_codes)
    envelope["write_intent"] = list(write_intent)
    envelope["production_untouched_verdict"] = prod_untouched_verdict
    envelope["before_snapshot_ref"] = snapshot_ref
    envelope["after_snapshot_ref"] = snapshot_ref
    return envelope
def _channel_rejects(channel):
    """Return the reject codes for the request's ``channel`` (empty == pass)."""
    if channel is None or str(channel).strip() == "":
        return ["MISSING_CHANNEL"]
    # A non-string can never name a governed channel. Rejecting it here also
    # keeps unhashable values (list/dict from an untrusted payload) away from
    # the frozenset membership tests below, which would raise TypeError.
    if not isinstance(channel, str):
        return ["UNKNOWN_CHANNEL"]
    if channel in FORBIDDEN_CHANNELS:
        return ["FORBIDDEN_MANUAL_CHANNEL"]
    if channel not in GOVERNED_CHANNELS:
        return ["UNKNOWN_CHANNEL"]
    return []


def _create_statements(tgt):
    """Return the CREATE SCHEMA + per-table CREATE TABLE strings for ``tgt``."""
    return [f"CREATE SCHEMA {tgt};"] + [
        f"CREATE TABLE {tgt}.{t} (...);" for t in SEVEN_SHELL_TABLES
    ]


def _accepted_plan(mode, req):
    """Build ``(plan, writes)`` for a request that produced no reject codes."""
    if mode == "validate_only":
        return {"kind": "validate_only", "result": "inputs_valid"}, []
    if mode == "dry_run_plan":
        tgt = req["target_schema"]
        ddl = _create_statements(tgt)
        plan = {"kind": "dry_run_plan", "target_schema": tgt,
                "seven_shell_tables": list(SEVEN_SHELL_TABLES),
                "ddl_preview": ddl, "writes_if_executed": len(ddl)}
        return plan, []
    if mode == "verify":
        return production_untouched_verify(req.get("production_untouched_evidence")), []
    if mode == "teardown_plan":
        tgt = req["target_schema"]
        return {"kind": "teardown_plan", "ddl_preview": [f"DROP SCHEMA {tgt} CASCADE;"]}, []
    if mode in REAL_RUN_MODES:
        # Reached ONLY when: gate is EXACTLY True AND Guard 3 == PASS AND every
        # input is valid. Emits write-INTENT strings only; never a DB call.
        tgt = req["target_schema"]
        if mode == "real_run":
            writes = _create_statements(tgt)
        else:
            writes = [f"DROP SCHEMA {tgt} CASCADE;"]
        plan = {"kind": mode, "target_schema": tgt, "GATE": "OWNER_REAL_RUN_OPEN",
                "production_untouched_verdict": "PASS",
                "note": ("write intent only; execution is a separate runtime step; "
                         "NOT executed in this macro (HOLD_FOR_OWNER_REAL_RUN)")}
        return plan, writes
    return None, []


def validate_request(req, owner_real_run_gate_open=False):
    """Primary router. PURE decision function. Collects ALL reject codes. No DB I/O.

    Args:
        req: the untrusted request; read through ``req.get(...)`` and, once
            accepted, ``req["target_schema"]``.
        owner_real_run_gate_open: the explicit authority-gate argument -- it is
            NOT read from ``req``. It must be EXACTLY boolean True to permit a
            real-run write intent; any non-bool value rejects
            (INVALID_GATE_TYPE) and False rejects (REAL_RUN_GATE_CLOSED). It is
            only consulted for the real-run modes.

    Returns:
        A dict with ``accepted``, ``mode``, ``reject_codes`` (de-duplicated,
        first-seen order), ``writes`` (write-intent strings, empty unless an
        accepted real-run mode), ``plan``, ``production_untouched_verdict`` and
        the ``audit`` envelope.
    """
    rejects = []
    mode = req.get("mode")

    if req.get("dot_code") != DOT_CODE:
        rejects.append("WRONG_DOT_CODE")
    if mode not in MODES:
        rejects.append("UNKNOWN_MODE")

    # channel: required, governed only
    rejects.extend(_channel_rejects(req.get("channel")))
    if req.get("use_directus_generic_create"):
        rejects.append("DIRECTUS_GENERIC_FORBIDDEN")

    # actor: required (audit identity)
    actor = req.get("actor")
    if actor is None or str(actor).strip() == "":
        rejects.append("MISSING_ACTOR")

    # run_id: required, strict, no control/whitespace chars
    run_id = req.get("run_id")
    if run_id is None or str(run_id).strip() == "":
        rejects.append("MISSING_RUN_ID")
    elif _has_forbidden_chars(str(run_id)) or RUN_ID_RE.fullmatch(str(run_id)) is None:
        rejects.append("BAD_RUN_ID")

    if req.get("copy_production_data"):
        rejects.append("PROD_DATA_COPY_FORBIDDEN")

    oar = req.get("owner_authorization_ref")
    if oar is None or str(oar).strip() == "":
        rejects.append("MISSING_OWNER_AUTH")

    # target allowlist via shared helper (Guard 1 create / Guard 4 teardown)
    if mode in ("teardown_plan", "teardown_real_run"):
        rejects.extend(delete_fast_guard(req))
    else:
        rejects.extend(allowlist_guard(req))

    # real-run gate (STRICT boolean) + Guard 3 production-untouched (must PASS)
    g3 = None
    if mode in REAL_RUN_MODES:
        if not isinstance(owner_real_run_gate_open, bool):
            rejects.append("INVALID_GATE_TYPE")
        elif owner_real_run_gate_open is not True:
            rejects.append("REAL_RUN_GATE_CLOSED")
        else:
            g3 = production_untouched_verify(req.get("production_untouched_evidence"))
            if g3["verdict"] == "FAIL":
                rejects.append("PROD_UNTOUCHED_FAIL")
            elif g3["verdict"] != "PASS":
                rejects.append("PROD_UNTOUCHED_UNKNOWN")

    # de-dup, preserve first-seen order (dicts keep insertion order)
    rejects = list(dict.fromkeys(rejects))
    accepted = not rejects

    plan, writes = (None, [])
    if accepted:
        plan, writes = _accepted_plan(mode, req)

    # NOTE(review): the verdict is taken from the real-run Guard 3 call only.
    # In "verify" mode the Guard 3 result lives in `plan`, while this field and
    # the audit envelope stay None -- confirm that is intended.
    prod_verdict = g3["verdict"] if g3 else None

    audit = audit_proof(req, rejects, writes, prod_verdict)
    return {"accepted": accepted, "mode": mode, "reject_codes": rejects,
            "writes": writes, "plan": plan,
            "production_untouched_verdict": prod_verdict, "audit": audit}