KB-75A6
dot-iu-cutter v0.5 — Constitution Snapshot-source MARK Entrypoint: Code Diff / Patch (new file cutter_agent/dryrun.py + test; additive, stdlib-only, no DB)
36 min read Revision 1
Tags: dot-iu-cutter · v0.5 · constitution-fixture · snapshot-source-mark · code-patch · new-file · stdlib-only · no-db · design-only · dieu44 · 2026-05-18
dot-iu-cutter v0.5 — Constitution Snapshot-source MARK Entrypoint: Code Diff / Patch
Phase: …_code_authoring · Nature: patch_authored_in_KB__not_committed__not_deployed · Date: 2026-05-18 · doc 2 of 6
patch_kind: PURE ADDITION — 2 NEW files; ZERO modification of existing repo files
applied_to_real_repo: false ; git_commit: false ; deploy: false
module_sha256: f1f42e83ca23ba0b328f79cf04a8391ac699d1b307eb1b22b52c305f2efa1422
test_sha256: 31143968f322433cc5da62fa3ccf2a1fbe1905f461940c789a57cb0a116dc1b4
decision_authority: GPT / User ONLY ; self_advance: PROHIBITED
The patch adds two files under iu-cutter/:
cutter_agent/dryrun.py | 559 ++++++++++++++++++ (new)
tests/test_dryrun_snapshot_mark.py | 211 +++++++++++++ (new)
2 files changed, 770 insertions(+), 0 deletions(-)
0 existing files modified
Apply form (a later, separately-authorized phase only):
git apply is N/A (target is not a git repo); apply = drop the two files at the
paths above. No existing file is touched. python -m cutter_agent.dryrun then
exists; the v0.4 cli.py path is unchanged.
1. NEW FILE — cutter_agent/dryrun.py (verbatim, embed exactly)
#!/usr/bin/env python3
"""dot-iu-cutter v0.5 — snapshot-source MARK dry-run entrypoint.
``python -m cutter_agent.dryrun --mode mark-manifest-only ...``
HARD GUARANTEES (import-isolated; verified by
tests/test_dryrun_snapshot_mark.py::TestNoDbImportIsolation):
* This module imports ONLY the Python standard library. It does NOT import
cutter_agent.db_adapter / phases / ledger / signal. It therefore CANNOT
open a DB connection, write a cutter_governance row, CUT, VERIFY, call
fn_iu_create, or write Directus/vector. Output is artifact files only.
* ``mark-manifest-only`` is the ONLY mode. ``--no-db-write/--no-cut/
--no-verify`` are accepted and asserted; any other mode is refused.
* The input is the PINNED normalized snapshot artifact. The snapshot
BEGIN/END region is rehashed BEFORE any parse; mismatch => ABORT.
* Fail-closed: unknown marker, duplicate address, span overlap, uncovered
body line, orphan section, malformed heading, checksum/length/marker
mismatch, or non-deterministic re-run => BLOCKED, no/partial output
quarantined, non-zero exit. BLOCKED is always preferred over a guessed
PASS.
Implements the GPT-ratified design package
``v0.5-constitution-snapshot-source-mark-dryrun-entrypoint-design`` and the
OD-G3 ruling (emit NGUYEN_TAC + KIEN_TRUC_SECTION + DIEU; DIEU = floor).
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import sys
from pathlib import Path
# Identifier of this parser reference implementation; build_manifest copies it
# into every unit's provenance and into the manifest header.
PARSER_REFIMPL = "nuxt-incomex-portal-constitution-v1.refimpl.r1"
# Grammar profile name; recorded in unit provenance and the manifest header.
GRAMMAR_PROFILE = "incomex-architecture-constitution-v4"
# Address template name; recorded in the manifest header only.
ADDRESS_TEMPLATE = "at.icx.const.v4"
# Sentinel substrings delimiting the normalized region inside the snapshot
# artifact; extract_region requires each to occur exactly once.
BEGIN_SENTINEL = "<<<BEGIN-NORMALIZED-CONTENT-DO-NOT-EDIT"
END_SENTINEL = "END-NORMALIZED-CONTENT-DO-NOT-EDIT>>>"
# 4 ratified status markers (grammar_profile_status_marker, LIVE).
# Maps the marker glyph to the status name used for effective_status.
MARKERS = {"✅": "enacted", "\U0001F4CB": "controlled_draft",
"\U0001F4DD": "draft", "⛔": "obsolete"}
# exclusion_reason recorded on a unit for each non-enacted effective_status.
EXCLUSION_REASON = {
"controlled_draft": "controlled_draft_deferred",
"draft": "draft_excluded_by_enacted_only",
"obsolete": "obsolete_excluded",
}
# Exact zone-entry headers (verbatim from the pinned snapshot region).
# zone_router compares whole lines against these strings, so they must match
# the region text character for character.
Z2_H = "15 NGUYÊN TẮC NỀN TẢNG — CẤM VI PHẠM"
Z3_H = ("KIẾN TRÚC HẠ TẦNG DỮ LIỆU — 4 DATABASE "
"+ 3 LỚP NÃO-KHO-CỔNG (BỔ SUNG S176)")
Z4_H = "2 CHIỀU QUẢN LÝ"
Z5_H = "MỤC LỤC LUẬT"
Z6_H = "CHANGELOG"
# Listed in required document order; zone_router rejects any other order.
ZONE_HEADERS = [Z2_H, Z3_H, Z4_H, Z5_H, Z6_H]
# The four column-header lines expected, one per line, right after Z2_H.
NGUYEN_TAC_COLHDR = ["#", "Nguyên tắc", "Nghĩa", "Hệ quả"]
# Lines accepted as catalog column headers after a DIEU group header.
CATALOG_COLVOCAB = {"Điều", "Tên", "File", "Ghi chú",
"Lý do"}
# Principle id line: an integer 1..15 with no padding.
NT_ID_RE = re.compile(r"^([1-9]|1[0-5])$")
# Architecture section heading: a letter A-C, a dot, whitespace, then a title.
KT_SEC_RE = re.compile(r"^([A-C])\.\s+(.+)$")
# Catalog id cell: "—", the literal "0-S/M/L", or digits with an optional
# "-<uppercase letter>" suffix.
DIEU_ID_RE = re.compile(r"^(—|0-S/M/L|\d+(?:-[A-Z])?)$")
# Catalog group header: "<label> — <marker glyph>" with an optional trailing
# " BAN HÀNH". Only the four glyphs in MARKERS are accepted.
GROUP_HDR_RE = re.compile(
r"^(?P<label>.+?)\s+—\s+(?P<m>[✅\U0001F4CB\U0001F4DD⛔])"
r"(?:\s+BAN HÀNH)?$")
# A "→ " cross-reference pointer line; ends the principle records in Z2.
POINTER_RE = re.compile(r"^→\s")
class FailClosed(Exception):
    """Raised when any guard, determinism, or coverage check fails.

    ``main`` catches this exception, writes a BLOCKED ``dryrun_report.md``
    carrying the message as the reason, and returns exit code 3.
    """
def _sha(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
# ----------------------------------------------------------------- gate
def extract_region(artifact_text: str) -> str:
    """Return the text strictly between the BEGIN/END sentinel lines (D-SENTINEL).

    The region starts on the line after the one containing BEGIN_SENTINEL and
    stops just before END_SENTINEL. The sentinels themselves are excluded and
    a single trailing newline before END_SENTINEL is dropped, so no trailing
    newline is added to the region.

    Args:
        artifact_text: full decoded text of the snapshot artifact.

    Returns:
        The region as a ``str`` (code points, not bytes).

    Raises:
        FailClosed: if either sentinel does not occur exactly once, if the
            BEGIN sentinel line is not newline-terminated, or if the END
            sentinel does not occur after the BEGIN sentinel line.
    """
    if artifact_text.count(BEGIN_SENTINEL) != 1:
        raise FailClosed("snapshot: BEGIN sentinel not present exactly once")
    if artifact_text.count(END_SENTINEL) != 1:
        raise FailClosed("snapshot: END sentinel not present exactly once")
    begin_at = artifact_text.index(BEGIN_SENTINEL)
    # str.find instead of str.index: a missing newline or a misplaced END
    # sentinel must surface as FailClosed (-> BLOCKED), not as a ValueError
    # that escapes main()'s FailClosed handler.
    newline_at = artifact_text.find("\n", begin_at)
    if newline_at < 0:
        raise FailClosed(
            "snapshot: BEGIN sentinel line is not newline-terminated")
    start = newline_at + 1
    end_at = artifact_text.find(END_SENTINEL, start)
    if end_at < 0:
        raise FailClosed(
            "snapshot: END sentinel does not follow the BEGIN sentinel line")
    region = artifact_text[start:end_at]
    if region.endswith("\n"):
        region = region[:-1]
    return region
def snapshot_gate(region: str, expect_sha: str, expect_len: int,
                  expect_markers: dict) -> dict:
    """Verify the region's identity before any parsing takes place.

    Three observations are compared with the caller's expectations, in this
    order: SHA-256 of the region, its length in code points, and the number
    of occurrences of each status glyph (keyed by status name).

    Returns:
        dict with ``region_sha256``, ``region_length`` and ``marker_counts``.

    Raises:
        FailClosed: on the first observation that differs from its
            expectation.
    """
    actual_sha = _sha(region)
    actual_len = len(region)
    actual_markers = {}
    for glyph, status in MARKERS.items():
        actual_markers[status] = region.count(glyph)

    if actual_sha != expect_sha:
        raise FailClosed(
            f"snapshot rehash mismatch: got {actual_sha} != {expect_sha} "
            f"(ABORT before parse)")
    if actual_len != expect_len:
        raise FailClosed(
            f"snapshot length mismatch: got {actual_len} != {expect_len}")
    if actual_markers != expect_markers:
        raise FailClosed(
            f"marker census mismatch: got {actual_markers} != "
            f"{expect_markers}")

    return {
        "region_sha256": actual_sha,
        "region_length": actual_len,
        "marker_counts": actual_markers,
    }
# -------------------------------------------------------------- matchers
def zone_router(lines: list[str]) -> list[str]:
    """mc.icx.zone_router — label every line with its zone, fail-closed.

    Lines before the first zone header are "Z1"; each zone header line opens
    its zone ("Z2".."Z6") and belongs to it.

    Raises:
        FailClosed: if a zone header is not present exactly once, or the
            headers do not appear in ZONE_HEADERS order.
    """
    positions = []
    for header in ZONE_HEADERS:
        if lines.count(header) != 1:
            raise FailClosed(
                f"zone header not present exactly once: {header!r} "
                f"(malformed document)")
        positions.append(lines.index(header))
    if sorted(positions) != positions:
        raise FailClosed("zone headers out of document order (malformed)")

    # Line index of each zone header -> label of the zone it opens.
    opens = dict(zip(positions, ("Z2", "Z3", "Z4", "Z5", "Z6")))
    labels = []
    current = "Z1"
    for n, _ in enumerate(lines):
        current = opens.get(n, current)
        labels.append(current)
    return labels
def _span(lines, lo, hi):
    """Build the source_span for the inclusive line range ``lo..hi``.

    Returns:
        ``(span, text)``: the span dict (``line_start``, ``line_end``,
        ``span_sha256``) and the covered lines joined with newlines.
    """
    joined = "\n".join(lines[lo:hi + 1])
    span = dict(line_start=lo, line_end=hi, span_sha256=_sha(joined))
    return span, joined
def parse_nguyen_tac(lines, zones):
    """mc.icx.nguyen_tac — parse the 15 principle records (level NGUYEN_TAC).

    Zone Z2 must open with the exact zone header, followed by the four
    column-header lines of NGUYEN_TAC_COLHDR. Each record is four consecutive
    lines (id, name, meaning, consequence) and ids must run 1..15 in order.
    Record parsing stops at the first "→ " pointer line or at the end of Z2;
    whatever remains of Z2 is returned as structural.

    Args:
        lines: the region split into lines.
        zones: per-line zone labels, parallel to ``lines``.

    Returns:
        ``(units, unit_lines, struct)``: the unit dicts, the line indices
        they cover, and the Z2 line indices treated as structural.

    Raises:
        FailClosed: on an empty zone, a missing or malformed header, a
            malformed or out-of-order id, a truncated record, or a record
            count other than 15.
    """
    z2 = [n for n, z in enumerate(zones) if z == "Z2"]
    if not z2:
        raise FailClosed("NGUYEN_TAC zone empty (malformed)")
    start = z2[0]
    if lines[start] != Z2_H:
        raise FailClosed("NGUYEN_TAC: zone-entry header missing")
    p = start + 1
    for col in NGUYEN_TAC_COLHDR:
        # Bounds guard: a region ending inside the column header must be
        # reported as FailClosed, not escape as an IndexError.
        if p >= len(lines):
            raise FailClosed(
                f"NGUYEN_TAC truncated column header: region ends before "
                f"{col!r} (malformed)")
        if lines[p] != col:
            raise FailClosed(
                f"NGUYEN_TAC malformed column header: {lines[p]!r} != {col!r}")
        p += 1
    units, struct, unit_lines = [], list(range(start, p)), []
    expect = 1
    while (p < len(lines) and zones[p] == "Z2"
           and not POINTER_RE.match(lines[p])):
        if not NT_ID_RE.match(lines[p]) or int(lines[p]) != expect:
            raise FailClosed(
                f"NGUYEN_TAC malformed/out-of-order id near line {p}: "
                f"{lines[p]!r} (expected {expect})")
        idl = p
        # All four record lines must exist and still lie inside Z2; a record
        # that runs into the next zone is truncated, never a valid unit.
        if p + 3 >= len(lines) or zones[p + 3] != "Z2":
            raise FailClosed("NGUYEN_TAC truncated record (malformed)")
        name, nghia, hequa = lines[p + 1], lines[p + 2], lines[p + 3]
        sp, _ = _span(lines, idl, p + 3)
        units.append({
            "level": "NGUYEN_TAC", "unit_kind": "principle",
            "number": str(expect), "title": name,
            "heading": lines[idl],
            "normalized_text": "\n".join([name, nghia, hequa]),
            "source_span": sp, "status_marker_observed": "inherited",
            "status_basis": "tier_0_document_promulgation",
            "cut_reason": "independent_principle"})
        unit_lines += list(range(idl, p + 4))
        p += 4
        expect += 1
    if expect != 16:
        raise FailClosed(
            f"NGUYEN_TAC expected 15 principles, got {expect - 1}")
    # residual Z2 lines (the "→ " cross-ref pointers) = structural
    struct += [n for n in range(p, len(lines)) if zones[n] == "Z2"]
    return units, unit_lines, struct
def parse_kien_truc(lines, zones):
    """mc.icx.kien_truc_section — parse the lettered sections A/B/C of zone Z3.

    A section runs from its heading line to the line before the next heading
    (or to the last Z3 line). Z3 lines before the first heading are returned
    as structural.

    Returns:
        ``(units, unit_lines, struct)``.

    Raises:
        FailClosed: if Z3 is empty or its section letters are not exactly
            A, B, C in that order.
    """
    z3 = [n for n, z in enumerate(zones) if z == "Z3"]
    if not z3:
        raise FailClosed("KIEN_TRUC zone empty (malformed)")

    hits = []
    for n in z3:
        found = KT_SEC_RE.match(lines[n])
        if found:
            hits.append((n, found))
    if [found.group(1) for _, found in hits] != ["A", "B", "C"]:
        raise FailClosed("KIEN_TRUC sections != [A,B,C] (malformed)")

    # everything in Z3 before the first section header = container/boilerplate
    preamble = list(range(z3[0], hits[0][0]))
    # Exclusive end of each section: the next heading, or one past the zone.
    stops = [n for n, _ in hits[1:]] + [z3[-1] + 1]

    units, unit_lines = [], []
    for (head, found), stop in zip(hits, stops):
        sp, body = _span(lines, head, stop - 1)
        units.append({
            "level": "KIEN_TRUC_SECTION",
            "unit_kind": "architecture_section",
            "number": found.group(1),
            "title": found.group(2),
            "heading": lines[head],
            "normalized_text": body,
            "source_span": sp,
            "status_marker_observed": "inherited",
            "status_basis": "tier_0_document_promulgation",
            "cut_reason": "independent_section",
        })
        unit_lines.extend(range(head, stop))
    return units, unit_lines, preamble
def parse_dieu(lines, zones):
    """mc.icx.dieu + status_marker_detector — parse catalog rows (level DIEU).

    Zone Z5 is a sequence of groups. A group opens with a group-header line
    (label, dash, status glyph) followed by one or more column-header lines;
    each row then occupies as many lines as the group has columns. Groups
    with a "Điều" column carry an id in the first cell; groups without it
    have no id.

    Status cascade: a status glyph at the start of a row's last cell
    (tier_2) overrides the status of the group header (tier_1).

    Returns:
        ``(units, unit_lines, struct)``; the zone header, group headers and
        column headers are the structural lines.

    Raises:
        FailClosed: on an empty zone, a missing zone header, a group without
            column headers, a row before any group header, a malformed id
            cell, or a truncated row.
    """
    z5 = [n for n, z in enumerate(zones) if z == "Z5"]
    if not z5:
        raise FailClosed("DIEU zone empty (malformed)")
    first, last = z5[0], z5[-1]
    if lines[first] != Z5_H:
        raise FailClosed("DIEU: zone-entry header missing")

    units, unit_lines = [], []
    struct = [first]  # Z5 header = structural
    group = None
    pos = first + 1
    while pos <= last:
        header = GROUP_HDR_RE.match(lines[pos])
        if header is not None:
            glyph = header.group("m")
            group = {"label": header.group("label"),
                     "marker": glyph,
                     "status": MARKERS[glyph]}
            struct.append(pos)
            pos += 1
            cols = []
            while pos <= last and lines[pos] in CATALOG_COLVOCAB:
                cols.append(lines[pos])
                struct.append(pos)
                pos += 1
            if not cols:
                raise FailClosed(
                    f"DIEU group {group['label']!r} missing column "
                    f"headers (malformed)")
            group["has_id"] = "Điều" in cols
            group["ncols"] = len(cols)
            continue

        if group is None:
            raise FailClosed(
                f"DIEU row before any group header at line {pos} "
                f"(orphan section)")

        # A row: one line per column of the current group.
        width = group["ncols"]
        row_end = pos + width - 1
        if group["has_id"]:
            if not DIEU_ID_RE.match(lines[pos]):
                raise FailClosed(
                    f"DIEU malformed id token at line {pos}: {lines[pos]!r}")
            if row_end > last:
                raise FailClosed("DIEU truncated row (malformed)")
            dieu_id = lines[pos]
            cells = lines[pos + 1:pos + width]
            title = cells[0] if cells else ""
            note = cells[-1] if cells else ""
        else:
            # obsolete group: (Tên, Lý do), no id
            if row_end > last:
                raise FailClosed("DIEU truncated obsolete row (malformed)")
            dieu_id = None
            cells = lines[pos:pos + width]
            title, note = cells[0], cells[-1]

        # status cascade: tier_2 explicit row marker overrides tier_1 group
        explicit = note[0] if note and note[0] in MARKERS else None
        if explicit:
            effective = MARKERS[explicit]
            basis = "tier_2_explicit_row_marker"
            observed = explicit
        else:
            effective = group["status"]
            basis = "tier_1_group_header"
            observed = "inherited:" + group["marker"]

        if dieu_id == "—":
            kind, reason = "pointer_row", "pointer_reference"
        elif dieu_id is None:
            kind, reason = "obsolete_entry", "catalog_law_entry"
        else:
            kind, reason = "dieu_law", "catalog_law_entry"

        sp, body = _span(lines, pos, row_end)
        units.append({
            "level": "DIEU",
            "unit_kind": kind,
            "number": dieu_id,
            "title": title,
            "heading": lines[pos],
            "normalized_text": body,
            "source_span": sp,
            "status_marker_observed": observed,
            "effective_status": effective,
            "status_basis": basis,
            "group_label": group["label"],
            "cut_reason": reason,
        })
        unit_lines.extend(range(pos, row_end + 1))
        pos = row_end + 1
    return units, unit_lines, struct
# ------------------------------------------------------------- addresses
def canonical_address(docprefix: str, u: dict) -> str:
    """Derive the canonical address of unit *u* under *docprefix*.

    NGUYEN_TAC and KIEN_TRUC_SECTION units map to ``NT-<number>`` and
    ``KT-<number>``. For any other level the address depends on the number:
    "—" gives ``DIEU-TERMINOLOGY``; ``None`` (an entry without an id) gives
    ``DIEU-OBSOLETE-<slug>``, where the slug is the title with every run of
    non-ASCII-alphanumeric characters replaced by "-", outer dashes removed,
    upper-cased and cut to 40 characters; anything else gives
    ``DIEU-<number>`` with "/" replaced by "-".
    """
    short = {"NGUYEN_TAC": "NT", "KIEN_TRUC_SECTION": "KT"}.get(u["level"])
    if short is not None:
        return f"{docprefix}/{short}-{u['number']}"

    number = u["number"]
    if number is None:
        # obsolete entry, no id -> deterministic slug
        dashed = re.sub(r"[^A-Za-z0-9]+", "-", u["title"])
        slug = dashed.strip("-").upper()[:40]
        return f"{docprefix}/DIEU-OBSOLETE-{slug}"
    if number == "—":
        return f"{docprefix}/DIEU-TERMINOLOGY"
    return f"{docprefix}/DIEU-{number.replace('/', '-')}"
# --------------------------------------------------------------- emitter
def build_manifest(region: str, gate: dict, args) -> dict:
    """Parse *region* into units and assemble the MARK manifest.

    Steps: route lines to zones, run the three matchers, prove line coverage
    (unit lines and structural lines are disjoint and together cover every
    line), assign addresses and provenance, split units into candidates
    (effective_status "enacted") and excluded, check address uniqueness, and
    compute the manifest digest over the units in line order.

    Args:
        region: the gated snapshot region.
        gate: result of snapshot_gate; ``region_sha256``, ``region_length``
            and ``marker_counts`` are read.
        args: object providing ``docprefix``, ``source_version_id``,
            ``snapshot_artifact``, ``scope`` and ``mode``.

    Returns:
        dict with ``manifest_header``, ``candidates``, ``excluded``,
        ``noncontent_lines`` and ``_lines_total``.

    Raises:
        FailClosed: on any matcher failure, span overlap, uncovered line, or
            duplicate canonical address.
    """
    lines = region.split("\n")
    zones = zone_router(lines)
    nt_units, nt_lines, nt_struct = parse_nguyen_tac(lines, zones)
    kt_units, kt_lines, kt_struct = parse_kien_truc(lines, zones)
    dieu_units, dieu_lines, dieu_struct = parse_dieu(lines, zones)
    every_unit = nt_units + kt_units + dieu_units

    # NGUYEN_TAC / KIEN_TRUC_SECTION effective_status = tier_0 = enacted
    for unit in nt_units + kt_units:
        unit["effective_status"] = "enacted"

    def repeated(items):
        """Return the set of values that occur more than once in *items*."""
        seen, again = set(), set()
        for item in items:
            (again if item in seen else seen).add(item)
        return again

    # Coverage proof: the unit-line set and the structural set are derived
    # independently; they must be disjoint (no overlap) and jointly cover
    # every line (no silent drop).
    total = len(lines)
    claimed = nt_lines + kt_lines + dieu_lines
    claimed_set = set(claimed)
    if len(claimed_set) != len(claimed):
        dup = sorted(repeated(claimed))
        raise FailClosed(f"span overlap on lines {dup[:10]} (no double-cut)")

    header_rows = {lines.index(h) for h in ZONE_HEADERS}
    passthrough = {n for n, z in enumerate(zones)
                   if z in ("Z1", "Z4", "Z6")}
    structural = (set(nt_struct) | set(kt_struct) | set(dieu_struct)
                  | header_rows | passthrough)
    clash = claimed_set & structural
    if clash:
        raise FailClosed(
            f"unit/noncontent span overlap on {sorted(clash)[:10]}")
    accounted = claimed_set | structural
    everything = set(range(total))
    if accounted != everything:
        missing = sorted(everything - accounted)
        raise FailClosed(
            f"uncovered body text at lines {missing[:10]} (silent drop)")
    boiler = sorted(structural)

    candidates, excluded = [], []
    for unit in every_unit:
        unit["canonical_address"] = canonical_address(args.docprefix, unit)
        unit["provenance"] = {
            "source_document_version_id": args.source_version_id,
            "snapshot_artifact_path": args.snapshot_artifact,
            "snapshot_region_sha256": gate["region_sha256"],
            "parser_reference_implementation": PARSER_REFIMPL,
            "grammar_profile": GRAMMAR_PROFILE,
        }
        if unit["effective_status"] == "enacted":
            candidates.append(unit)
            continue
        unit["exclusion_reason"] = EXCLUSION_REASON[unit["effective_status"]]
        unit["emitted_as"] = "EXCLUDED"
        excluded.append(unit)

    # address uniqueness across candidates AND excluded
    emitted = candidates + excluded
    dups = sorted(repeated(u["canonical_address"] for u in emitted))
    if dups:
        raise FailClosed(f"duplicate canonical_address: {dups} (collision)")

    in_line_order = sorted(
        emitted, key=lambda u: u["source_span"]["line_start"])
    digest_rows = []
    for unit in in_line_order:
        digest_rows.append([unit["canonical_address"], unit["level"],
                            unit["effective_status"],
                            unit["source_span"]["span_sha256"]])
    digest_body = json.dumps(digest_rows, ensure_ascii=False,
                             sort_keys=True, separators=(",", ":"))

    header = {
        "generated_for": "incomex-constitution",
        "source_document_version_id": args.source_version_id,
        "snapshot_artifact_path": args.snapshot_artifact,
        "snapshot_region_sha256": gate["region_sha256"],
        "snapshot_region_length": gate["region_length"],
        "marker_census_observed": gate["marker_counts"],
        "grammar_profile": GRAMMAR_PROFILE,
        "address_template": ADDRESS_TEMPLATE,
        "parser_reference_implementation": PARSER_REFIMPL,
        "docprefix": args.docprefix,
        "scope_policy": args.scope,
        "mode": args.mode,
        "db_write": "NONE",
        "candidate_count": len(candidates),
        "excluded_count": len(excluded),
        "noncontent_count": len(boiler),
        "manifest_digest_sha256": _sha(digest_body),
    }
    return {
        "manifest_header": header,
        "candidates": candidates,
        "excluded": excluded,
        "noncontent_lines": boiler,
        "_lines_total": total,
    }
def reconstruction_ok(region: str, manifest: dict) -> bool:
    """Tell whether the manifest accounts for exactly the lines of *region*.

    The line ranges of all candidate and excluded unit spans, plus the
    ``noncontent_lines``, must equal the set of line indices of the region.
    """
    accounted = set()
    for unit in manifest["candidates"] + manifest["excluded"]:
        span = unit["source_span"]
        accounted.update(range(span["line_start"], span["line_end"] + 1))
    accounted.update(manifest["noncontent_lines"])
    line_count = len(region.split("\n"))
    return accounted == set(range(line_count))
# ------------------------------------------------------------------ main
def _write(out: Path, name: str, obj) -> None:
p = out / name
if name.endswith(".json"):
p.write_text(json.dumps(obj, ensure_ascii=False, indent=2),
encoding="utf-8")
else:
p.write_text(obj, encoding="utf-8")
def main(argv=None) -> int:
    """Run the no-DB snapshot-source MARK manifest dry-run from the CLI.

    Args:
        argv: argument list handed to argparse; ``None`` makes argparse use
            the process arguments.

    Returns:
        0 on PASS (all artifacts written to ``--out-dir``);
        2 when the invocation is REFUSED by a mode, safety-flag, scope,
        environment or ``--expect-markers`` guard (nothing is written);
        3 when a FailClosed condition BLOCKs the run (only a BLOCKED
        ``dryrun_report.md`` is written).
    """
    ap = argparse.ArgumentParser(
        prog="cutter_agent.dryrun",
        description="no-DB-write snapshot-source MARK manifest dry-run")
    ap.add_argument("--mode", required=True)
    ap.add_argument("--no-db-write", action="store_true")
    ap.add_argument("--no-cut", action="store_true")
    ap.add_argument("--no-verify", action="store_true")
    ap.add_argument("--fail-closed", action="store_true")
    ap.add_argument("--source-version-id", required=True)
    ap.add_argument("--snapshot-artifact", required=True)
    ap.add_argument("--expect-region-sha", required=True)
    ap.add_argument("--expect-length", type=int, required=True)
    ap.add_argument("--expect-markers", required=True)
    # NOTE(review): --fail-closed, --grammar-profile, --parser-refimpl and
    # --emit are accepted but never read below; the manifest always records
    # the module constants. Confirm whether a mismatch should be refused.
    ap.add_argument("--grammar-profile", default=GRAMMAR_PROFILE)
    ap.add_argument("--parser-refimpl", default=PARSER_REFIMPL)
    ap.add_argument("--scope", default="enacted_only")
    ap.add_argument("--docprefix", default="ICX-CONST")
    ap.add_argument("--out-dir", required=True)
    ap.add_argument("--emit", default="")
    args = ap.parse_args(argv)

    # mode / safety guards (fail-closed) -------------------------------
    if args.mode != "mark-manifest-only":
        sys.stderr.write("REFUSED: only --mode mark-manifest-only is "
                         "supported (no CUT/VERIFY/DB).\n")
        return 2
    if not (args.no_db_write and args.no_cut and args.no_verify):
        sys.stderr.write("REFUSED: --no-db-write --no-cut --no-verify are "
                         "mandatory for this entrypoint.\n")
        return 2
    if args.scope != "enacted_only":
        sys.stderr.write("REFUSED: only scope enacted_only is supported.\n")
        return 2
    for bad in ("PG_DSN", "DATABASE_URL", "DIRECTUS_URL", "PGPASSWORD"):
        if os.environ.get(bad):
            sys.stderr.write(
                f"REFUSED: {bad} is set; this entrypoint never reads a DB "
                f"credential. Unset it and retry.\n")
            return 2

    # Expected marker census, "status=count[,status=count...]". A malformed
    # token is refused cleanly instead of escaping as a ValueError traceback.
    em = {}
    for tok in args.expect_markers.split(","):
        name, sep, raw = tok.partition("=")
        name = name.strip()
        try:
            if not sep or not name:
                raise ValueError(tok)
            em[name] = int(raw)
        except ValueError:
            sys.stderr.write(
                f"REFUSED: malformed --expect-markers token {tok!r}; "
                f"expected status=count[,status=count...].\n")
            return 2

    out = Path(args.out_dir)
    try:
        # An unreadable or non-UTF-8 artifact is a gate failure: route it
        # through FailClosed so the run ends BLOCKED with a report.
        try:
            artifact_text = Path(args.snapshot_artifact).read_text(
                encoding="utf-8")
        except (OSError, UnicodeDecodeError) as err:
            raise FailClosed(
                f"snapshot artifact unreadable: {err}") from err
        region = extract_region(artifact_text)
        gate = snapshot_gate(region, args.expect_region_sha,
                             args.expect_length, em)
        manifest = build_manifest(region, gate, args)
        if not reconstruction_ok(region, manifest):
            raise FailClosed("reconstruction failed (silent drop detected)")
        # determinism: rebuild and compare digest
        m2 = build_manifest(region, gate, args)
        d1 = manifest["manifest_header"]["manifest_digest_sha256"]
        d2 = m2["manifest_header"]["manifest_digest_sha256"]
        if d1 != d2:
            raise FailClosed(f"determinism mismatch: {d1} != {d2}")
    except FailClosed as e:
        out.mkdir(parents=True, exist_ok=True)
        _write(out, "dryrun_report.md",
               f"# dot-iu-cutter v0.5 Constitution dry-run\n\n"
               f"status: BLOCKED\nreason: {e}\n"
               f"production_touched: false\ndb_write: NONE\n")
        sys.stderr.write(f"BLOCKED: {e}\n")
        return 3

    def span_lines(units):
        """Total number of lines covered by the spans of *units*."""
        return sum(
            u["source_span"]["line_end"] - u["source_span"]["line_start"] + 1
            for u in units)

    out.mkdir(parents=True, exist_ok=True)
    hdr = manifest["manifest_header"]
    _write(out, "manifest.json", manifest)
    # The three True flags below were already enforced by build_manifest
    # (it raises FailClosed otherwise), so they are constants at this point.
    review = {"coverage_closed": True, "no_overlap": True,
              "address_unique": True,
              "levels_present": sorted({u["level"] for u in
                                        manifest["candidates"]}),
              "dieu_44_excluded": any(
                  u["number"] == "44" and
                  u["effective_status"] == "controlled_draft"
                  for u in manifest["excluded"]),
              "candidate_count": hdr["candidate_count"],
              "excluded_count": hdr["excluded_count"]}
    _write(out, "review_evaluation.json", review)
    _write(out, "coverage_proof.json", {
        "region_length": hdr["snapshot_region_length"],
        "lines_total": manifest["_lines_total"],
        "candidate_lines": span_lines(manifest["candidates"]),
        "excluded_lines": span_lines(manifest["excluded"]),
        "noncontent_lines": len(manifest["noncontent_lines"]),
        "reconstruction_ok": True})
    _write(out, "determinism_digest.md",
           f"# determinism digest\n\n"
           f"manifest_digest_sha256: {hdr['manifest_digest_sha256']}\n"
           f"re_run_equal: true\n")
    _write(out, "dryrun_report.md",
           f"# dot-iu-cutter v0.5 Constitution dry-run\n\n"
           f"status: PASS\nmode: {hdr['mode']}\ndb_write: NONE\n"
           f"production_touched: false\n"
           f"candidate_count: {hdr['candidate_count']}\n"
           f"excluded_count: {hdr['excluded_count']}\n"
           f"manifest_digest_sha256: {hdr['manifest_digest_sha256']}\n")
    print(json.dumps({"status": "PASS",
                      "candidate_count": hdr["candidate_count"],
                      "excluded_count": hdr["excluded_count"],
                      "manifest_digest_sha256":
                          hdr["manifest_digest_sha256"],
                      "production_touched": False, "db_write": "NONE"},
                     ensure_ascii=False, indent=2))
    return 0
# Script entry: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
2. NEW FILE — tests/test_dryrun_snapshot_mark.py (verbatim)
"""Unit/integration tests for cutter_agent.dryrun (no DB, no gated dry-run).
These exercise: snapshot gate, matchers, status cascade, coverage,
determinism, fail-closed negatives, and import-isolation. They do NOT
emit the official KB first-dry-run artifacts (that is the separately
GPT-gated milestone). The Constitution snapshot fixture
`constitution-normalized-17660443e0f23e99.md` is a byte-exact local copy
of the KB-pinned artifact (region sha256 == 17660443…cae80c).
"""
import importlib
import io
import json
import sys
import unittest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from cutter_agent import dryrun as D # noqa: E402
# Snapshot fixture path: the file sits one directory above this test file's
# directory.
ART = Path(__file__).resolve().parents[1] / \
"constitution-normalized-17660443e0f23e99.md"
# Expected SHA-256 of the fixture's normalized region.
SHA = "17660443e0f23e994e1807cf8e22920951a9e70c598956dbd0e752f4f5cae80c"
# Expected marker census of the region, keyed by status name.
EM = {"enacted": 19, "controlled_draft": 1, "draft": 1, "obsolete": 1}
class _Args:
source_version_id = "icxconst-008a06ace23a96ea6cd456146e805c97"
snapshot_artifact = "knowledge/.../snap.md"
docprefix = "ICX-CONST"
scope = "enacted_only"
mode = "mark-manifest-only"
class TestGate(unittest.TestCase):
    """Snapshot gate: region extraction plus sha / length / marker checks."""

    def setUp(self):
        artifact = ART.read_text(encoding="utf-8")
        self.region = D.extract_region(artifact)

    def test_region_identity_exact(self):
        result = D.snapshot_gate(self.region, SHA, 17522, EM)
        expected = {"region_sha256": SHA,
                    "region_length": 17522,
                    "marker_counts": EM}
        for key, want in expected.items():
            self.assertEqual(result[key], want)

    def test_gate_fails_on_sha_drift(self):
        self.assertRaises(D.FailClosed, D.snapshot_gate,
                          self.region, "deadbeef" * 8, 17522, EM)

    def test_gate_fails_on_length_drift(self):
        self.assertRaises(D.FailClosed, D.snapshot_gate,
                          self.region, SHA, 17521, EM)

    def test_gate_fails_on_marker_drift(self):
        drifted = dict(EM, enacted=18)
        self.assertRaises(D.FailClosed, D.snapshot_gate,
                          self.region, SHA, 17522, drifted)

    def test_missing_sentinel_fails(self):
        self.assertRaises(D.FailClosed, D.extract_region,
                          "no sentinels here")
class TestManifest(unittest.TestCase):
    """Manifest-level checks against the pinned Constitution snapshot fixture."""

    def setUp(self):
        self.region = D.extract_region(ART.read_text(encoding="utf-8"))
        self.gate = D.snapshot_gate(self.region, SHA, 17522, EM)
        self.m = D.build_manifest(self.region, self.gate, _Args())

    def _count_level(self, level):
        """Number of candidate units at *level*."""
        return sum(1 for u in self.m["candidates"] if u["level"] == level)

    def test_emits_three_levels_dieu_floor(self):
        levels = {u["level"] for u in self.m["candidates"]}
        self.assertEqual(levels,
                         {"NGUYEN_TAC", "KIEN_TRUC_SECTION", "DIEU"})

    def test_counts_and_range(self):
        h = self.m["manifest_header"]
        self.assertEqual(self._count_level("NGUYEN_TAC"), 15)
        self.assertEqual(self._count_level("KIEN_TRUC_SECTION"), 3)
        # Two bound assertions instead of assertTrue(55 <= x <= 78), so a
        # failure reports the actual count.
        self.assertGreaterEqual(h["candidate_count"], 55)
        self.assertLessEqual(h["candidate_count"], 78)

    def test_dieu_44_excluded_controlled_draft_tier2(self):
        d44 = [u for u in self.m["excluded"] if u["number"] == "44"]
        self.assertEqual(len(d44), 1)
        self.assertEqual(d44[0]["effective_status"], "controlled_draft")
        self.assertEqual(d44[0]["status_basis"],
                         "tier_2_explicit_row_marker")
        self.assertEqual(d44[0]["exclusion_reason"],
                         "controlled_draft_deferred")

    def test_dieu_34_draft_and_obsolete_excluded(self):
        st = sorted(u["effective_status"] for u in self.m["excluded"])
        self.assertEqual(st, ["controlled_draft", "draft",
                              "obsolete", "obsolete"])

    def test_no_candidate_is_non_enacted(self):
        self.assertTrue(all(u["effective_status"] == "enacted"
                            for u in self.m["candidates"]))

    def test_address_format_and_uniqueness(self):
        allu = self.m["candidates"] + self.m["excluded"]
        for u in allu:
            addr = u["canonical_address"]
            self.assertTrue(addr.startswith("ICX-CONST/"))
            # No ratified status glyph may leak into an address; check all
            # four markers, not only two of them.
            for glyph in D.MARKERS:
                self.assertNotIn(glyph, addr)
        addrs = [u["canonical_address"] for u in allu]
        self.assertEqual(len(addrs), len(set(addrs)))

    def test_coverage_closes_and_reconstructs(self):
        self.assertTrue(D.reconstruction_ok(self.region, self.m))

    def test_determinism(self):
        m2 = D.build_manifest(self.region, self.gate, _Args())
        self.assertEqual(
            self.m["manifest_header"]["manifest_digest_sha256"],
            m2["manifest_header"]["manifest_digest_sha256"])

    def test_provenance_binding(self):
        # build_manifest attaches provenance to every unit before the
        # candidate/excluded split, so excluded units are checked as well.
        for u in self.m["candidates"] + self.m["excluded"]:
            p = u["provenance"]
            self.assertEqual(p["source_document_version_id"],
                             _Args.source_version_id)
            self.assertEqual(p["snapshot_region_sha256"], SHA)
            self.assertEqual(p["parser_reference_implementation"],
                             D.PARSER_REFIMPL)
class TestFailClosedSynthetic(unittest.TestCase):
    """Fail-closed negatives and a happy path on a small synthetic region."""

    # Fifteen four-line principle records: id, name, meaning, consequence.
    _NT = "".join(f"{i}\nN{i}\nm{i}\nc{i}\n" for i in range(1, 16))
    # Minimal well-formed region: Z2 (15 principles + one pointer), Z3 with
    # sections A/B/C, Z4, Z5 with one enacted group holding one row, Z6.
    BASE = ("15 NGUYÊN TẮC NỀN TẢNG — CẤM VI PHẠM\n#\nNguyên tắc\nNghĩa\n"
            "Hệ quả\n" + _NT + "→ ptr\n"
            + D.Z3_H + "\nlesson\nA. SEC A — t\nbodyA\n"
            "B. SEC B — t\nbodyB\nC. SEC C — t\nbodyC\n"
            "2 CHIỀU QUẢN LÝ\nx\nMỤC LỤC LUẬT\n"
            "G — ✅\nĐiều\nTên\n9\nLaw9\nCHANGELOG\nVersion\nrowz")

    def _mk(self, body):
        """Wrap *body* in the BEGIN/END sentinel lines of an artifact."""
        return ("<<<BEGIN-NORMALIZED-CONTENT-DO-NOT-EDIT\n" + body
                + "\nEND-NORMALIZED-CONTENT-DO-NOT-EDIT>>>\n")

    def _build(self, body):
        """Extract the region from *body* and build a manifest with a stub gate."""
        region = D.extract_region(self._mk(body))
        stub_gate = {"region_sha256": "x", "region_length": 0,
                     "marker_counts": {}}
        return region, D.build_manifest(region, stub_gate, _Args())

    def test_unknown_marker_fails_closed(self):
        bad = self.BASE.replace("G — ✅", "G — ✨")  # sparkles, unmapped
        with self.assertRaises(D.FailClosed):
            self._build(bad)

    def test_orphan_row_before_group_fails_closed(self):
        bad = self.BASE.replace("G — ✅\nĐiều\nTên\n", "")
        with self.assertRaises(D.FailClosed):
            self._build(bad)

    def test_nt_out_of_order_fails_closed(self):
        bad = self.BASE.replace("\n1\nN1\nm1\nc1\n", "\n2\nN1\nm1\nc1\n")
        with self.assertRaises(D.FailClosed):
            self._build(bad)

    def test_synthetic_happy_path_cascade(self):
        region, m = self._build(self.BASE)
        # 15 NT (tier_0), 3 KT (tier_0), 1 DIEU enacted via tier_1 group ✅
        self.assertEqual(sum(1 for u in m["candidates"]
                             if u["level"] == "NGUYEN_TAC"), 15)
        self.assertEqual(sum(1 for u in m["candidates"]
                             if u["level"] == "KIEN_TRUC_SECTION"), 3)
        d = [u for u in m["candidates"] if u["level"] == "DIEU"]
        # Pin the DIEU count so a regression fails as an assertion rather
        # than as an IndexError on d[0] (or passes with extra rows).
        self.assertEqual(len(d), 1)
        self.assertEqual(d[0]["status_basis"], "tier_1_group_header")
        self.assertTrue(D.reconstruction_ok(region, m))
class TestNoDbImportIsolation(unittest.TestCase):
    """Import whitelist of cutter_agent.dryrun and CLI refusal exit codes."""

    # Required arguments shared by both CLI refusal tests.
    _COMMON = ["--source-version-id", "x", "--snapshot-artifact", "x",
               "--expect-region-sha", "x", "--expect-length", "1",
               "--expect-markers", "enacted=1",
               "--out-dir", "/tmp/iucut-x"]

    def test_module_imports_only_stdlib(self):
        import ast
        mod = importlib.import_module("cutter_agent.dryrun")
        source = Path(mod.__file__).read_text(encoding="utf-8")
        allowed = {"argparse", "hashlib", "json", "os", "re", "sys",
                   "pathlib", "__future__"}
        found = set()
        for node in ast.walk(ast.parse(source)):
            if isinstance(node, ast.Import):
                found.update(alias.name.split(".")[0]
                             for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                found.add((node.module or "").split(".")[0])
        self.assertTrue(found <= allowed,
                        f"non-stdlib imports present: {found - allowed}")
        for banned in ("psycopg", "socket", "requests", "sqlalchemy",
                       "cutter_agent"):
            self.assertNotIn(banned, found)

    def test_cli_refuses_wrong_mode(self):
        argv = ["--mode", "cut", "--no-db-write", "--no-cut",
                "--no-verify"] + self._COMMON
        self.assertEqual(D.main(argv), 2)

    def test_cli_refuses_without_no_db_flags(self):
        argv = ["--mode", "mark-manifest-only"] + self._COMMON
        self.assertEqual(D.main(argv), 2)
# Allow running this test module directly as a script, with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
3. Notes for reviewers
- the test fixture constitution-normalized-17660443e0f23e99.md is a byte-exact
local copy of the KB-pinned artifact; its region sha256 reproduces
17660443…cae80c exactly (validated, doc 3). It is NOT committed to the repo by
this phase; a later code phase decides whether the fixture ships in tests/ or is
fetched from KB at test time.
- import isolation is STRUCTURAL: dryrun.py imports only argparse/hashlib/json/os/
re/sys/pathlib/__future__. It never imports db_adapter/phases/ledger/signal,
psycopg, socket, requests, or sqlalchemy (AST-asserted in TestNoDbImportIsolation).
- no existing repo file is modified; cli.py / phases.py / db_adapter.py untouched.
- doc 2 of 6; nothing applied/committed/deployed. Self-advance PROHIBITED.
Companion docs: code-authoring-plan (1), test-plan-and-results (3), command-review-package (4), risk-and-rollback-note (5), code-authoring-report (6).