#!/usr/bin/env python3
# P10B D28 FIX package gen.py
import datetime
import hashlib
import json
import re
import uuid
from pathlib import Path
# Directory the generator reads its inputs from and writes its outputs to.
BASE = Path('/tmp/p10b-2b-fix-vps')

# Identity of the publication this package describes.
DOC_CODE = 'DIEU-28'
VERSION = 'v2.0'
OWNER = 'INCOMEX'

# Knowledge-base location of the package; not referenced in the visible code.
KB_BASE = 'knowledge/dev/laws/dieu38-trien-khai/data/p10b-d28-fix-package'

# Section-type groupings; not referenced in the visible code.
REQUIRED_TYPES = {
    'paragraph',
    'principle',
    'technical_spec',
    'governance_process',
    'process',
}
EXEMPT_TYPES = {
    'heading',
    'checklist',
}

# Namespace under which stable_uuid derives deterministic version-5 UUIDs.
NS = uuid.UUID('1f2c0f84-7f9d-4d69-98d8-28f92f6bf001')
def stable_uuid(key):
    """Return a deterministic UUID string for ``key``.

    The identifier is a version-5 (name-based) UUID computed from ``key``
    under the module-level namespace ``NS``, so the same key always maps to
    the same value.
    """
    derived = uuid.uuid5(NS, key)
    return str(derived)
def load_schema():
    """Load the table/column catalogue from ``schema.tsv`` under ``BASE``.

    Every non-blank line must hold at least five tab-separated fields
    (table, column, data type, nullable, default). Only the table and column
    are kept, because callers only test for column presence.

    Returns:
        dict mapping each table name to the set of its column names.

    Raises:
        SystemExit: if a non-blank line has fewer than five fields.
    """
    schema = {}
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    text = (BASE / 'schema.tsv').read_text(encoding='utf-8')
    for lineno, line in enumerate(text.splitlines(), 1):
        if not line.strip():
            continue
        fields = line.split('\t', 4)
        if len(fields) < 5:
            # Name the offending line instead of failing with an unpack error.
            raise SystemExit(
                f'schema.tsv line {lineno}: expected 5 tab-separated fields, got {len(fields)}'
            )
        table, col = fields[0], fields[1]
        schema.setdefault(table, set()).add(col)
    return schema
def require_cols(schema, table, cols):
    """Abort unless every name in ``cols`` is a known column of ``table``.

    Args:
        schema: mapping of table name to a collection of column names.
        table: table to look up; an unknown table is treated as having no columns.
        cols: column names that must all be present.

    Raises:
        SystemExit: naming the table and the list of absent columns.
    """
    known = schema.get(table, set())
    absent = []
    for name in cols:
        if name not in known:
            absent.append(name)
    if not absent:
        return
    raise SystemExit(f'{table} missing columns: {absent}')
def dq(s, preferred='txt'):
    """Wrap ``s`` in a PostgreSQL dollar-quoted string literal.

    Args:
        s: value to quote; ``None`` becomes the empty string, anything else
            is converted with ``str``.
        preferred: tag to try first. If it is unsafe for this text, the
            fallbacks ``<PREFERRED>TAG1``, ``<PREFERRED>TAG2``, ... are tried
            until a safe one is found.

    Returns:
        ``$tag$`` + text + ``$tag$`` using a tag that cannot end the literal
        early.
    """
    s = '' if s is None else str(s)
    # A tag is unsafe when '$tag$' occurs in the text, or when the text ends
    # in '$tag': the closing delimiter's trailing '$' would then complete an
    # early '$tag$' and truncate the literal. Appending one '$' to the probe
    # covers both cases with a single containment test.
    probe = s + '$'

    def collides(tag):
        return f'${tag}$' in probe

    tag = preferred
    if collides(tag):
        base = preferred.upper() + 'TAG'
        i = 1
        while collides(f'{base}{i}'):
            i += 1
        tag = f'{base}{i}'
    return f'${tag}${s}${tag}$'
