# FIX7 Real-N6 packet — kb_fetch_reconstruct.py
#!/usr/bin/env python3
"""
============================================================================
FIX7 REAL-N6 packet -- KB fresh-fetch reconstruction tool.

Rebuilds the governed evidence this packet verifies, from the KB alone:
  - the FIX7 Recheck-9 Packet V2 root files + 10 active corpus docs (the
    governed blueprint authority surface), verified bidirectionally against
    that packet's HASH_MANIFEST.txt (FAIL-CLOSED on any gap/drift);
  - the governed authority_seal_encoder.py (sha256 13344f92...957144b8),
    verified against its pinned hash (the firewall/probe seal boundary).

The real-N6 verifier does NOT keep a duplicate KB copy of the corpus or the
algorithm: ONE authority, ONE nature. This tool materializes them on demand.

Usage: python3 kb_fetch_reconstruct.py <outdir>
  reads creds from ~/.claude.json mcpServers["agent-data"] (or env MCP_URL/MCP_KEY)
  READ-ONLY KB access (get_document_for_rewrite); writes only inside <outdir>.
============================================================================
"""
import json, os, ssl, sys, hashlib, urllib.request
# KB document-id prefix of the Recheck-9 packet; main() appends each ROOT_FILES
# entry to it to form the document id to fetch.
PACKET_ROOT = "knowledge/dev/laws/tool-kiem-thu/packets/fix7-codex-recheck-9-2026-06-10/"

# KB document-id prefix of the blueprint corpus; main() appends each DOCS entry.
BLUEPRINT = "knowledge/dev/reports/architecture/t1-fix7-existing-system-refactor-execution-blueprint-2026-06-08/"

# KB document id of the governed authority seal encoder.
ENCODER_DOC = "knowledge/dev/laws/tool-kiem-thu/packets/fix7-authority-closure-2026-06-10/authority_seal_encoder.py"

# Pinned SHA-256 (hex) that the fetched encoder bytes must match exactly.
ENCODER_SHA = "13344f92cafcaf0d07dcb21700bdb642f38b89351702e08080eacb0e957144b8"
# Packet-relative paths fetched from under PACKET_ROOT and written to the same
# relative location inside <outdir>.
ROOT_FILES = [
    "README_FOR_CODEX.md",
    "RERUN.sh",
    "HASH_MANIFEST.txt",
    "manifest.json",
    "manifest_tool.py",
    "adversarial_suite.py",
    "kb_fetch_reconstruct.py",
    "blackbox_negative_suite.py",
    "failopen_regression.py",
    "evidence/canonicalizer-fix7-canon-v1-ssot.md",
    "evidence/canonicalizer-fix7-canon-v1-ssot.py",
    "evidence/fix7_canon_v1_ssot_extended.py",
    "evidence/materialize_canonicalizer.py",
    "evidence/selftest-expected-output.txt",
    "evidence/produce-expected-output.txt",
    "logs/materialized-selftest.log",
    "logs/extended-selftest.log",
    "logs/produce.log",
    "logs/forbidden-scope.log",
    "logs/manifest-verify.log",
    "logs/adversarial-suite.log",
    "logs/blackbox-negative-suite.log",
    "logs/failopen-regression.log",
]

