KB-6C26 rev 3

cp-render-section.py rev3 (whitelist from dot_config NT2/NT4)

12 min read Revision 3
dieu43stage-e2checksumsnt2-fix

#!/usr/bin/env python3

=============================================================================

Đ43 v1.2 rev 6 §6 Bước 5 — Generic section renderer (1 section per call).

Called per section by /opt/incomex/dot/bin/dot-context-pack-build.sh.

Responsibilities:

- Fetch template from KB (Agent Data API GET /documents/{path}?full=true)

- If data_source in (pg_query, kb_query): fetch SQL, validate 5 guards §5.8,

execute on target_db (rev 6) via context_pack_readonly + READ ONLY TX + timeout 30s

- Validate render_config keys (§5.7 P9 whitelist) — key lạ → FAIL-FAST

- Render via the engine selected by render_config.placeholder_style: mustache → chevron (default), jinja → jinja2 (Python deps auto-installed by precheck 1.9)

- Write atomically to output_file ($OUTPUT_ROOT.tmp/$BUILD_ID/<output_filename>)

NT2 / NT4 / rev 4 compliance: 0 case-dispatch per section, 0 hardcode template,

0 fallback built-in. Thêm section mới = INSERT section_definitions + upload KB.

Env required (from build.sh env_load → .env.production):

PGHOST PGPORT PG_USER_RO PG_PASSWORD_RO PG_USER_RW PG_PASSWORD_RW PG_DB_MAIN

AGENT_DATA_URL AGENT_DATA_API_KEY

=============================================================================

import argparse
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request

Render engines imported on demand based on render_config.placeholder_style.

Precheck 1.9 (build.sh) reads dot_config.context_pack_python_deps và auto-install.

# Exit codes
EX_OK = 0
EX_FAIL = 1
EX_SKIP = 77  # data_source not supported → WARN + skip (fix for gap "Lỗ 4")

# render_config key whitelist — NT2/NT4: read at runtime from dot_config,
# never hardcoded. fetch_render_config_whitelist() caches the value for the
# lifetime of the process.
_RC_WHITELIST_CACHE = None

# Regexes matched case-insensitively against the whole SQL text by
# validate_query_guards(); any hit rejects the query.
BANNED_SQL_PATTERNS = [
    r'\bINSERT\b',
    r'\bUPDATE\b',
    r'\bDELETE\b',
    r'\bALTER\b',
    r'\bDROP\b',
    r'\bTRUNCATE\b',
    r'\bGRANT\b',
    r'\bREVOKE\b',
    r'\bCOPY\b',
    r'\bCALL\b',
    r'\bVACUUM\b',
    r'\bANALYZE\b',
    r'\bCREATE\b',
    r'\bDO\b',
    r'SET\s+ROLE\b',
    r'SET\s+SESSION\b',
]

# KB key prefixes that query / template paths must start with.
PATH_PREFIX_QUERY = 'knowledge__current-state__queries__'
PATH_PREFIX_TEMPLATE = 'knowledge__current-state__templates__'

def log(msg):
    """Emit one diagnostic line, prefixed with '[render] ', on stderr."""
    sys.stderr.write(f'[render] {msg}\n')

def kb_get(doc_id):
    """Fetch the body of one KB document through the Agent Data API.

    Sends ``GET {AGENT_DATA_URL}/documents/{doc_id}?full=true`` with the
    ``X-API-Key`` header set from ``AGENT_DATA_API_KEY`` and a 30 s timeout.

    Args:
        doc_id: Document id; interpolated verbatim into the URL path.

    Returns:
        The value of the ``content`` field of the JSON response.

    Raises:
        RuntimeError: The API answered with an HTTP error status, or the
            payload has no ``content`` field.
        KeyError: ``AGENT_DATA_URL`` or ``AGENT_DATA_API_KEY`` is not set.
    """
    url = f"{os.environ['AGENT_DATA_URL']}/documents/{doc_id}?full=true"
    req = urllib.request.Request(url, headers={'X-API-Key': os.environ['AGENT_DATA_API_KEY']})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            d = json.load(r)
    except urllib.error.HTTPError as e:
        # Decode leniently: a non-UTF-8 error body must not mask the HTTP failure itself.
        detail = e.read().decode('utf-8', errors='replace')[:200]
        raise RuntimeError(f'KB GET failed HTTP={e.code} doc_id={doc_id}: {detail}') from e
    # A JSON array/scalar payload has no 'content' field either; report it the same way
    # instead of failing with AttributeError on .get().
    content = d.get('content') if isinstance(d, dict) else None
    if content is None:
        raise RuntimeError(f'KB GET returned no content field: doc_id={doc_id}')
    return content