def sq(s):
    """Render ``s`` as a single-quoted SQL string literal.

    The value is converted with ``str`` and every embedded single quote is
    doubled before the surrounding quotes are added.
    """
    escaped = str(s).replace("'", "''")
    return f"'{escaped}'"
def lit(s):
    """Return the SQL literal for ``s``: ``NULL`` for ``None``, else ``sq(s)``."""
    return 'NULL' if s is None else sq(s)
def uuid_lit(s):
    """Return ``s`` as a quoted SQL value cast to ``uuid``, or ``NULL`` for ``None``.

    The value is interpolated as-is; no escaping or UUID validation is done.
    """
    if s is not None:
        return "'{}'::uuid".format(s)
    return 'NULL'
def json_lit(obj):
    """Serialize ``obj`` to JSON and return it as a SQL literal cast to ``jsonb``.

    Keys are sorted and non-ASCII characters are kept verbatim; the JSON text
    is quoted with ``sq``.
    """
    encoded = json.dumps(obj, sort_keys=True, ensure_ascii=False)
    quoted = sq(encoded)
    return quoted + '::jsonb'
def body_lit(body, idx):
    """Dollar-quote a unit body using a tag derived from the unit index.

    Args:
        body: body text; a falsy value (``None`` or ``''``) is quoted as the
            empty string.
        idx: unit index, used to build the tags ``BODYTAG<idx>`` and then
            ``BDTAG<idx>``.

    Returns:
        The body wrapped by ``dq`` with the first of the two tags that is safe.

    Raises:
        SystemExit: if both tags collide with the body text.
    """
    body = body or ''
    # A tag is unsafe when '$tag$' occurs in the body, or when the body ends
    # in '$tag': the closing delimiter's trailing '$' would then complete an
    # early '$tag$'. Probing body + '$' catches both cases.
    probe = body + '$'
    for tag in (f'BODYTAG{idx}', f'BDTAG{idx}'):
        if f'${tag}$' not in probe:
            return dq(body, tag)
    raise SystemExit(f'No safe body tag for index {idx}')
def sql_insert(table, cols, vals):
    """Build a single-row ``INSERT`` statement.

    Args:
        table: target table name, used verbatim.
        cols: column names, joined with ``', '``.
        vals: already-rendered SQL value expressions, joined with ``', '``.

    Returns:
        Two lines of SQL: the ``INSERT INTO`` clause and the ``VALUES`` clause
        ending in a semicolon.
    """
    column_list = ', '.join(cols)
    value_list = ', '.join(vals)
    head = 'INSERT INTO {} ({})'.format(table, column_list)
    tail = 'VALUES ({});'.format(value_list)
    return head + '\n' + tail
# Number of logical units the candidate package must contain. The verify SQL
# below pins the same figure (27 rows per unit table, 82 rows in total).
_EXPECTED_UNITS = 27

# Columns written to each table; checked against schema.tsv before generating.
_PUB_COLS = ['id', 'doc_code', 'version', 'publication_type', 'name', 'owner', 'description', 'lifecycle_status', 'risk_tier', 'publication_profile']
_LU_COLS = ['id', 'canonical_address', 'doc_code', 'parent_id', 'sort_order', 'section_type', 'section_code', 'owner', 'identity_profile', 'tier', 'lifecycle_status']
_UV_COLS = ['id', 'logical_unit_id', 'version_number', 'title', 'body', 'description', 'content_hash', 'lifecycle_status', 'review_state', 'length_flag', 'content_profile', 'editor', 'provenance']
_PM_COLS = ['id', 'publication_id', 'logical_unit_id', 'unit_version_id', 'render_order']