# Blueprint documents fetched from under BLUEPRINT and written to
# <outdir>/docs/.
DOCS = [
    "00-readme-first.md",
    "01-live-existing-system-inventory.md",
    "02-design-to-live-mapping.md",
    "03-gap-classification.md",
    "04-dependency-safe-construction-order.md",
    "05-rollback-blueprint.md",
    "06-test-guard-blueprint.md",
    "07-implementation-package-split.md",
    "08-hard-blocks-do-not-touch-list.md",
    "12-final-verdict.md",
]
def cfg():
    """Return the ``(url, api_key)`` pair used to reach the KB endpoint.

    The environment variables ``MCP_URL`` and ``MCP_KEY`` win when both are
    set and non-empty.  Otherwise the pair is read from the
    ``mcpServers["agent-data"]`` entry of ``~/.claude.json`` (``url`` and
    ``headers["X-API-Key"]``).

    Returns:
        tuple: ``(url, key)``.

    Raises:
        OSError: ``~/.claude.json`` cannot be opened (fallback path only).
        json.JSONDecodeError: the file is not valid JSON.
        KeyError: the expected keys are missing from the file.
    """
    url = os.environ.get("MCP_URL")
    key = os.environ.get("MCP_KEY")
    if url and key:
        return url, key

    config_path = os.path.expanduser("~/.claude.json")
    # Context manager closes the handle deterministically; JSON is read as
    # UTF-8 rather than whatever the locale default happens to be.
    with open(config_path, encoding="utf-8") as fh:
        server = json.load(fh)["mcpServers"]["agent-data"]
    return server["url"], server["headers"]["X-API-Key"]
def ctx():
    """Build the TLS context used for KB requests.

    Tries the CA bundle at ``/etc/ssl/cert.pem`` first and falls back to the
    platform default trust store when that bundle cannot be loaded.

    Returns:
        ssl.SSLContext: a default (certificate-verifying) client context.
    """
    try:
        return ssl.create_default_context(cafile="/etc/ssl/cert.pem")
    except OSError:
        # Covers a missing/unreadable bundle (FileNotFoundError,
        # PermissionError) and an unparsable one (ssl.SSLError is an OSError
        # subclass) without masking unrelated programming errors.
        return ssl.create_default_context()
def fetch(url, key, doc_id, rid):
    """Fetch one KB document through the ``get_document_for_rewrite`` tool.

    Sends a JSON-RPC 2.0 ``tools/call`` request to *url* and decodes the
    reply, which may arrive either as a plain JSON body or framed as
    ``data:`` event-stream lines.

    Args:
        url: endpoint URL.
        key: value sent in the ``X-API-Key`` header.
        doc_id: KB document id to request.
        rid: JSON-RPC request id.

    Returns:
        dict: the decoded tool payload; guaranteed to contain ``"content"``.

    Raises:
        SystemExit: ``FETCH_FAILED <doc_id>: ...`` when the server reports an
            error, the reply is not in the expected shape, or the payload has
            no ``content`` key.
        urllib.error.URLError: transport/HTTP failures propagate unchanged.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": rid,
        "method": "tools/call",
        "params": {
            "name": "get_document_for_rewrite",
            "arguments": {"document_id": doc_id},
        },
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "X-API-Key": key,
    }
    req = urllib.request.Request(url, data=json.dumps(payload).encode(), headers=headers)
    with urllib.request.urlopen(req, timeout=120, context=ctx()) as r:
        raw = r.read().decode("utf-8")

    # Event-stream framing: the JSON-RPC reply is carried on "data:" lines.
    # The last such line wins, matching the original behaviour.
    if "\ndata:" in raw or raw.startswith("data:"):
        data_lines = [ln[5:].strip() for ln in raw.splitlines() if ln.startswith("data:")]
        if data_lines:
            raw = data_lines[-1]

    try:
        resp = json.loads(raw)
    except ValueError as exc:
        raise SystemExit(f"FETCH_FAILED {doc_id}: non-JSON response: {raw[:200]}") from exc
    if not isinstance(resp, dict):
        raise SystemExit(f"FETCH_FAILED {doc_id}: malformed response: {str(resp)[:200]}")
    if "error" in resp:
        raise SystemExit(f"FETCH_FAILED {doc_id}: {resp['error']}")

    # Fail closed with a readable message instead of a raw traceback when the
    # result envelope is not shaped as result.content[0].text -> JSON.
    try:
        d = json.loads(resp["result"]["content"][0]["text"])
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        raise SystemExit(f"FETCH_FAILED {doc_id}: malformed result: {str(resp)[:200]}") from exc

    # The isinstance check stops a JSON *string* payload from passing the
    # membership test by substring match.
    if not isinstance(d, dict) or "content" not in d:
        raise SystemExit(f"FETCH_FAILED {doc_id}: no content: {str(d)[:200]}")
    return d
def _write_bytes(path, data):
    """Write *data* to *path*, creating parent directories as needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as fh:
        fh.write(data)