def kb_path_to_doc_id(kb_path):
    """Convert a '__'-separated KB key into a '/'-separated document id.

    kb_documents.key uses '__' as separator (legacy _fs_key encoding), while
    the Agent Data API expects '/' in the document_id path; storage re-encodes.
    """
    segments = kb_path.split('__')
    return '/'.join(segments)

def fetch_render_config_whitelist():
    """Read dot_config.context_pack_render_config_whitelist from PostgreSQL (NT2/NT4).

    The value is fetched once with ``psql`` and cached in the module-level
    ``_RC_WHITELIST_CACHE`` for the rest of the process.

    Returns:
        set[str]: the allowed render_config keys.

    Raises:
        RuntimeError: psql failed, the dot_config row is missing (no built-in
            fallback is allowed), or the stored value is not a JSON array of
            strings.
        ValueError: the stored value is not valid JSON (json.JSONDecodeError).
        KeyError: PGHOST, PGPORT, PG_USER_RW, PG_PASSWORD_RW or PG_DB_MAIN is
            not set.
    """
    global _RC_WHITELIST_CACHE
    if _RC_WHITELIST_CACHE is not None:
        return _RC_WHITELIST_CACHE

    env = dict(os.environ)
    # NOTE(review): this plain SELECT runs with the PG_USER_RW credentials —
    # confirm whether the read-only role could be used instead.
    env['PGPASSWORD'] = env['PG_PASSWORD_RW']
    cmd = [
        'psql',
        '-h', env['PGHOST'],
        '-p', env['PGPORT'],
        '-U', env['PG_USER_RW'],
        '-d', env['PG_DB_MAIN'],
        '-tAXq',
        '-v', 'ON_ERROR_STOP=1',
        '-c', "SELECT value FROM dot_config WHERE key = 'context_pack_render_config_whitelist'",
    ]
    proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
    if proc.returncode != 0:
        raise RuntimeError(f"fetch render_config whitelist fail: {proc.stderr.strip()[:200]}")
    out = proc.stdout.strip()
    if not out:
        raise RuntimeError("dot_config.context_pack_render_config_whitelist KHÔNG tồn tại (§6.X P2 CẤM fallback)")

    allowed = json.loads(out)
    # set() happily accepts a JSON string (→ set of characters) or object
    # (→ its keys); either would silently yield a wrong whitelist, so fail fast.
    if not isinstance(allowed, list) or not all(isinstance(key, str) for key in allowed):
        raise RuntimeError(
            "dot_config.context_pack_render_config_whitelist must be a JSON array of strings, "
            f"got: {out[:200]}"
        )
    _RC_WHITELIST_CACHE = set(allowed)
    return _RC_WHITELIST_CACHE

def validate_render_config(rc, code):
    """Reject render_config values that are malformed or use unknown keys (§5.7 P9).

    Args:
        rc: The decoded render_config; must be a dict (JSON object).
        code: Section code, used only in error messages.

    Raises:
        ValueError: ``rc`` is not a dict, or it contains a key that is not in
            the whitelist returned by fetch_render_config_whitelist().
    """
    # json.loads() can legitimately return None/list/str/number; catch that here
    # with a clear message instead of an AttributeError on .keys().
    if not isinstance(rc, dict):
        raise ValueError(
            f"§5.7 P9 render_config must be a JSON object (section={code}): "
            f"got {type(rc).__name__}"
        )
    whitelist = fetch_render_config_whitelist()
    unknown = set(rc.keys()) - whitelist
    if unknown:
        raise ValueError(
            f"§5.7 P9 render_config key ngoài whitelist (section={code}): "
            f"{sorted(unknown)} | whitelist={sorted(whitelist)}"
        )

def validate_template_path(path):
    """Ensure the template KB path starts with PATH_PREFIX_TEMPLATE.

    Raises:
        ValueError: ``path`` does not start with the template prefix.
    """
    inside_whitelist = path.startswith(PATH_PREFIX_TEMPLATE)
    if inside_whitelist:
        return
    raise ValueError(f'template_kb_path ngoài whitelist: {path}')