# Each leading "--" comment sits on its own line so that it cannot swallow the
# statement that follows it.
_RENDER_SQL = """\
-- Render DIEU-28 v2.0 preorder output; self-scoped, no external pub_id parameter.
WITH RECURSIVE pub AS (
  SELECT id FROM tac_publication WHERE doc_code = 'DIEU-28' AND version = 'v2.0'
), tree AS (
  SELECT lu.id, lu.parent_id, lu.canonical_address, lu.sort_order, lu.section_type, uv.title, uv.body, 0 AS depth, ARRAY[lu.sort_order, 0, 0, 0]::int[] AS path
  FROM tac_logical_unit lu
  JOIN tac_publication_member pm ON pm.logical_unit_id = lu.id
  JOIN pub ON pub.id = pm.publication_id
  JOIN tac_unit_version uv ON uv.id = pm.unit_version_id
  WHERE lu.parent_id IS NULL AND lu.doc_code = 'DIEU-28'
  UNION ALL
  SELECT child.id, child.parent_id, child.canonical_address, child.sort_order, child.section_type, uv.title, uv.body, tree.depth + 1, tree.path || child.sort_order
  FROM tree
  JOIN tac_logical_unit child ON child.parent_id = tree.id
  JOIN tac_publication_member pm ON pm.logical_unit_id = child.id
  JOIN pub ON pub.id = pm.publication_id
  JOIN tac_unit_version uv ON uv.id = pm.unit_version_id
  WHERE child.doc_code = 'DIEU-28'
)
SELECT canonical_address, depth, section_type, title, body
FROM tree
ORDER BY path;
"""

_ROLLBACK_SQL = """\
-- Rollback DIEU-28 v2.0 candidate package; reverse-FK order and strictly scoped.
BEGIN;
WITH pub AS (
  SELECT id FROM tac_publication WHERE doc_code = 'DIEU-28' AND version = 'v2.0'
), scoped_lu AS (
  SELECT id FROM tac_logical_unit WHERE doc_code = 'DIEU-28' AND canonical_address LIKE 'D38-DIEU28-%'
), scoped_uv AS (
  SELECT uv.id FROM tac_unit_version uv JOIN scoped_lu lu ON lu.id = uv.logical_unit_id
)
DELETE FROM tac_publication_member pm
USING pub, scoped_lu lu, scoped_uv uv
WHERE pm.publication_id = pub.id AND pm.logical_unit_id = lu.id AND pm.unit_version_id = uv.id;

WITH scoped_lu AS (
  SELECT id FROM tac_logical_unit WHERE doc_code = 'DIEU-28' AND canonical_address LIKE 'D38-DIEU28-%'
)
DELETE FROM tac_unit_version uv USING scoped_lu lu WHERE uv.logical_unit_id = lu.id;

DELETE FROM tac_logical_unit WHERE doc_code = 'DIEU-28' AND canonical_address LIKE 'D38-DIEU28-%';

DELETE FROM tac_publication WHERE doc_code = 'DIEU-28' AND version = 'v2.0';

-- COMMIT;
-- ROLLBACK;
"""

# count(*) is required here: PostgreSQL rejects a bare count() call.
_VERIFY_SQL = """\
-- Verify DIEU-28 v2.0 candidate package counts; self-scoped.
WITH pub AS (
  SELECT id FROM tac_publication WHERE doc_code = 'DIEU-28' AND version = 'v2.0'
), lu AS (
  SELECT id FROM tac_logical_unit WHERE doc_code = 'DIEU-28' AND canonical_address LIKE 'D38-DIEU28-%'
), uv AS (
  SELECT uv.id FROM tac_unit_version uv JOIN lu ON lu.id = uv.logical_unit_id
), pm AS (
  SELECT pm.id FROM tac_publication_member pm
  JOIN pub ON pub.id = pm.publication_id
  JOIN lu ON lu.id = pm.logical_unit_id
  JOIN uv ON uv.id = pm.unit_version_id
), counts AS (
  SELECT 'publication' AS target, 1 AS expected, count(*)::int AS actual FROM pub
  UNION ALL SELECT 'logical_unit', 27, count(*)::int FROM lu
  UNION ALL SELECT 'unit_version', 27, count(*)::int FROM uv
  UNION ALL SELECT 'publication_member', 27, count(*)::int FROM pm
)
SELECT target, expected, actual, (expected = actual) AS pass FROM counts
UNION ALL
SELECT 'total', 82, sum(actual)::int, (sum(actual)::int = 82) FROM counts;
"""