def _sha256_file(path):
    """Return the hex SHA-256 digest of the file at *path*."""
    with open(path, "rb") as fh:
        return hashlib.sha256(fh.read()).hexdigest()


def _read_manifest(path, problems):
    """Parse ``<hash> <relpath>`` lines of *path* into ``{relpath: hash}``.

    Blank lines and ``#`` comments are skipped.  Lines without a path part
    are appended to *problems* so the caller still fails closed.
    """
    entries = {}
    with open(path, encoding="utf-8") as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # split(None, 1) tolerates one space, two spaces or tabs between
            # the digest and the path.
            parts = line.split(None, 1)
            if len(parts) != 2:
                problems.append(f"malformed manifest line: {line}")
                continue
            digest, rel = parts
            entries[rel] = digest
    return entries


def main():
    """Reconstruct the packet tree into ``<outdir>`` and verify it.

    Steps: fetch every ROOT_FILES / DOCS document and write it under
    ``sys.argv[1]``; fetch the authority encoder and compare its SHA-256 with
    ENCODER_SHA; then check the written tree against ``HASH_MANIFEST.txt`` in
    both directions (every fetched file is pinned with a matching hash, and
    every pinned entry was fetched).

    Returns:
        int: ``0`` on success, ``1`` on any verification failure, ``2`` on
        wrong command-line usage.
    """
    if len(sys.argv) != 2:
        print("usage: kb_fetch_reconstruct.py <outdir>")
        return 2
    out = sys.argv[1]
    url, key = cfg()

    plan = [(PACKET_ROOT + f, f) for f in ROOT_FILES]
    plan += [(BLUEPRINT + d, "docs/" + d) for d in DOCS]

    rid = 0
    for rid, (doc_id, rel) in enumerate(plan, start=1):
        doc = fetch(url, key, doc_id, rid)
        _write_bytes(os.path.join(out, rel), doc["content"].encode("utf-8"))

    # governed authority encoder (seal boundary), pinned by sha256
    doc = fetch(url, key, ENCODER_DOC, rid + 1)
    edata = doc["content"].encode("utf-8")
    _write_bytes(os.path.join(out, "authority", "authority_seal_encoder.py"), edata)
    eh = hashlib.sha256(edata).hexdigest()
    if eh != ENCODER_SHA:
        print(f"RECONSTRUCTION: FAIL (authority encoder {eh} != pinned {ENCODER_SHA})")
        return 1

    # bidirectional verify the recheck-9 tree against its HASH_MANIFEST
    bad = []
    entries = _read_manifest(os.path.join(out, "HASH_MANIFEST.txt"), bad)
    # The manifest cannot pin its own hash, so it is excluded from the set.
    fetched = {rel for _, rel in plan if rel != "HASH_MANIFEST.txt"}
    for rel in sorted(fetched):
        digest = _sha256_file(os.path.join(out, rel))
        if rel not in entries:
            bad.append(f"fetched not pinned: {rel}")
        elif entries[rel] != digest:
            bad.append(f"hash mismatch: {rel}")
    for rel in sorted(entries):
        if rel not in fetched:
            bad.append(f"manifest entry not reconstructed: {rel}")

    if bad:
        print("RECONSTRUCTION: FAIL")
        for problem in bad:
            print(" -", problem)
        return 1
    print(f"RECONSTRUCTION: OK ({len(fetched)} recheck-9 files + governed encoder; tree matches HASH_MANIFEST)")
    print(f" authority_seal_encoder.py sha256 = {eh} (== pinned)")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())