# Opening delimiter of a dollar-quoted literal: "$$" or "$tag$", where the tag
# is an identifier that contains no '$' and does not start with a digit.
_DOLLAR_QUOTE_OPEN = re.compile(r'\$(?:[^\W\d]\w*)?\$')


def _scan_single_quoted(body, start, backslash_escapes):
    """Return the index just past the single-quoted literal opening at ``start``.

    A doubled quote ('') is a literal apostrophe. Backslash escapes the next
    character only when ``backslash_escapes`` is true (E'...' literals).
    Returns -1 when the literal is never closed.
    """
    pos, end = start + 1, len(body)
    while pos < end:
        ch = body[pos]
        if ch == '\\' and backslash_escapes:
            pos += 2  # skip the backslash and the character it escapes
            continue
        if ch == "'":
            if pos + 1 < end and body[pos + 1] == "'":
                pos += 2  # doubled quote stays inside the literal
                continue
            return pos + 1
        pos += 1
    return -1


def _violates_single_statement(body):
    """Return True when ``body`` cannot be shown to be a single SQL statement.

    A ';' is tolerated only inside a single-quoted literal. Quoted identifiers,
    comments and dollar-quoted bodies are skipped as a unit so that an
    apostrophe inside them cannot flip the literal tracking, but a ';' inside
    them is still rejected (fail closed). Unterminated constructs are rejected.
    """
    pos, end = 0, len(body)
    while pos < end:
        ch = body[pos]

        if ch == "'":
            prev = body[pos - 1] if pos >= 1 else ''
            before = body[pos - 2] if pos >= 2 else ''
            # E'...' / e'...' is an escape-string literal unless the E is the
            # tail of a longer identifier.
            escape_string = prev in ('e', 'E') and not (before.isalnum() or before in ('_', '$'))
            pos = _scan_single_quoted(body, pos, escape_string)
            if pos < 0:
                return True
            continue

        region_end = None
        if ch == '"':
            # Quoted identifier; "" inside it is an escaped double quote.
            cursor = pos + 1
            while True:
                cursor = body.find('"', cursor)
                if cursor < 0:
                    return True
                if body.startswith('""', cursor):
                    cursor += 2
                    continue
                break
            region_end = cursor + 1
        elif body.startswith('--', pos):
            # Line comment: runs to the next newline or carriage return.
            cursor = pos
            while cursor < end and body[cursor] not in ('\n', '\r'):
                cursor += 1
            region_end = cursor
        elif body.startswith('/*', pos):
            # Block comment; PostgreSQL block comments nest.
            depth, cursor = 1, pos + 2
            while depth and cursor < end:
                if body.startswith('/*', cursor):
                    depth += 1
                    cursor += 2
                elif body.startswith('*/', cursor):
                    depth -= 1
                    cursor += 2
                else:
                    cursor += 1
            if depth:
                return True
            region_end = cursor
        elif ch == '$':
            prev = body[pos - 1] if pos >= 1 else ''
            # A '$' glued to an identifier belongs to that identifier.
            glued = prev.isalnum() or prev in ('_', '$')
            opener = None if glued else _DOLLAR_QUOTE_OPEN.match(body, pos)
            if opener:
                tag = opener.group(0)
                close = body.find(tag, opener.end())
                if close < 0:
                    return True
                region_end = close + len(tag)

        if region_end is not None:
            if ';' in body[pos:region_end]:
                return True
            pos = region_end
            continue

        if ch == ';':
            return True
        pos += 1
    return False


def validate_query_guards(path, sql):
    """Apply the static §5.8 guards (1 and 2) to a query fetched from the KB.

    Guard 1 checks that ``path`` starts with PATH_PREFIX_QUERY. Guard 2a rejects
    any match of BANNED_SQL_PATTERNS anywhere in ``sql`` (case-insensitive,
    including inside literals and comments). Guard 2b rejects a ';' anywhere
    except inside a single-quoted literal; trailing semicolons are ignored.

    Literal tracking follows the SQL-standard rule that a backslash is an
    ordinary character in plain '...' literals and an escape only in E'...'.
    NOTE(review): that matches standard_conforming_strings=on — confirm the
    target servers do not turn it off.

    Args:
        path: KB key of the query document ('__'-separated form).
        sql: The SQL text.

    Raises:
        ValueError: any guard fails.
    """
    # Guard 1: path whitelist
    if not path.startswith(PATH_PREFIX_QUERY):
        raise ValueError(f'§5.8 guard 1 path whitelist fail: {path}')
    # Guard 2a: banned token
    for pat in BANNED_SQL_PATTERNS:
        if re.search(pat, sql, flags=re.IGNORECASE):
            raise ValueError(f'§5.8 guard 2 banned token: {pat} in {path}')
    # Guard 2b: single statement (no stray ';' outside a string literal)
    body = sql.strip().rstrip(';').strip()
    if _violates_single_statement(body):
        raise ValueError(f'§5.8 guard 2 multi-statement forbidden: {path}')

