KB-75A6

dot-iu-cutter v0.5 — Constitution Snapshot-source MARK Entrypoint: Code Diff / Patch (new file cutter_agent/dryrun.py + test; additive, stdlib-only, no DB)

Revision 1
Tags: dot-iu-cutter, v0.5, constitution-fixture, snapshot-source-mark, code-patch, new-file, stdlib-only, no-db, design-only, dieu44, 2026-05-18

Phase: …_code_authoring · Nature: patch_authored_in_KB__not_committed__not_deployed · Date: 2026-05-18 · doc 2 of 6

patch_kind: PURE ADDITION — 2 NEW files; ZERO modification of existing repo files
applied_to_real_repo: false ; git_commit: false ; deploy: false
module_sha256: f1f42e83ca23ba0b328f79cf04a8391ac699d1b307eb1b22b52c305f2efa1422
test_sha256:   31143968f322433cc5da62fa3ccf2a1fbe1905f461940c789a57cb0a116dc1b4
decision_authority: GPT / User ONLY ; self_advance: PROHIBITED

The patch adds two files under iu-cutter/:

 cutter_agent/dryrun.py                 | 559 ++++++++++++++++++  (new)
 tests/test_dryrun_snapshot_mark.py     | 211 +++++++++++++       (new)
 2 files changed, 770 insertions(+), 0 deletions(-)
 0 existing files modified

Apply form (only in a later, separately authorized phase): git apply is N/A because the target is not a git repo, so applying the patch means placing the two files at the paths above. No existing file is touched. Once the files are in place, python -m cutter_agent.dryrun becomes available; the v0.4 cli.py path is unchanged.
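
Before the two files are placed, a reviewer may want to confirm that the bytes on disk match the pinned digests. The following is a minimal sketch, assuming module_sha256 and test_sha256 are SHA-256 digests of the raw file bytes (this document does not state the hashing convention). The helpers file_sha256 and verify_pinned_file are hypothetical and not part of the patch.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path) -> str:
    """SHA-256 hex digest of the file's raw bytes."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def verify_pinned_file(path: Path, expected_sha256: str) -> bool:
    """True only if the file exists and its digest equals the pinned value."""
    return path.is_file() and file_sha256(path) == expected_sha256.lower()


if __name__ == "__main__":
    # Assumption: each digest covers the raw bytes of the corresponding file.
    pinned = {
        "cutter_agent/dryrun.py":
            "f1f42e83ca23ba0b328f79cf04a8391ac699d1b307eb1b22b52c305f2efa1422",
        "tests/test_dryrun_snapshot_mark.py":
            "31143968f322433cc5da62fa3ccf2a1fbe1905f461940c789a57cb0a116dc1b4",
    }
    for rel, sha in pinned.items():
        print(rel, "OK" if verify_pinned_file(Path(rel), sha) else "MISMATCH")
```

A MISMATCH here would only mean that the local copy differs from the text pinned in this document, for example through a changed line ending or trailing newline; it says nothing about whether the code itself is correct.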


1. NEW FILE — cutter_agent/dryrun.py (verbatim, embed exactly)

#!/usr/bin/env python3
"""dot-iu-cutter v0.5 — snapshot-source MARK dry-run entrypoint.

``python -m cutter_agent.dryrun --mode mark-manifest-only ...``

HARD GUARANTEES (import-isolated; verified by
tests/test_dryrun_snapshot_mark.py::TestNoDbImportIsolation):

  * This module imports ONLY the Python standard library. It does NOT import
    cutter_agent.db_adapter / phases / ledger / signal. It therefore CANNOT
    open a DB connection, write a cutter_governance row, CUT, VERIFY, call
    fn_iu_create, or write Directus/vector. Output is artifact files only.
  * ``mark-manifest-only`` is the ONLY mode. ``--no-db-write/--no-cut/
    --no-verify`` are accepted and asserted; any other mode is refused.
  * The input is the PINNED normalized snapshot artifact. The snapshot
    BEGIN/END region is rehashed BEFORE any parse; mismatch => ABORT.
  * Fail-closed: unknown marker, duplicate address, span overlap, uncovered
    body line, orphan section, malformed heading, checksum/length/marker
    mismatch, or non-deterministic re-run => BLOCKED, no/partial output
    quarantined, non-zero exit. BLOCKED is always preferred over a guessed
    PASS.

Implements the GPT-ratified design package
``v0.5-constitution-snapshot-source-mark-dryrun-entrypoint-design`` and the
OD-G3 ruling (emit NGUYEN_TAC + KIEN_TRUC_SECTION + DIEU; DIEU = floor).
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import sys
from pathlib import Path

PARSER_REFIMPL = "nuxt-incomex-portal-constitution-v1.refimpl.r1"
GRAMMAR_PROFILE = "incomex-architecture-constitution-v4"
ADDRESS_TEMPLATE = "at.icx.const.v4"

BEGIN_SENTINEL = "<<<BEGIN-NORMALIZED-CONTENT-DO-NOT-EDIT"
END_SENTINEL = "END-NORMALIZED-CONTENT-DO-NOT-EDIT>>>"

# 4 ratified status markers (grammar_profile_status_marker, LIVE).
MARKERS = {"✅": "enacted", "\U0001F4CB": "controlled_draft",
           "\U0001F4DD": "draft", "⛔": "obsolete"}
EXCLUSION_REASON = {
    "controlled_draft": "controlled_draft_deferred",
    "draft": "draft_excluded_by_enacted_only",
    "obsolete": "obsolete_excluded",
}

# Exact zone-entry headers (verbatim from the pinned snapshot region).
Z2_H = "15 NGUYÊN TẮC NỀN TẢNG — CẤM VI PHẠM"
Z3_H = ("KIẾN TRÚC HẠ TẦNG DỮ LIỆU — 4 DATABASE "
        "+ 3 LỚP NÃO-KHO-CỔNG (BỔ SUNG S176)")
Z4_H = "2 CHIỀU QUẢN LÝ"
Z5_H = "MỤC LỤC LUẬT"
Z6_H = "CHANGELOG"
ZONE_HEADERS = [Z2_H, Z3_H, Z4_H, Z5_H, Z6_H]

NGUYEN_TAC_COLHDR = ["#", "Nguyên tắc", "Nghĩa", "Hệ quả"]
CATALOG_COLVOCAB = {"Điều", "Tên", "File", "Ghi chú",
                    "Lý do"}

NT_ID_RE = re.compile(r"^([1-9]|1[0-5])$")
KT_SEC_RE = re.compile(r"^([A-C])\.\s+(.+)$")
DIEU_ID_RE = re.compile(r"^(—|0-S/M/L|\d+(?:-[A-Z])?)$")
GROUP_HDR_RE = re.compile(
    r"^(?P<label>.+?)\s+—\s+(?P<m>[✅\U0001F4CB\U0001F4DD⛔])"
    r"(?:\s+BAN HÀNH)?$")
POINTER_RE = re.compile(r"^→\s")


class FailClosed(Exception):
    """Any guard / determinism / coverage failure. -> BLOCKED."""


def _sha(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


# ----------------------------------------------------------------- gate
def extract_region(artifact_text: str) -> str:
    """Bytes strictly between the BEGIN/END sentinel lines (D-SENTINEL).

    Sentinels excluded; no added trailing newline. Each sentinel must occur
    exactly once.
    """
    if artifact_text.count(BEGIN_SENTINEL) != 1:
        raise FailClosed("snapshot: BEGIN sentinel not present exactly once")
    if artifact_text.count(END_SENTINEL) != 1:
        raise FailClosed("snapshot: END sentinel not present exactly once")
    i = artifact_text.index(BEGIN_SENTINEL)
    nl = artifact_text.index("\n", i)
    start = nl + 1
    k = artifact_text.index(END_SENTINEL, start)
    region = artifact_text[start:k]
    if region.endswith("\n"):
        region = region[:-1]
    return region


def snapshot_gate(region: str, expect_sha: str, expect_len: int,
                  expect_markers: dict) -> dict:
    got_sha = _sha(region)
    got_len = len(region)
    got_markers = {v: region.count(k) for k, v in MARKERS.items()}
    if got_sha != expect_sha:
        raise FailClosed(
            f"snapshot rehash mismatch: got {got_sha} != {expect_sha} "
            f"(ABORT before parse)")
    if got_len != expect_len:
        raise FailClosed(
            f"snapshot length mismatch: got {got_len} != {expect_len}")
    if got_markers != expect_markers:
        raise FailClosed(
            f"marker census mismatch: got {got_markers} != {expect_markers}")
    return {"region_sha256": got_sha, "region_length": got_len,
            "marker_counts": got_markers}


# -------------------------------------------------------------- matchers
def zone_router(lines: list[str]) -> list[str]:
    """mc.icx.zone_router — assign each line a zone, fail-closed."""
    for h in ZONE_HEADERS:
        if lines.count(h) != 1:
            raise FailClosed(
                f"zone header not present exactly once: {h!r} "
                f"(malformed document)")
    idx = {h: lines.index(h) for h in ZONE_HEADERS}
    order = [idx[h] for h in ZONE_HEADERS]
    if order != sorted(order):
        raise FailClosed("zone headers out of document order (malformed)")
    zones = []
    cur = "Z1"
    rev = {idx[Z2_H]: "Z2", idx[Z3_H]: "Z3", idx[Z4_H]: "Z4",
           idx[Z5_H]: "Z5", idx[Z6_H]: "Z6"}
    for n in range(len(lines)):
        if n in rev:
            cur = rev[n]
        zones.append(cur)
    return zones


def _span(lines, lo, hi):
    """Inclusive line-index range -> source_span dict over the region."""
    text = "\n".join(lines[lo:hi + 1])
    return {"line_start": lo, "line_end": hi,
            "span_sha256": _sha(text)}, text


def parse_nguyen_tac(lines, zones):
    """mc.icx.nguyen_tac — 15 principle records (level NGUYEN_TAC)."""
    z2 = [n for n, z in enumerate(zones) if z == "Z2"]
    if not z2:
        raise FailClosed("NGUYEN_TAC zone empty (malformed)")
    start = z2[0]
    if lines[start] != Z2_H:
        raise FailClosed("NGUYEN_TAC: zone-entry header missing")
    p = start + 1
    for col in NGUYEN_TAC_COLHDR:
        if lines[p] != col:
            raise FailClosed(
                f"NGUYEN_TAC malformed column header: {lines[p]!r} != {col!r}")
        p += 1
    units, struct, unit_lines = [], list(range(start, p)), []
    expect = 1
    while p < len(lines) and zones[p] == "Z2" and not POINTER_RE.match(
            lines[p]):
        m = NT_ID_RE.match(lines[p])
        if not m or int(lines[p]) != expect:
            raise FailClosed(
                f"NGUYEN_TAC malformed/out-of-order id near line {p}: "
                f"{lines[p]!r} (expected {expect})")
        idl = p
        if p + 3 >= len(lines):
            raise FailClosed("NGUYEN_TAC truncated record (malformed)")
        name, nghia, hequa = lines[p + 1], lines[p + 2], lines[p + 3]
        sp, _ = _span(lines, idl, p + 3)
        units.append({
            "level": "NGUYEN_TAC", "unit_kind": "principle",
            "number": str(expect), "title": name,
            "heading": lines[idl],
            "normalized_text": "\n".join([name, nghia, hequa]),
            "source_span": sp, "status_marker_observed": "inherited",
            "status_basis": "tier_0_document_promulgation",
            "cut_reason": "independent_principle"})
        unit_lines += list(range(idl, p + 4))
        p += 4
        expect += 1
    if expect != 16:
        raise FailClosed(
            f"NGUYEN_TAC expected 15 principles, got {expect - 1}")
    # residual Z2 lines (the two "→ " cross-ref pointers) = structural
    struct += [n for n in range(p, len(lines)) if zones[n] == "Z2"]
    return units, unit_lines, struct


def parse_kien_truc(lines, zones):
    """mc.icx.kien_truc_section — lettered sections A/B/C."""
    z3 = [n for n, z in enumerate(zones) if z == "Z3"]
    if not z3:
        raise FailClosed("KIEN_TRUC zone empty (malformed)")
    start = z3[0]
    heads = [n for n in z3 if KT_SEC_RE.match(lines[n])]
    if [KT_SEC_RE.match(lines[n]).group(1) for n in heads] != ["A", "B", "C"]:
        raise FailClosed("KIEN_TRUC sections != [A,B,C] (malformed)")
    units, unit_lines = [], []
    # everything in Z3 before the first section header = container/boilerplate
    struct = list(range(start, heads[0]))
    bounds = heads + [z3[-1] + 1]
    for i, hn in enumerate(heads):
        end = bounds[i + 1] - 1
        m = KT_SEC_RE.match(lines[hn])
        sp, body = _span(lines, hn, end)
        units.append({
            "level": "KIEN_TRUC_SECTION", "unit_kind": "architecture_section",
            "number": m.group(1), "title": m.group(2),
            "heading": lines[hn], "normalized_text": body,
            "source_span": sp, "status_marker_observed": "inherited",
            "status_basis": "tier_0_document_promulgation",
            "cut_reason": "independent_section"})
        unit_lines += list(range(hn, end + 1))
    return units, unit_lines, struct


def parse_dieu(lines, zones):
    """mc.icx.dieu + status_marker_detector — catalog rows (level DIEU)."""
    z5 = [n for n, z in enumerate(zones) if z == "Z5"]
    if not z5:
        raise FailClosed("DIEU zone empty (malformed)")
    start, last = z5[0], z5[-1]
    if lines[start] != Z5_H:
        raise FailClosed("DIEU: zone-entry header missing")
    units, struct, unit_lines = [], [start], []  # Z5 header = structural
    p = start + 1
    cur_group = None
    while p <= last:
        gm = GROUP_HDR_RE.match(lines[p])
        if gm:
            cur_group = {"label": gm.group("label"),
                         "marker": gm.group("m"),
                         "status": MARKERS[gm.group("m")]}
            struct.append(p)
            p += 1
            cols = []
            while p <= last and lines[p] in CATALOG_COLVOCAB:
                cols.append(lines[p])
                struct.append(p)
                p += 1
            if not cols:
                raise FailClosed(
                    f"DIEU group {cur_group['label']!r} missing column "
                    f"headers (malformed)")
            cur_group["has_id"] = "Điều" in cols
            cur_group["ncols"] = len(cols)
            continue
        if cur_group is None:
            raise FailClosed(
                f"DIEU row before any group header at line {p} "
                f"(orphan section)")
        # a row
        nc = cur_group["ncols"]
        if cur_group["has_id"]:
            if not DIEU_ID_RE.match(lines[p]):
                raise FailClosed(
                    f"DIEU malformed id token at line {p}: {lines[p]!r}")
            idl = p
            if p + nc - 1 > last:
                raise FailClosed("DIEU truncated row (malformed)")
            dieu_id = lines[p]
            cells = lines[p + 1:p + nc]
            ghi_chu = cells[-1] if nc >= 3 else (cells[-1] if cells else "")
            title = cells[0] if cells else ""
            end = p + nc - 1
        else:  # obsolete group: (Tên, Lý do), no id
            idl = p
            if p + nc - 1 > last:
                raise FailClosed("DIEU truncated obsolete row (malformed)")
            dieu_id = None
            cells = lines[p:p + nc]
            title = cells[0]
            ghi_chu = cells[-1]
            end = p + nc - 1
        # status cascade: tier_2 explicit row marker overrides tier_1 group
        row_marker = None
        if ghi_chu and ghi_chu[0] in MARKERS:
            row_marker = ghi_chu[0]
        if row_marker:
            eff = MARKERS[row_marker]
            basis = "tier_2_explicit_row_marker"
            observed = row_marker
        else:
            eff = cur_group["status"]
            basis = "tier_1_group_header"
            observed = "inherited:" + cur_group["marker"]
        sp, body = _span(lines, idl, end)
        units.append({
            "level": "DIEU", "unit_kind":
                ("pointer_row" if dieu_id == "—"
                 else "obsolete_entry" if dieu_id is None else "dieu_law"),
            "number": dieu_id, "title": title,
            "heading": lines[idl], "normalized_text": body,
            "source_span": sp, "status_marker_observed": observed,
            "effective_status": eff, "status_basis": basis,
            "group_label": cur_group["label"],
            "cut_reason": ("pointer_reference" if dieu_id == "—"
                           else "catalog_law_entry")})
        unit_lines += list(range(idl, end + 1))
        p = end + 1
    return units, unit_lines, struct


# ------------------------------------------------------------- addresses
def canonical_address(docprefix: str, u: dict) -> str:
    lv = u["level"]
    if lv == "NGUYEN_TAC":
        return f"{docprefix}/NT-{u['number']}"
    if lv == "KIEN_TRUC_SECTION":
        return f"{docprefix}/KT-{u['number']}"
    nid = u["number"]
    if nid == "—":
        return f"{docprefix}/DIEU-TERMINOLOGY"
    if nid is None:  # obsolete entry, no id -> deterministic slug
        slug = re.sub(r"[^A-Za-z0-9]+", "-",
                      u["title"]).strip("-").upper()[:40]
        return f"{docprefix}/DIEU-OBSOLETE-{slug}"
    return f"{docprefix}/DIEU-{nid.replace('/', '-')}"


# --------------------------------------------------------------- emitter
def build_manifest(region: str, gate: dict, args) -> dict:
    lines = region.split("\n")
    zones = zone_router(lines)
    nt, ntu, nts = parse_nguyen_tac(lines, zones)
    kt, ktu, kts = parse_kien_truc(lines, zones)
    dv, dvu, dvs = parse_dieu(lines, zones)
    all_units = nt + kt + dv

    # NGUYEN_TAC / KIEN_TRUC_SECTION effective_status = tier_0 = enacted
    for u in nt + kt:
        u["effective_status"] = "enacted"

    # Rigorous coverage proof: unit-span set and structural-noncontent set
    # are derived INDEPENDENTLY; they must be disjoint and together cover
    # every line (no gap = no silent drop; no intersection = no overlap).
    nall = len(lines)
    unit_seq = ntu + ktu + dvu
    unit_set = set(unit_seq)
    if len(unit_seq) != len(unit_set):
        dup = sorted({x for x in unit_seq if unit_seq.count(x) > 1})
        raise FailClosed(f"span overlap on lines {dup[:10]} (no double-cut)")
    zone_hdr = {lines.index(h) for h in ZONE_HEADERS}
    structural = (set(nts) | set(kts) | set(dvs) | zone_hdr
                  | {n for n, z in enumerate(zones) if z in ("Z1", "Z4",
                                                             "Z6")})
    inter = unit_set & structural
    if inter:
        raise FailClosed(
            f"unit/noncontent span overlap on {sorted(inter)[:10]}")
    if unit_set | structural != set(range(nall)):
        missing = sorted(set(range(nall)) - (unit_set | structural))
        raise FailClosed(
            f"uncovered body text at lines {missing[:10]} (silent drop)")
    boiler = sorted(structural)

    candidates, excluded = [], []
    for u in all_units:
        u["canonical_address"] = canonical_address(args.docprefix, u)
        u["provenance"] = {
            "source_document_version_id": args.source_version_id,
            "snapshot_artifact_path": args.snapshot_artifact,
            "snapshot_region_sha256": gate["region_sha256"],
            "parser_reference_implementation": PARSER_REFIMPL,
            "grammar_profile": GRAMMAR_PROFILE}
        if u["effective_status"] == "enacted":
            candidates.append(u)
        else:
            u["exclusion_reason"] = EXCLUSION_REASON[u["effective_status"]]
            u["emitted_as"] = "EXCLUDED"
            excluded.append(u)

    # address uniqueness across candidates AND excluded
    addrs = [u["canonical_address"] for u in candidates + excluded]
    dups = sorted({a for a in addrs if addrs.count(a) > 1})
    if dups:
        raise FailClosed(f"duplicate canonical_address: {dups} (collision)")

    units_sorted = sorted(candidates + excluded,
                          key=lambda u: u["source_span"]["line_start"])
    digest_body = json.dumps(
        [[u["canonical_address"], u["level"], u["effective_status"],
          u["source_span"]["span_sha256"]] for u in units_sorted],
        ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    manifest_digest = _sha(digest_body)

    header = {
        "generated_for": "incomex-constitution",
        "source_document_version_id": args.source_version_id,
        "snapshot_artifact_path": args.snapshot_artifact,
        "snapshot_region_sha256": gate["region_sha256"],
        "snapshot_region_length": gate["region_length"],
        "marker_census_observed": gate["marker_counts"],
        "grammar_profile": GRAMMAR_PROFILE,
        "address_template": ADDRESS_TEMPLATE,
        "parser_reference_implementation": PARSER_REFIMPL,
        "docprefix": args.docprefix, "scope_policy": args.scope,
        "mode": args.mode, "db_write": "NONE",
        "candidate_count": len(candidates),
        "excluded_count": len(excluded),
        "noncontent_count": len(boiler),
        "manifest_digest_sha256": manifest_digest}
    return {"manifest_header": header, "candidates": candidates,
            "excluded": excluded, "noncontent_lines": boiler,
            "_lines_total": len(lines)}


def reconstruction_ok(region: str, manifest: dict) -> bool:
    lines = region.split("\n")
    covered = set()
    for u in manifest["candidates"] + manifest["excluded"]:
        s = u["source_span"]
        covered |= set(range(s["line_start"], s["line_end"] + 1))
    covered |= set(manifest["noncontent_lines"])
    return covered == set(range(len(lines)))


# ------------------------------------------------------------------ main
def _write(out: Path, name: str, obj) -> None:
    p = out / name
    if name.endswith(".json"):
        p.write_text(json.dumps(obj, ensure_ascii=False, indent=2),
                     encoding="utf-8")
    else:
        p.write_text(obj, encoding="utf-8")


def main(argv=None) -> int:
    ap = argparse.ArgumentParser(
        prog="cutter_agent.dryrun",
        description="no-DB-write snapshot-source MARK manifest dry-run")
    ap.add_argument("--mode", required=True)
    ap.add_argument("--no-db-write", action="store_true")
    ap.add_argument("--no-cut", action="store_true")
    ap.add_argument("--no-verify", action="store_true")
    ap.add_argument("--fail-closed", action="store_true")
    ap.add_argument("--source-version-id", required=True)
    ap.add_argument("--snapshot-artifact", required=True)
    ap.add_argument("--expect-region-sha", required=True)
    ap.add_argument("--expect-length", type=int, required=True)
    ap.add_argument("--expect-markers", required=True)
    ap.add_argument("--grammar-profile", default=GRAMMAR_PROFILE)
    ap.add_argument("--parser-refimpl", default=PARSER_REFIMPL)
    ap.add_argument("--scope", default="enacted_only")
    ap.add_argument("--docprefix", default="ICX-CONST")
    ap.add_argument("--out-dir", required=True)
    ap.add_argument("--emit", default="")
    args = ap.parse_args(argv)

    # mode / safety guards (fail-closed) -------------------------------
    if args.mode != "mark-manifest-only":
        sys.stderr.write("REFUSED: only --mode mark-manifest-only is "
                         "supported (no CUT/VERIFY/DB).\n")
        return 2
    if not (args.no_db_write and args.no_cut and args.no_verify):
        sys.stderr.write("REFUSED: --no-db-write --no-cut --no-verify are "
                         "mandatory for this entrypoint.\n")
        return 2
    if args.scope != "enacted_only":
        sys.stderr.write("REFUSED: only scope enacted_only is supported.\n")
        return 2
    for bad in ("PG_DSN", "DATABASE_URL", "DIRECTUS_URL", "PGPASSWORD"):
        if os.environ.get(bad):
            sys.stderr.write(
                f"REFUSED: {bad} is set; this entrypoint never reads a DB "
                f"credential. Unset it and retry.\n")
            return 2

    em = {}
    for tok in args.expect_markers.split(","):
        k, _, v = tok.partition("=")
        em[k.strip()] = int(v)

    out = Path(args.out_dir)
    try:
        artifact_text = Path(args.snapshot_artifact).read_text(
            encoding="utf-8")
        region = extract_region(artifact_text)
        gate = snapshot_gate(region, args.expect_region_sha,
                             args.expect_length, em)
        manifest = build_manifest(region, gate, args)
        if not reconstruction_ok(region, manifest):
            raise FailClosed("reconstruction failed (silent drop detected)")
        # determinism: rebuild and compare digest
        m2 = build_manifest(region, gate, args)
        d1 = manifest["manifest_header"]["manifest_digest_sha256"]
        d2 = m2["manifest_header"]["manifest_digest_sha256"]
        if d1 != d2:
            raise FailClosed(f"determinism mismatch: {d1} != {d2}")
    except FailClosed as e:
        out.mkdir(parents=True, exist_ok=True)
        _write(out, "dryrun_report.md",
               f"# dot-iu-cutter v0.5 Constitution dry-run\n\n"
               f"status: BLOCKED\nreason: {e}\n"
               f"production_touched: false\ndb_write: NONE\n")
        sys.stderr.write(f"BLOCKED: {e}\n")
        return 3

    out.mkdir(parents=True, exist_ok=True)
    hdr = manifest["manifest_header"]
    _write(out, "manifest.json", manifest)
    review = {"coverage_closed": True, "no_overlap": True,
              "address_unique": True,
              "levels_present": sorted({u["level"] for u in
                                        manifest["candidates"]}),
              "dieu_44_excluded": any(
                  u["number"] == "44" and
                  u["effective_status"] == "controlled_draft"
                  for u in manifest["excluded"]),
              "candidate_count": hdr["candidate_count"],
              "excluded_count": hdr["excluded_count"]}
    _write(out, "review_evaluation.json", review)
    _write(out, "coverage_proof.json", {
        "region_length": hdr["snapshot_region_length"],
        "lines_total": manifest["_lines_total"],
        "candidate_lines": sum(
            u["source_span"]["line_end"] - u["source_span"]["line_start"] + 1
            for u in manifest["candidates"]),
        "excluded_lines": sum(
            u["source_span"]["line_end"] - u["source_span"]["line_start"] + 1
            for u in manifest["excluded"]),
        "noncontent_lines": len(manifest["noncontent_lines"]),
        "reconstruction_ok": True})
    _write(out, "determinism_digest.md",
           f"# determinism digest\n\n"
           f"manifest_digest_sha256: {hdr['manifest_digest_sha256']}\n"
           f"re_run_equal: true\n")
    _write(out, "dryrun_report.md",
           f"# dot-iu-cutter v0.5 Constitution dry-run\n\n"
           f"status: PASS\nmode: {hdr['mode']}\ndb_write: NONE\n"
           f"production_touched: false\n"
           f"candidate_count: {hdr['candidate_count']}\n"
           f"excluded_count: {hdr['excluded_count']}\n"
           f"manifest_digest_sha256: {hdr['manifest_digest_sha256']}\n")
    print(json.dumps({"status": "PASS",
                      "candidate_count": hdr["candidate_count"],
                      "excluded_count": hdr["excluded_count"],
                      "manifest_digest_sha256":
                          hdr["manifest_digest_sha256"],
                      "production_touched": False, "db_write": "NONE"},
                     ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
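
The invocation shape implied by main() can be sketched as follows. The region SHA, length, marker census, and source version id are the values used in tests/test_dryrun_snapshot_mark.py; the snapshot path and output directory are placeholders. The helpers format_expect_markers, parse_expect_markers, and build_argv are hypothetical and not part of the patch.

```python
def format_expect_markers(counts: dict) -> str:
    """Render a census dict in the 'name=count,name=count' form that
    main() splits on ',' and partitions on '='."""
    return ",".join(f"{name}={n}" for name, n in counts.items())


def parse_expect_markers(text: str) -> dict:
    """Mirror of the parsing loop in main(), used here for a round-trip check."""
    out = {}
    for tok in text.split(","):
        k, _, v = tok.partition("=")
        out[k.strip()] = int(v)
    return out


def build_argv(snapshot_artifact: str, out_dir: str, source_version_id: str,
               region_sha: str, region_len: int, counts: dict) -> list:
    """Argument list for cutter_agent.dryrun.main(). The three --no-* flags
    are mandatory; without them main() returns 2."""
    return [
        "--mode", "mark-manifest-only",
        "--no-db-write", "--no-cut", "--no-verify",
        "--source-version-id", source_version_id,
        "--snapshot-artifact", snapshot_artifact,
        "--expect-region-sha", region_sha,
        "--expect-length", str(region_len),
        "--expect-markers", format_expect_markers(counts),
        "--out-dir", out_dir,
    ]


# Values from the test file; the two paths are placeholders.
EM = {"enacted": 19, "controlled_draft": 1, "draft": 1, "obsolete": 1}
argv = build_argv(
    snapshot_artifact="path/to/snapshot.md",   # placeholder
    out_dir="path/to/out",                     # placeholder
    source_version_id="icxconst-008a06ace23a96ea6cd456146e805c97",
    region_sha="17660443e0f23e994e1807cf8e22920951a9e70c598956dbd0e752f4f5cae80c",
    region_len=17522,
    counts=EM,
)
```

Passing such a list to main() returns 0 on PASS, 2 when the invocation is refused (wrong mode, missing --no-* flags, unsupported scope, or a DB-related environment variable being set), and 3 when the run is BLOCKED.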

2. NEW FILE — tests/test_dryrun_snapshot_mark.py (verbatim)

"""Unit/integration tests for cutter_agent.dryrun (no DB, no gated dry-run).

These exercise: snapshot gate, matchers, status cascade, coverage,
determinism, fail-closed negatives, and import-isolation. They do NOT
emit the official KB first-dry-run artifacts (that is the separately
GPT-gated milestone). The Constitution snapshot fixture
`constitution-normalized-17660443e0f23e99.md` is a byte-exact local copy
of the KB-pinned artifact (region sha256 == 17660443…cae80c).
"""
import importlib
import io
import json
import sys
import unittest
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from cutter_agent import dryrun as D  # noqa: E402

ART = Path(__file__).resolve().parents[1] / \
    "constitution-normalized-17660443e0f23e99.md"
SHA = "17660443e0f23e994e1807cf8e22920951a9e70c598956dbd0e752f4f5cae80c"
EM = {"enacted": 19, "controlled_draft": 1, "draft": 1, "obsolete": 1}


class _Args:
    source_version_id = "icxconst-008a06ace23a96ea6cd456146e805c97"
    snapshot_artifact = "knowledge/.../snap.md"
    docprefix = "ICX-CONST"
    scope = "enacted_only"
    mode = "mark-manifest-only"


class TestGate(unittest.TestCase):
    def setUp(self):
        self.region = D.extract_region(ART.read_text(encoding="utf-8"))

    def test_region_identity_exact(self):
        g = D.snapshot_gate(self.region, SHA, 17522, EM)
        self.assertEqual(g["region_sha256"], SHA)
        self.assertEqual(g["region_length"], 17522)
        self.assertEqual(g["marker_counts"], EM)

    def test_gate_fails_on_sha_drift(self):
        with self.assertRaises(D.FailClosed):
            D.snapshot_gate(self.region, "deadbeef" * 8, 17522, EM)

    def test_gate_fails_on_length_drift(self):
        with self.assertRaises(D.FailClosed):
            D.snapshot_gate(self.region, SHA, 17521, EM)

    def test_gate_fails_on_marker_drift(self):
        with self.assertRaises(D.FailClosed):
            D.snapshot_gate(self.region, SHA, 17522,
                            {"enacted": 18, "controlled_draft": 1,
                             "draft": 1, "obsolete": 1})

    def test_missing_sentinel_fails(self):
        with self.assertRaises(D.FailClosed):
            D.extract_region("no sentinels here")


class TestManifest(unittest.TestCase):
    def setUp(self):
        self.region = D.extract_region(ART.read_text(encoding="utf-8"))
        self.gate = D.snapshot_gate(self.region, SHA, 17522, EM)
        self.m = D.build_manifest(self.region, self.gate, _Args())

    def test_emits_three_levels_dieu_floor(self):
        levels = {u["level"] for u in self.m["candidates"]}
        self.assertEqual(levels,
                         {"NGUYEN_TAC", "KIEN_TRUC_SECTION", "DIEU"})

    def test_counts_and_range(self):
        h = self.m["manifest_header"]
        self.assertEqual(
            sum(1 for u in self.m["candidates"]
                if u["level"] == "NGUYEN_TAC"), 15)
        self.assertEqual(
            sum(1 for u in self.m["candidates"]
                if u["level"] == "KIEN_TRUC_SECTION"), 3)
        self.assertTrue(55 <= h["candidate_count"] <= 78)

    def test_dieu_44_excluded_controlled_draft_tier2(self):
        d44 = [u for u in self.m["excluded"] if u["number"] == "44"]
        self.assertEqual(len(d44), 1)
        self.assertEqual(d44[0]["effective_status"], "controlled_draft")
        self.assertEqual(d44[0]["status_basis"],
                         "tier_2_explicit_row_marker")
        self.assertEqual(d44[0]["exclusion_reason"],
                         "controlled_draft_deferred")

    def test_dieu_34_draft_and_obsolete_excluded(self):
        st = sorted(u["effective_status"] for u in self.m["excluded"])
        self.assertEqual(st, ["controlled_draft", "draft",
                              "obsolete", "obsolete"])

    def test_no_candidate_is_non_enacted(self):
        self.assertTrue(all(u["effective_status"] == "enacted"
                            for u in self.m["candidates"]))

    def test_address_format_and_uniqueness(self):
        allu = self.m["candidates"] + self.m["excluded"]
        for u in allu:
            self.assertTrue(u["canonical_address"].startswith("ICX-CONST/"))
            self.assertNotIn("✅", u["canonical_address"])
            self.assertNotIn("\U0001F4CB", u["canonical_address"])
        addrs = [u["canonical_address"] for u in allu]
        self.assertEqual(len(addrs), len(set(addrs)))

    def test_coverage_closes_and_reconstructs(self):
        self.assertTrue(D.reconstruction_ok(self.region, self.m))

    def test_determinism(self):
        m2 = D.build_manifest(self.region, self.gate, _Args())
        self.assertEqual(
            self.m["manifest_header"]["manifest_digest_sha256"],
            m2["manifest_header"]["manifest_digest_sha256"])

    def test_provenance_binding(self):
        for u in self.m["candidates"]:
            p = u["provenance"]
            self.assertEqual(p["source_document_version_id"],
                             _Args.source_version_id)
            self.assertEqual(p["snapshot_region_sha256"], SHA)
            self.assertEqual(p["parser_reference_implementation"],
                             D.PARSER_REFIMPL)


class TestFailClosedSynthetic(unittest.TestCase):
    _NT = "".join(f"{i}\nN{i}\nm{i}\nc{i}\n" for i in range(1, 16))
    BASE = ("15 NGUYÊN TẮC NỀN TẢNG — CẤM VI PHẠM\n#\nNguyên tắc\nNghĩa\n"
            "Hệ quả\n" + _NT + "→ ptr\n"
            + D.Z3_H + "\nlesson\nA. SEC A — t\nbodyA\n"
            "B. SEC B — t\nbodyB\nC. SEC C — t\nbodyC\n"
            "2 CHIỀU QUẢN LÝ\nx\nMỤC LỤC LUẬT\n"
            "G — ✅\nĐiều\nTên\n9\nLaw9\nCHANGELOG\nVersion\nrowz")

    def _mk(self, body):
        return ("<<<BEGIN-NORMALIZED-CONTENT-DO-NOT-EDIT\n" + body
                + "\nEND-NORMALIZED-CONTENT-DO-NOT-EDIT>>>\n")

    def test_unknown_marker_fails_closed(self):
        bad = self.BASE.replace("G — ✅", "G — ✨")  # sparkles, unmapped
        region = D.extract_region(self._mk(bad))
        with self.assertRaises(D.FailClosed):
            D.build_manifest(region, {"region_sha256": "x",
                             "region_length": 0, "marker_counts": {}},
                             _Args())

    def test_orphan_row_before_group_fails_closed(self):
        bad = self.BASE.replace("G — ✅\nĐiều\nTên\n", "")
        region = D.extract_region(self._mk(bad))
        with self.assertRaises(D.FailClosed):
            D.build_manifest(region, {"region_sha256": "x",
                             "region_length": 0, "marker_counts": {}},
                             _Args())

    def test_nt_out_of_order_fails_closed(self):
        bad = self.BASE.replace("\n1\nN1\nm1\nc1\n", "\n2\nN1\nm1\nc1\n")
        region = D.extract_region(self._mk(bad))
        with self.assertRaises(D.FailClosed):
            D.build_manifest(region, {"region_sha256": "x",
                             "region_length": 0, "marker_counts": {}},
                             _Args())

    def test_synthetic_happy_path_cascade(self):
        region = D.extract_region(self._mk(self.BASE))
        m = D.build_manifest(region, {"region_sha256": "x",
                             "region_length": 0, "marker_counts": {}},
                             _Args())
        # 15 NT (tier_0), 3 KT (tier_0), 1 DIEU enacted via tier_1 group ✅
        self.assertEqual(sum(1 for u in m["candidates"]
                             if u["level"] == "NGUYEN_TAC"), 15)
        self.assertEqual(sum(1 for u in m["candidates"]
                             if u["level"] == "KIEN_TRUC_SECTION"), 3)
        d = [u for u in m["candidates"] if u["level"] == "DIEU"]
        self.assertEqual(d[0]["status_basis"], "tier_1_group_header")
        self.assertTrue(D.reconstruction_ok(region, m))


class TestNoDbImportIsolation(unittest.TestCase):
    def test_module_imports_only_stdlib(self):
        import ast
        mod = importlib.import_module("cutter_agent.dryrun")
        tree = ast.parse(Path(mod.__file__).read_text(encoding="utf-8"))
        stdlib = {"argparse", "hashlib", "json", "os", "re", "sys",
                  "pathlib", "__future__"}
        mods = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                mods |= {a.name.split(".")[0] for a in node.names}
            elif isinstance(node, ast.ImportFrom):
                mods.add((node.module or "").split(".")[0])
        self.assertTrue(mods <= stdlib,
                        f"non-stdlib imports present: {mods - stdlib}")
        for banned in ("psycopg", "socket", "requests", "sqlalchemy",
                       "cutter_agent"):
            self.assertNotIn(banned, mods)

    def test_cli_refuses_wrong_mode(self):
        rc = D.main(["--mode", "cut", "--no-db-write", "--no-cut",
                     "--no-verify", "--source-version-id", "x",
                     "--snapshot-artifact", "x", "--expect-region-sha", "x",
                     "--expect-length", "1", "--expect-markers",
                     "enacted=1", "--out-dir", "/tmp/iucut-x"])
        self.assertEqual(rc, 2)

    def test_cli_refuses_without_no_db_flags(self):
        rc = D.main(["--mode", "mark-manifest-only",
                     "--source-version-id", "x", "--snapshot-artifact", "x",
                     "--expect-region-sha", "x", "--expect-length", "1",
                     "--expect-markers", "enacted=1",
                     "--out-dir", "/tmp/iucut-x"])
        self.assertEqual(rc, 2)


if __name__ == "__main__":
    unittest.main(verbosity=2)
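
The coverage property exercised by test_coverage_closes_and_reconstructs can be illustrated on a toy document. This is a simplified sketch of the invariant that build_manifest enforces: unit lines and structural lines must be disjoint and must together cover every line. The function check_partition is a hypothetical stand-in, and it raises ValueError rather than the module's FailClosed so that it runs on its own.

```python
def check_partition(n_lines: int, unit_lines: list, structural: set) -> None:
    """Raise ValueError unless unit_lines and structural partition
    range(n_lines): no line cut twice, no line in both sets, no gap."""
    unit_set = set(unit_lines)
    if len(unit_lines) != len(unit_set):
        raise ValueError("span overlap: a line belongs to two units")
    both = unit_set & structural
    if both:
        raise ValueError(f"unit/noncontent overlap on {sorted(both)}")
    covered = unit_set | structural
    if covered != set(range(n_lines)):
        missing = sorted(set(range(n_lines)) - covered)
        raise ValueError(f"uncovered lines {missing}")


# Toy 6-line document: line 0 is a header, lines 1-3 form one unit,
# lines 4-5 are trailing boilerplate.
check_partition(6, [1, 2, 3], {0, 4, 5})          # passes silently

try:
    check_partition(6, [1, 2, 3], {0, 5})         # line 4 silently dropped
except ValueError as exc:
    print("blocked:", exc)
```

Deriving the two sets independently is what gives the check its value: a parser bug that skips a line shows up as a gap instead of being absorbed into either set.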

3. Notes for reviewers

- The test fixture constitution-normalized-17660443e0f23e99.md is a byte-exact
  local copy of the KB-pinned artifact; its region sha256 reproduces
  17660443…cae80c exactly (validated in doc 3). This phase does NOT commit it to
  the repo; a later code phase decides whether the fixture ships in tests/ or is
  fetched from KB at test time.
- Import isolation is STRUCTURAL: dryrun.py imports only argparse/hashlib/json/os/
  re/sys/pathlib/__future__. It never imports db_adapter/phases/ledger/signal,
  psycopg, socket, requests, or sqlalchemy (AST-asserted in TestNoDbImportIsolation).
- No existing repo file is modified; cli.py / phases.py / db_adapter.py are untouched.
- This is doc 2 of 6; nothing has been applied, committed, or deployed. Self-advance PROHIBITED.
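
The structural import-isolation claim can also be checked on the source text alone, before the module is ever imported. The following is a minimal sketch using the same AST walk as TestNoDbImportIsolation, with the allow-list copied from that test; imported_roots and audit_imports are hypothetical names, not part of the patch.

```python
import ast

ALLOWED = {"argparse", "hashlib", "json", "os", "re", "sys",
           "pathlib", "__future__"}


def imported_roots(source: str) -> set:
    """Root package name of every import statement in the source.
    ast.walk also visits imports nested inside functions or classes."""
    mods = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            mods |= {alias.name.split(".")[0] for alias in node.names}
        elif isinstance(node, ast.ImportFrom):
            mods.add((node.module or "").split(".")[0])
    return mods


def audit_imports(source: str, allowed: set = ALLOWED) -> set:
    """Return the imported roots that are NOT on the allow-list."""
    return imported_roots(source) - allowed


sample = "from __future__ import annotations\nimport os\nimport psycopg.sql\n"
print(audit_imports(sample))   # {'psycopg'}
```

An empty result for the text of cutter_agent/dryrun.py would agree with the bullet above. Note that this only inspects import statements; it does not detect dynamic imports such as importlib.import_module or __import__ calls.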

Companion docs: code-authoring-plan (1), test-plan-and-results (3), command-review-package (4), risk-and-rollback-note (5), code-authoring-report (6).

knowledge/dev/laws/dieu44-trien-khai/v0.5-constitution-snapshot-source-mark-dryrun-entrypoint-code-authoring/dot-iu-cutter-v0.5-constitution-snapshot-mark-code-diff-or-patch-2026-05-18.md