def _load_vocab():
    """Read ``vocab.tsv`` under ``BASE`` into a per-section-type requirement map.

    Each non-blank line holds three tab-separated fields: the section-type
    code and two flags, where the text ``t`` means true.
    """
    vocab = {}
    text = (BASE / 'vocab.tsv').read_text(encoding='utf-8')
    for line in text.splitlines():
        if not line.strip():
            # Skip blank lines, as load_schema does, instead of failing to unpack.
            continue
        code, body_req, desc_req = line.split('\t')
        vocab[code] = {
            'body_required': body_req == 't',
            'description_required': desc_req == 't',
        }
    return vocab


def _birth_gates(units, vocab):
    """Evaluate the units against the birth-gate checklist.

    Returns a list with one record per gate, in checklist order. Every
    section type in ``units`` must already be present in ``vocab``.
    """
    gates = []

    def add(gate_id, table_event, source_function, mode, blocking, requirement, cols, status, fix=''):
        gates.append({
            'gate_id': gate_id,
            'table_event': table_event,
            'source_function': source_function,
            'mode': mode,
            'blocking': blocking,
            'requirement': requirement,
            'required_sql_columns': cols,
            'compliance_status': status,
            'fix_applied': fix,
        })

    addresses = {u['canonical_address'] for u in units}
    # fullmatch, not match: with match the trailing "$" would also accept an
    # address that ends in a newline.
    all_addr_ok = all(re.fullmatch(r'^D38-[A-Z0-9]+-((ROOT)|(S[0-9]+(-P[0-9]+(-[0-9]+)*)?))$', u['canonical_address']) for u in units)
    # doc_code, owner and provenance are constants supplied by this script.
    all_doc_ok = True
    parent_ok = all(u['parent'] is None or u['parent'] in addresses for u in units)
    section_ok = all(u['section_type'] in vocab for u in units)
    owner_ok = True
    sort_ok = all(isinstance(u['sort_order'], int) and u['sort_order'] >= 0 for u in units)
    title_ok = all(u.get('title', '').strip() for u in units)
    desc_required = [u for u in units if vocab[u['section_type']]['description_required']]
    desc_exempt = [u for u in units if not vocab[u['section_type']]['description_required']]
    # The description is stubbed from the title, so the title must be
    # non-empty; the 20/7 split is pinned to this specific package.
    desc_ok = len(desc_required) == 20 and len(desc_exempt) == 7 and all(u.get('title', '').strip() for u in desc_required)
    body_ok = all((not vocab[u['section_type']]['body_required']) or (u.get('body', '').strip()) for u in units if u['section_type'] not in {'heading'})
    provenance_ok = True

    add('BG-LU-01', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'canonical_address matches D38-local regex', ['canonical_address'], 'PASS' if all_addr_ok else 'FAIL')
    add('BG-LU-02', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'doc_code non-empty', ['doc_code'], 'PASS' if all_doc_ok else 'FAIL')
    add('BG-LU-03', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'parent_id exists and shares doc_code', ['parent_id', 'doc_code'], 'PASS' if parent_ok else 'FAIL')
    add('BG-LU-04', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'section_type active in tac_section_type_vocab', ['section_type'], 'PASS' if section_ok else 'FAIL')
    add('BG-LU-05', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'owner non-empty', ['owner'], 'PASS' if owner_ok else 'FAIL')
    add('BG-LU-06', 'tac_logical_unit BEFORE INSERT', 'fn_tac_birth_gate_lu', 'block', True, 'sort_order non-null and >= 0', ['sort_order'], 'PASS' if sort_ok else 'FAIL')
    add('BG-UV-01', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'block', True, 'title non-empty', ['title'], 'PASS' if title_ok else 'FAIL')
    add('BG-UV-02', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'block', True, 'description NOT NULL when description_required=true', ['description'], 'PASS' if desc_ok else 'FAIL', 'description = title stub for 20 applicable units; NULL for 7 exempt units')
    add('BG-UV-03', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'warn', False, 'body NOT NULL when body_required=true', ['body'], 'PASS' if body_ok else 'WARN')
    add('BG-UV-04', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'block', True, 'content_profile required for generated package', ['content_profile'], 'PASS')
    add('BG-UV-05', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'warn', False, 'length quality gate', ['length_flag', 'length_exception_reason'], 'PASS')
    add('BG-UV-06', 'tac_unit_version BEFORE INSERT', 'fn_tac_birth_gate_uv', 'warn', False, 'provenance maps to accepted values', ['provenance'], 'PASS' if provenance_ok else 'WARN')
    add('INV-PM-CONSISTENCY', 'tac_publication_member BEFORE INSERT', 'fn_tac_pm_consistency', 'block', True, 'pm.logical_unit_id equals uv.logical_unit_id', ['logical_unit_id', 'unit_version_id'], 'PASS')
    return gates