def run_query(sql, target_db):
    """Execute user SQL on ``target_db`` as the read-only role and return one row.

    Session-level guards are passed via PGOPTIONS:
    default_transaction_read_only=on (guard 4) and statement_timeout=30s
    (guard 5). The role is the one named by PG_USER_RO (guard 3). The user SQL
    is wrapped in ``SELECT row_to_json(t) FROM (...) t LIMIT 1`` so that psql
    prints a single JSON row, which is robust to parse.

    Args:
        sql: The SQL text; trailing semicolons are removed before wrapping.
        target_db: Database name passed to ``psql -d``.

    Returns:
        The decoded JSON row, or ``{}`` when the query produced no rows.

    Raises:
        RuntimeError: psql exited with a non-zero status.
        subprocess.TimeoutExpired: psql did not finish within 60 s.
        KeyError: PGHOST, PGPORT, PG_USER_RO or PG_PASSWORD_RO is not set.
    """
    # validate_query_guards() tolerates trailing ';', but inside the sub-select
    # wrapper a ';' is a syntax error — normalise the same way the guard does.
    statement = sql.strip().rstrip(';').strip()
    wrapped = f"SELECT row_to_json(t) FROM (\n{statement}\n) t LIMIT 1"

    env = dict(os.environ)
    env['PGPASSWORD'] = env['PG_PASSWORD_RO']
    env['PGOPTIONS'] = '-c default_transaction_read_only=on -c statement_timeout=30s'
    env['LC_ALL'] = env.get('LC_ALL', 'C.UTF-8')
    env['LANG'] = env.get('LANG', 'C.UTF-8')
    cmd = [
        'psql',
        '-h', env['PGHOST'],
        '-p', env['PGPORT'],
        '-U', env['PG_USER_RO'],
        '-d', target_db,
        '-tAXq',
        '-v', 'ON_ERROR_STOP=1',
        '-c', wrapped,
    ]
    proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=60)
    if proc.returncode != 0:
        raise RuntimeError(f'psql failed (target_db={target_db}): {proc.stderr.strip()[:400]}')
    out = proc.stdout.strip()
    # Gap "Lỗ 3" fix: a 0-row result → WARN and return {} (no fail-fast).
    if not out:
        log(f'WARN psql returned 0 rows (target_db={target_db}) — proceeding với data dict rỗng')
        return {}
    return json.loads(out)

def render_with_engine(template, data, style, code):
    """Render ``template`` with the engine named by render_config.placeholder_style (§5.7 P9).

    'mustache' uses chevron (the default style), 'jinja' uses jinja2. Engines
    are imported lazily so only the one actually needed must be installed.
    Unknown render_config keys have already been rejected upstream.

    Raises:
        ValueError: ``style`` is neither 'mustache' nor 'jinja'.
    """
    if style not in ('mustache', 'jinja'):
        raise ValueError(f"placeholder_style='{style}' không support — section={code}")

    if style == 'jinja':
        import jinja2
        engine = jinja2.Environment(
            undefined=jinja2.Undefined,  # missing key → empty output, no failure
            autoescape=False,
            keep_trailing_newline=True,
        )
        compiled = engine.from_string(template)
        return compiled.render(**data)

    import chevron
    return chevron.render(template, data)