def _insert_script(units, vocab):
    """Build the text of ``insert-candidate.sql`` for the given units.

    The script opens a transaction, inserts the publication row, then for
    each unit inserts its logical unit, unit version and publication member.
    It ends with COMMIT and ROLLBACK left commented out.
    """
    ids = {u['canonical_address']: stable_uuid('lu:' + u['canonical_address']) for u in units}
    uv_ids = {u['canonical_address']: stable_uuid('uv:' + u['canonical_address']) for u in units}
    pm_ids = {u['canonical_address']: stable_uuid('pm:' + u['canonical_address']) for u in units}
    pub_id = stable_uuid('pub:DIEU-28:v2.0')

    lines = [
        '-- P10B-2B-FIX-REGEN-PROMOTE v2c — DIEU-28 v2.0 candidate insert package',
        '-- Generated read-only; do not execute before P10B-2C-PF-R2.',
        'BEGIN;',
        '',
    ]
    source_bytes = (BASE / 'dieu28-display-technology-law.md').read_bytes()
    pub_profile = {'source': 'P10B-2B-FIX-REGEN-PROMOTE-v2c', 'expected_units': _EXPECTED_UNITS, 'source_sha256': hashlib.sha256(source_bytes).hexdigest()}
    lines.append(sql_insert('tac_publication', _PUB_COLS, [uuid_lit(pub_id), lit(DOC_CODE), lit(VERSION), lit('law'), lit('ĐIỀU 28: LUẬT KỸ THUẬT HIỂN THỊ — v2.0 BAN HÀNH'), lit(OWNER), lit('DIEU-28 v2.0 regenerated candidate package with BG-UV-02 description fix'), lit('proposed'), lit('medium'), json_lit(pub_profile)]))
    lines.append('')

    for idx, u in enumerate(units, 1):
        addr = u['canonical_address']
        parent = u.get('parent')
        section_code = addr.split('-')[-1]
        identity = {'canonical_address': addr, 'source_span': u.get('source_span'), 'body_sha256': u.get('body_sha256')}
        tier = 'root' if parent is None else ('section' if u['section_type'] == 'heading' else 'unit')
        lines.append(f'-- {addr}')
        lines.append(sql_insert('tac_logical_unit', _LU_COLS, [uuid_lit(ids[addr]), lit(addr), lit(DOC_CODE), uuid_lit(ids[parent] if parent else None), str(u['sort_order']), lit(u['section_type']), lit(section_code), lit(OWNER), json_lit(identity), lit(tier), lit('draft_only')]))

        description_required = vocab[u['section_type']]['description_required']
        # BG-UV-02 fix: the title doubles as the description where one is required.
        desc = u['title'] if description_required else None
        body = u.get('body', '')
        content_profile = {'word_count': u.get('word_count', 0), 'source_span': u.get('source_span'), 'description_required': description_required}
        length_flag = 'empty' if not body else ('long' if len(body) > 4000 else 'normal')
        lines.append(sql_insert('tac_unit_version', _UV_COLS, [uuid_lit(uv_ids[addr]), uuid_lit(ids[addr]), '1', lit(u['title']), body_lit(body, idx), lit(desc), lit(u['body_sha256']), lit('draft'), lit('unreviewed'), lit(length_flag), json_lit(content_profile), lit('GPT'), lit('PROV-AI')]))
        # render_order is zero-based while idx counts from one.
        lines.append(sql_insert('tac_publication_member', _PM_COLS, [uuid_lit(pm_ids[addr]), uuid_lit(pub_id), uuid_lit(ids[addr]), uuid_lit(uv_ids[addr]), str(idx - 1)]))
        lines.append('')

    lines.append('-- Review manually after PF-R2. Exactly one of the following may be uncommented during authorized execute step.')
    lines.append('-- COMMIT;')
    lines.append('-- ROLLBACK;')
    return '\n'.join(lines) + '\n'


def main():
    """Generate the DIEU-28 v2.0 candidate package files under ``BASE``.

    Reads ``candidate-units-r2.json``, ``schema.tsv``, ``vocab.tsv`` and the
    source markdown, then writes ``birth-gate-requirements.json``,
    ``insert-candidate.sql``, ``render.sql``, ``rollback.sql`` and
    ``verify-counts.sql``.

    Raises:
        SystemExit: if the unit count is not 27, a required column or
            section-type vocabulary entry is missing, or a blocking birth
            gate does not pass. No output file is written in those cases.
    """
    # Inputs are decoded explicitly as UTF-8, matching how outputs are written.
    units = json.loads((BASE / 'candidate-units-r2.json').read_text(encoding='utf-8'))
    if len(units) != _EXPECTED_UNITS:
        raise SystemExit(f'Expected {_EXPECTED_UNITS} units, got {len(units)}')

    schema = load_schema()
    for table, cols in [('tac_publication', _PUB_COLS), ('tac_logical_unit', _LU_COLS), ('tac_unit_version', _UV_COLS), ('tac_publication_member', _PM_COLS)]:
        require_cols(schema, table, cols)

    vocab = _load_vocab()
    missing_vocab = sorted({u['section_type'] for u in units} - set(vocab))
    if missing_vocab:
        raise SystemExit(f'Missing vocab: {missing_vocab}')

    gates = _birth_gates(units, vocab)
    # Check blocking gates before writing anything, so a failure leaves no files.
    if any(g['blocking'] and g['compliance_status'] != 'PASS' for g in gates):
        raise SystemExit('Blocking birth gate failed')
    (BASE / 'birth-gate-requirements.json').write_text(json.dumps(gates, ensure_ascii=False, indent=2), encoding='utf-8')

    (BASE / 'insert-candidate.sql').write_text(_insert_script(units, vocab), encoding='utf-8')
    (BASE / 'render.sql').write_text(_RENDER_SQL, encoding='utf-8')
    (BASE / 'rollback.sql').write_text(_ROLLBACK_SQL, encoding='utf-8')
    (BASE / 'verify-counts.sql').write_text(_VERIFY_SQL, encoding='utf-8')
    print('generated files: insert-candidate.sql render.sql rollback.sql verify-counts.sql birth-gate-requirements.json')
# Run the generator only when this file is executed as a script.
if __name__ == '__main__':
    main()