def render_section(args):
    """Render one section and write it atomically to ``args.output_file``.

    Steps: validate render_config and the template path, skip unsupported data
    sources (exit 77), fetch the template from the KB, optionally fetch,
    guard and run the section query, render, then write the result via a
    ``.part`` file that is moved into place. The size in bytes of the written
    file is printed on stdout.

    Args:
        args: Parsed CLI namespace (see main()); ``target_db``,
            ``template_path`` and ``query_path`` are None when not supplied.

    Raises:
        ValueError: a validation step failed (missing/invalid paths, empty
            template, unknown data_source, bad render_config, SQL guard).
        RuntimeError: a KB fetch or psql call failed.
        SystemExit: with EX_SKIP when the data_source is not supported yet.
    """
    code = args.code
    log(f'section={code} format={args.format} data_source={args.data_source} target_db={args.target_db or "NULL"}')

    rc = json.loads(args.render_config) if args.render_config else {}
    validate_render_config(rc, code)
    style = rc.get('placeholder_style', 'mustache')
    log(f'placeholder_style={style}')

    if not args.template_path:
        raise ValueError(f'template_kb_path NULL (Đ43 rev 4 CẤM built-in fallback) section={code}')
    validate_template_path(args.template_path)

    # Gap "Lỗ 4" fix: data_source not supported yet → exit 77 SKIP (no crash).
    ds = args.data_source
    if ds in ('filesystem_scan', 'custom'):
        log(f"WARN data_source='{ds}' not supported in Stage D2 — SKIP section={code}")
        sys.exit(EX_SKIP)

    template_doc_id = kb_path_to_doc_id(args.template_path)
    template = kb_get(template_doc_id)
    log(f'template loaded: {template_doc_id} ({len(template)} bytes)')

    # Gap "Lỗ 2" fix: empty template body → fail fast (never emit an empty file).
    if not template.strip():
        raise ValueError(f"template body rỗng/whitespace-only (Lỗ 2 guard) section={code} path={args.template_path}")

    data = {
        'generated_at': args.generated_at,
        'build_id': args.build_id,
        'git_commit': args.git_commit,
        'trigger_source': args.trigger_source,
    }

    if ds in ('pg_query', 'kb_query'):
        if not args.query_path:
            raise ValueError(f'query_kb_path NULL khi data_source={ds} — FAIL-FAST section={code}')
        if not args.target_db:
            raise ValueError(f'target_db NULL khi data_source={ds} — Đ43 rev 6 §5.7 chk_target_db_consistency')
        sql_doc_id = kb_path_to_doc_id(args.query_path)
        sql = kb_get(sql_doc_id)
        validate_query_guards(args.query_path, sql)
        log(f'sql loaded: {sql_doc_id} ({len(sql)} bytes)')
        row = run_query(sql, args.target_db)  # may be {} when the query returns 0 rows
        data.update(row)
        log(f'query returned {len(row)} keys')
    elif ds == 'static':
        log('static section — no query')
    else:
        raise ValueError(f'unknown data_source={ds} section={code}')

    rendered = render_with_engine(template, data, style, code)

    # A bare filename has no directory component; os.makedirs('') would raise.
    out_dir = os.path.dirname(args.output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    tmp_path = f'{args.output_file}.part'
    try:
        # Explicit UTF-8: the locale default may be unable to encode the
        # non-ASCII text that templates and query results can contain.
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(rendered)
        os.replace(tmp_path, args.output_file)
    except BaseException:
        # Best-effort cleanup so a failed write leaves no stale .part file;
        # the original error is what gets reported.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    size = os.path.getsize(args.output_file)
    log(f'wrote {args.output_file} ({size} bytes)')
    print(size)

def main():
    """CLI entry point: parse arguments, render one section, exit 1 on failure.

    Empty strings for --target-db, --template-path and --query-path are turned
    into None before rendering. Any Exception raised by render_section() is
    logged as FATAL and converted into exit status 1; SystemExit (for example
    the skip status) passes through untouched.
    """
    ap = argparse.ArgumentParser(description='Đ43 §6 Bước 5 generic section renderer')
    ap.add_argument('--code', required=True)
    ap.add_argument('--format', required=True, choices=['markdown', 'json', 'mermaid'])
    ap.add_argument('--data-source', required=True)
    for flag in ('--target-db', '--template-path', '--query-path'):
        ap.add_argument(flag, default='')
    ap.add_argument('--render-config', default='{}')
    for flag in ('--output-file', '--generated-at', '--build-id', '--git-commit', '--trigger-source'):
        ap.add_argument(flag, required=True)
    args = ap.parse_args()

    # Callers pass '' for "not set"; downstream code tests these against None.
    nullable = ('target_db', 'template_path', 'query_path')
    blanks = [name for name in nullable if getattr(args, name) == '']
    for name in blanks:
        setattr(args, name, None)

    try:
        render_section(args)
    except Exception as exc:
        log(f'FATAL section={args.code}: {type(exc).__name__}: {exc}')
        sys.exit(1)

# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()