23-P3D4C1 — Staging Outbox + Worker Notification Implementation Prompt (rev3)
Date: 2026-05-08
Status: PROMPT rev3, awaiting GPT/User final review. NOT yet dispatched.
Report: knowledge/dev/laws/dieu44-trien-khai/reports/23-p3d4c1-staging-outbox-worker-notification-implementation-report.md
Scope: PG implementation (staging table, birth trigger, worker function, pg_cron). NO Directus/Nuxt.
Rev2→Rev3: Fix 3 blockers (gateway populate timing, test debounce clamp, rollback-on-success), plus constraint name verification and worker detail reporting.
Hard Boundaries
- ❌ NO Nuxt code
- ❌ DO NOT mutate Directus config
- ❌ DO NOT start Hermes
- ❌ DO NOT dispatch Codex (requires user approval)
- ❌ NO heavy computation on the hot path
- ❌ NO external scheduler, pg_cron only
- ❌ DO NOT write new tools/services
- ❌ DO NOT expose body content
- ❌ NO heavy backfill
- ❌ DO NOT claim batch grouping is complete if the stable ref is not populated before the birth hook fires
- ✅ PG schema changes (reviewed, with data-safe rollback)
- ✅ pg_cron extension install (admin action, with preflight)
Architecture
AI writes a piece → fn_iu_save(... , p_source_document_ref, p_import_batch_ref) → PG (fast)
→ IU created with source/batch ref populated
→ UV(seq=1) created → birth trigger O(1) → iu_notification_pending (stable key available)
→ NO further computation on this path
pg_cron worker (every ~2 minutes)
→ advisory lock (exception-safe)
→ read pending rows (debounce window passed)
→ group batches by COALESCE(source_document_ref, import_batch_ref)
→ emit durable events → iu_notification_event
→ mark processed → log → unlock
Directus/Nuxt → DEFERRED (P3D4C2)
Step 0: Preflight Checks
0A. pg_cron
SELECT name, installed_version FROM pg_available_extensions WHERE name = 'pg_cron';
-- NOT AVAILABLE → STOP (need container config)
-- AVAILABLE but not installed → proceed Step 1
-- Already installed → skip Step 1
0B. Schema state
SELECT 1 FROM information_schema.tables WHERE table_name = 'iu_notification_pending';
SELECT tgname FROM pg_trigger WHERE tgname ILIKE '%birth%notif%';
SELECT prosrc FROM pg_proc WHERE proname IN ('fn_iu_notif_version','fn_iu_notif_draft','fn_iu_notif_comment');
-- Verify O(1): no COUNT/JOIN/aggregation in source
0C. Config table
SELECT column_name, data_type FROM information_schema.columns
WHERE table_name = 'dot_config' ORDER BY ordinal_position;
-- NOT_FOUND or INCOMPATIBLE → STOP
0D. Gateway flow inventory (CRITICAL — Blocker 1)
-- Read fn_iu_save signature and body
SELECT proname, pg_get_function_arguments(oid), prosrc
FROM pg_proc WHERE proname = 'fn_iu_save';
Agent must determine:
- Does fn_iu_save create information_unit AND unit_version(seq=1) in the same call?
- Is there a window between IU creation and UV creation where source_document_ref could be set?
- Does fn_iu_save currently accept source/batch params?
Decision tree:
- If fn_iu_save creates IU then UV in same function → source_document_ref MUST be passed as param to fn_iu_save → extend gateway (Step 2E)
- If IU and UV are created in separate calls → source_document_ref can be set on IU before UV → no gateway change needed
- If gateway extension is too risky → mark batch_rollup_status=PARTIAL_REQUIRES_GATEWAY_PACK and emit only piece-level events until gateway is updated
Report: gateway_flow_analyzed=PASS, gateway_source_ref_timing=BEFORE_UV|SAME_CALL|UNKNOWN
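One way to answer the third question above is to inspect the declared argument list directly. This is a minimal sketch; it assumes the parameter names used in the Architecture section (p_source_document_ref, p_import_batch_ref), so the patterns need adjusting if the gateway names them differently.

```sql
-- One row per fn_iu_save overload, with flags for the two optional params.
SELECT p.oid::regprocedure AS signature,
       pg_get_function_arguments(p.oid) ILIKE '%p_source_document_ref%' AS has_source_param,
       pg_get_function_arguments(p.oid) ILIKE '%p_import_batch_ref%'    AS has_batch_param
FROM pg_proc p
WHERE p.proname = 'fn_iu_save';
```

If this returns more than one row, overloads exist and the agent should confirm which signature the callers actually use before deciding on Step 2E.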
0E. CHECK constraint names
SELECT conname FROM pg_constraint
WHERE conrelid = 'iu_notification_event'::regclass
AND conname IN ('chk_notif_event_type', 'chk_notif_event_type_stream');
-- Verify exact names before ALTER
Step 1: Install pg_cron (if needed)
CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT extname, extversion FROM pg_extension WHERE extname = 'pg_cron';
If CREATE EXTENSION fails because pg_cron must first be added to shared_preload_libraries → STOP, admin action.
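The server-side prerequisites can be read without changing anything. The sketch below assumes a standard pg_cron build, which expects to be listed in shared_preload_libraries and creates its metadata in the database named by cron.database_name; both are read-only checks.

```sql
-- Should include pg_cron; if not, a config change and restart are needed (admin action).
SHOW shared_preload_libraries;

-- Returns NULL when the setting is unknown, e.g. because the library is not loaded.
SELECT current_setting('cron.database_name', true) AS cron_database,
       current_database()                          AS connected_database;
```

If cron_database differs from connected_database, CREATE EXTENSION is likely to be refused in the current database, which also falls under the STOP condition.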
Step 2: Schema Additions
2A. Add columns to information_unit
ALTER TABLE information_unit
ADD COLUMN IF NOT EXISTS source_document_ref text,
ADD COLUMN IF NOT EXISTS import_batch_ref text;
COMMENT ON COLUMN information_unit.source_document_ref IS 'Opaque ref to source document for batch notification grouping. Nullable.';
COMMENT ON COLUMN information_unit.import_batch_ref IS 'Opaque ref to import batch/job. Nullable.';
2B. Create worker log table (BEFORE worker function)
CREATE TABLE iu_notification_worker_log (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
run_at timestamptz NOT NULL DEFAULT now(),
pending_pre int NOT NULL DEFAULT 0,
pending_post int NOT NULL DEFAULT 0,
groups_emitted int NOT NULL DEFAULT 0,
pieces_emitted int NOT NULL DEFAULT 0,
conflicts_skipped int NOT NULL DEFAULT 0,
rows_marked int NOT NULL DEFAULT 0,
duration_ms numeric,
error_count int NOT NULL DEFAULT 0,
error_text text
);
2C. Create staging table
CREATE TABLE iu_notification_pending (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
event_kind text NOT NULL DEFAULT 'piece_birth',
unit_id uuid NOT NULL,
canonical_address text NOT NULL,
source_document_ref text,
import_batch_ref text,
actor_ref text NOT NULL,
ref_id uuid NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
processed_at timestamptz
);
CREATE INDEX idx_notif_pending_unprocessed ON iu_notification_pending (created_at)
WHERE processed_at IS NULL;
CREATE INDEX idx_notif_pending_stable_key ON iu_notification_pending (source_document_ref, import_batch_ref)
WHERE processed_at IS NULL;
2D. Extend CHECK constraints (verify names first per 0E)
ALTER TABLE iu_notification_event
DROP CONSTRAINT chk_notif_event_type,
ADD CONSTRAINT chk_notif_event_type CHECK (
event_type IN ('comment_added','draft_created','version_applied','new_piece_created','document_imported')
);
ALTER TABLE iu_notification_event
DROP CONSTRAINT chk_notif_event_type_stream,
ADD CONSTRAINT chk_notif_event_type_stream CHECK (
(event_type='comment_added' AND event_stream='comment') OR
(event_type='draft_created' AND event_stream='review') OR
(event_type='version_applied' AND event_stream='update') OR
(event_type='new_piece_created' AND event_stream='update') OR
(event_type='document_imported' AND event_stream='update')
);
2E. Extend fn_iu_save gateway (only if 0D confirms SAME_CALL)
If fn_iu_save creates IU+UV in same function, add 2 optional nullable params:
-- NON-EXECUTABLE SKETCH — exact ALTER depends on 0D inventory
-- Add optional params: p_source_document_ref text DEFAULT NULL, p_import_batch_ref text DEFAULT NULL
-- Inside function: pass to INSERT INTO information_unit(..., source_document_ref, import_batch_ref)
-- O(1) change, backward-compatible (existing callers pass nothing → NULL)
-- Agent must read actual fn_iu_save body and apply minimal change
If gateway extension is too risky or complex → STOP, report batch_rollup_status=PARTIAL_REQUIRES_GATEWAY_PACK, proceed with piece-level only.
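A point worth checking before touching the gateway: in PostgreSQL, CREATE OR REPLACE FUNCTION cannot change an argument list. A definition with extra parameters becomes a new overload, and one-argument calls can then fail as ambiguous. The sketch below illustrates this on demo_iu_save, a hypothetical stand-in that is NOT the real fn_iu_save; its single p_title argument is also an assumption made only for the demo.

```sql
-- Hypothetical stand-in for the gateway (not the real fn_iu_save).
CREATE FUNCTION demo_iu_save(p_title text) RETURNS text
LANGUAGE sql AS $fn$ SELECT p_title $fn$;

-- Drop the old signature first so only one candidate remains for old call shapes.
DROP FUNCTION demo_iu_save(text);

CREATE FUNCTION demo_iu_save(
  p_title text,
  p_source_document_ref text DEFAULT NULL,
  p_import_batch_ref   text DEFAULT NULL
) RETURNS text
LANGUAGE sql AS $fn$
  -- concat_ws skips NULL arguments, so omitted refs simply do not appear
  SELECT concat_ws('|', p_title, p_source_document_ref, p_import_batch_ref)
$fn$;

SELECT demo_iu_save('a');                                    -- existing call shape still resolves
SELECT demo_iu_save('a', p_source_document_ref => 'doc-1');  -- new optional param by name

-- Clean up the demo object.
DROP FUNCTION demo_iu_save(text, text, text);
```

For the real gateway the drop and re-create would need to happen in one transaction, with the original definition saved for the Step 8 revert.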
Step 3: Birth Hook (O(1) trigger)
CREATE OR REPLACE FUNCTION fn_iu_notif_birth()
RETURNS trigger LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$
BEGIN
IF NEW.version_seq <> 1 THEN RETURN NEW; END IF;
INSERT INTO iu_notification_pending
(event_kind, unit_id, canonical_address, source_document_ref, import_batch_ref, actor_ref, ref_id)
SELECT 'piece_birth', NEW.unit_id, iu.canonical_address,
iu.source_document_ref, iu.import_batch_ref, NEW.created_by, NEW.id
FROM information_unit iu WHERE iu.id = NEW.unit_id;
RETURN NEW;
END; $$;
REVOKE ALL ON FUNCTION fn_iu_notif_birth() FROM PUBLIC;
CREATE TRIGGER trg_aa_iu_notif_birth
AFTER INSERT ON unit_version FOR EACH ROW
WHEN (NEW.version_seq = 1)
EXECUTE FUNCTION fn_iu_notif_birth();
Step 4: Worker Function (exception-safe, detailed reporting)
CREATE OR REPLACE FUNCTION fn_iu_notification_worker_tick()
RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$
DECLARE
v_lock boolean;
v_debounce int;
v_threshold int;
v_cutoff timestamptz;
v_pre int;
v_post int;
v_groups int := 0;
v_pieces int := 0;
v_conflicts int := 0;
v_marked int := 0;
v_start timestamptz := clock_timestamp();
v_result jsonb;
BEGIN
v_lock := pg_try_advisory_lock(hashtext('iu_notification_worker'));
IF NOT v_lock THEN RETURN '{"status":"skipped","reason":"lock_held"}'::jsonb; END IF;
BEGIN
-- Config with clamping
SELECT LEAST(300, GREATEST(60, COALESCE(
(SELECT value::int FROM dot_config WHERE key='notification.debounce_seconds'),90))) INTO v_debounce;
SELECT LEAST(50, GREATEST(2, COALESCE(
(SELECT value::int FROM dot_config WHERE key='notification.batch_piece_threshold'),2))) INTO v_threshold;
v_cutoff := now() - (v_debounce || ' seconds')::interval;
SELECT count(*) INTO v_pre FROM iu_notification_pending WHERE processed_at IS NULL AND created_at <= v_cutoff;
IF v_pre = 0 THEN
PERFORM pg_advisory_unlock(hashtext('iu_notification_worker'));
RETURN jsonb_build_object('status','idle','pending_pre',0);
END IF;
-- Step A: Identify rollup groups
CREATE TEMP TABLE _worker_eligible AS
SELECT id, unit_id, canonical_address,
COALESCE(source_document_ref, import_batch_ref) AS stable_key,
actor_ref, ref_id
FROM iu_notification_pending
WHERE processed_at IS NULL AND created_at <= v_cutoff
FOR UPDATE SKIP LOCKED;
CREATE TEMP TABLE _worker_rollup_keys AS
SELECT stable_key, count(*) AS piece_count,
min(unit_id) AS first_unit_id, min(canonical_address) AS first_address,
min(ref_id) AS first_ref_id, min(actor_ref) AS first_actor
FROM _worker_eligible
WHERE stable_key IS NOT NULL
GROUP BY stable_key
HAVING count(*) >= v_threshold;
-- Step B: Emit rollup events
WITH emitted AS (
INSERT INTO iu_notification_event
(event_type, event_stream, unit_id, canonical_address, ref_id, actor_ref, source, payload)
SELECT 'document_imported', 'update', first_unit_id, first_address, first_ref_id,
first_actor, 'worker', jsonb_build_object('piece_count', piece_count)
FROM _worker_rollup_keys
ON CONFLICT DO NOTHING
RETURNING 1
) SELECT count(*) INTO v_groups FROM emitted;
-- Count conflicts
v_conflicts := (SELECT count(*) FROM _worker_rollup_keys) - v_groups;
-- Step C: Mark rollup rows processed
UPDATE iu_notification_pending p SET processed_at = now()
FROM _worker_eligible e
WHERE p.id = e.id AND e.stable_key IN (SELECT stable_key FROM _worker_rollup_keys);
GET DIAGNOSTICS v_marked = ROW_COUNT;
-- Step D: Emit piece-level for remaining
WITH emitted AS (
INSERT INTO iu_notification_event
(event_type, event_stream, unit_id, canonical_address, ref_id, actor_ref, source)
SELECT 'new_piece_created', 'update', e.unit_id, e.canonical_address, e.ref_id, e.actor_ref, 'worker'
FROM _worker_eligible e
WHERE NOT EXISTS (SELECT 1 FROM _worker_rollup_keys rk WHERE rk.stable_key = e.stable_key)
ON CONFLICT DO NOTHING
RETURNING 1
) SELECT count(*) INTO v_pieces FROM emitted;
-- Step E: Mark remaining processed
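-- ROW_COUNT is only readable via GET DIAGNOSTICS, so count the RETURNING rows instead
WITH marked AS (
UPDATE iu_notification_pending SET processed_at = now()
WHERE id IN (SELECT id FROM _worker_eligible e
WHERE NOT EXISTS (SELECT 1 FROM _worker_rollup_keys rk WHERE rk.stable_key = e.stable_key))
RETURNING 1
) SELECT v_marked + count(*) INTO v_marked FROM marked;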
DROP TABLE IF EXISTS _worker_rollup_keys;
DROP TABLE IF EXISTS _worker_eligible;
SELECT count(*) INTO v_post FROM iu_notification_pending WHERE processed_at IS NULL;
INSERT INTO iu_notification_worker_log
(pending_pre, pending_post, groups_emitted, pieces_emitted, conflicts_skipped, rows_marked, duration_ms)
-- extract(epoch ...) * 1000 gives total elapsed ms; extract(milliseconds ...) only covers the seconds field
VALUES (v_pre, v_post, v_groups, v_pieces, v_conflicts, v_marked,
extract(epoch from clock_timestamp() - v_start) * 1000);
v_result := jsonb_build_object('status','processed','pending_pre',v_pre,'pending_post',v_post,
'groups_emitted',v_groups,'pieces_emitted',v_pieces,'conflicts_skipped',v_conflicts,
'rows_marked',v_marked,'duration_ms',extract(epoch from clock_timestamp() - v_start) * 1000);
EXCEPTION WHEN OTHERS THEN
DROP TABLE IF EXISTS _worker_rollup_keys;
DROP TABLE IF EXISTS _worker_eligible;
INSERT INTO iu_notification_worker_log
(pending_pre,pending_post,groups_emitted,pieces_emitted,duration_ms,error_count,error_text)
VALUES (COALESCE(v_pre,0),-1,v_groups,v_pieces,
extract(epoch from clock_timestamp() - v_start) * 1000,1,SQLERRM);
v_result := jsonb_build_object('status','error','error',SQLERRM);
END;
PERFORM pg_advisory_unlock(hashtext('iu_notification_worker'));
RETURN v_result;
END; $$;
REVOKE ALL ON FUNCTION fn_iu_notification_worker_tick() FROM PUBLIC;
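The grouping rule in Steps A through E can be tried in isolation, without any of the real tables. This sketch uses inline sample rows (all values are made up) and a fixed threshold of 2 to show which rows would roll up into document_imported and which would fall back to new_piece_created.

```sql
WITH pending(id, source_document_ref, import_batch_ref) AS (
  VALUES (1, 'doc-A', NULL),
         (2, 'doc-A', 'batch-9'),
         (3, NULL,    'batch-9'),
         (4, NULL,    NULL)
), keyed AS (
  -- Same stable key expression as the worker: source ref wins over batch ref
  SELECT id, COALESCE(source_document_ref, import_batch_ref) AS stable_key
  FROM pending
), rollup_keys AS (
  SELECT stable_key
  FROM keyed
  WHERE stable_key IS NOT NULL
  GROUP BY stable_key
  HAVING count(*) >= 2          -- stands in for v_threshold
)
SELECT k.id, k.stable_key,
       CASE WHEN r.stable_key IS NOT NULL
            THEN 'document_imported' ELSE 'new_piece_created' END AS emitted_as
FROM keyed k
LEFT JOIN rollup_keys r ON r.stable_key = k.stable_key
ORDER BY k.id;
```

Rows 1 and 2 share the key doc-A and roll up. Row 3 carries the same import_batch_ref as row 2 but ends up alone under batch-9, because COALESCE gives the source ref priority. Row 4 has no key and always stays piece-level.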
Step 5: Config Keys (only if dot_config exists per 0C)
INSERT INTO dot_config (key, value, description) VALUES
('notification.debounce_seconds','90','Clamped 60-300.'),
('notification.batch_piece_threshold','2','Clamped 2-50.')
ON CONFLICT (key) DO NOTHING;
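The clamp ranges noted in the descriptions above can be checked without touching dot_config. The sketch below feeds a few sample raw values (chosen only for illustration) through the same LEAST/GREATEST/COALESCE expressions the worker uses.

```sql
SELECT raw_value,
       LEAST(300, GREATEST(60, COALESCE(raw_value, 90))) AS debounce_seconds,
       LEAST(50,  GREATEST(2,  COALESCE(raw_value, 2)))  AS batch_piece_threshold
FROM (VALUES (NULL::int), (0), (90), (1000)) AS t(raw_value)
ORDER BY raw_value NULLS FIRST;
```

A missing key yields 90 and 2, a value of 0 is raised to 60 and 2, and 1000 is capped at 300 and 50. This is why the tests cannot rely on setting debounce to 0 and have to backdate rows instead.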
Step 6: pg_cron Schedule (idempotent)
-- Check first
-- Distinct dollar-quote tags: a nested bare $$ would end the DO body early
DO $do$ BEGIN
IF NOT EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'iu-notification-worker') THEN
PERFORM cron.schedule('iu-notification-worker','*/2 * * * *',
$job$SELECT fn_iu_notification_worker_tick()$job$);
END IF;
END $do$;
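After scheduling, the job registration and its recent runs can be inspected as follows. This is a sketch that assumes a pg_cron version recent enough to provide cron.job.jobname and the cron.job_run_details table; on older versions the second query will not be available.

```sql
-- Exactly one row is expected if the schedule step was idempotent.
SELECT jobid, jobname, schedule, command, active
FROM cron.job
WHERE jobname = 'iu-notification-worker';

-- Most recent executions of that job, newest first.
SELECT status, return_message, start_time, end_time
FROM cron.job_run_details
WHERE jobid = (SELECT jobid FROM cron.job WHERE jobname = 'iu-notification-worker')
ORDER BY start_time DESC
LIMIT 5;
```

These results can be cross-checked against iu_notification_worker_log, which only gets a row when the tick found eligible rows or hit an error.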
Step 7: Tests (deterministic — backdate rows, no debounce=0)
All tests use backdated timestamps to bypass debounce window. No wall-clock wait.
| # | Test | Method | Expected | Cleanup |
|---|---|---|---|---|
| T1 | Single piece birth | fn_iu_save → verify pending row created. UPDATE pending SET created_at = now()-interval '120 seconds'. Manual tick. | 1 new_piece_created in event | DELETE test IU+pending+event |
| T2 | Batch rollup | 3x fn_iu_save with same source_document_ref (if gateway extended). Backdate pending. Tick. | 1 document_imported (piece_count=3) | DELETE test data |
| T3 | Unrelated pieces | 3x fn_iu_save different source_ref. Backdate. Tick. | 3 new_piece_created | DELETE |
| T4 | No stable key | fn_iu_save without source_ref. Backdate. Tick. | 1 new_piece_created | DELETE |
| T5 | Creator self-read | After T1, fn_iu_notification_board(creator) | implicit_self | — |
| T6 | P3D2 comment | fn_iu_comment → event immediately (NOT via pending) | comment_added in event | DELETE |
| T7 | P3D2 draft | Create draft → draft_created immediately | OK | DELETE |
| T8 | P3D2 version | Apply draft (seq>1) → version_applied immediately | OK | DELETE |
| T9 | Hot path O(1) | Inspect fn_iu_notif_birth source | 1 INSERT + 1 PK lookup | — |
| T10 | Advisory lock | 2 concurrent tick calls | Second returns skipped | — |
| T11 | Debounce | Insert pending with created_at=now(). Tick with default debounce=90. | Pending NOT processed (too recent) | DELETE |
| T12 | Rollback plan | Verify rollback script syntax + dependencies. DO NOT EXECUTE on success. | rollback_plan=PASS | — |
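| T13 | Error handling | Force the event INSERT to fail (e.g. a pending value that violates a constraint on iu_notification_event; pending.actor_ref is itself NOT NULL, so it cannot simply be set to NULL). Tick. | Worker logs error, lock released, no crash | Fix corrupted row |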
Rollback policy:
rollback_plan=PASS|FAIL
rollback_executed=NO_ON_SUCCESS|YES_ON_FAIL|USER_APPROVED_DRILL
Rollback ONLY executes if implementation FAILS or User explicitly approves a drill run.
Step 8: Rollback Script (data-safe, conditional)
-- 1. Unschedule cron
DO $$ BEGIN PERFORM cron.unschedule('iu-notification-worker'); EXCEPTION WHEN OTHERS THEN NULL; END $$;
-- 2. Drop trigger + functions
DROP TRIGGER IF EXISTS trg_aa_iu_notif_birth ON unit_version;
DROP FUNCTION IF EXISTS fn_iu_notification_worker_tick();
DROP FUNCTION IF EXISTS fn_iu_notif_birth();
-- 3. Drop tables (check unprocessed first)
-- Agent: SELECT count(*) FROM iu_notification_pending WHERE processed_at IS NULL;
-- If > 0 → WARN, do not drop without user approval
DROP TABLE IF EXISTS iu_notification_worker_log;
DROP TABLE IF EXISTS iu_notification_pending;
-- 4. Config
DELETE FROM dot_config WHERE key IN ('notification.debounce_seconds','notification.batch_piece_threshold');
-- 5. CHECK constraints (only if no new event types in production)
-- Agent: SELECT count(*) FROM iu_notification_event WHERE event_type IN ('new_piece_created','document_imported');
-- If > 0 → DO NOT revert, warn user
-- If = 0:
ALTER TABLE iu_notification_event DROP CONSTRAINT chk_notif_event_type,
ADD CONSTRAINT chk_notif_event_type CHECK (event_type IN ('comment_added','draft_created','version_applied'));
ALTER TABLE iu_notification_event DROP CONSTRAINT chk_notif_event_type_stream,
ADD CONSTRAINT chk_notif_event_type_stream CHECK (
(event_type='comment_added' AND event_stream='comment') OR
(event_type='draft_created' AND event_stream='review') OR
(event_type='version_applied' AND event_stream='update'));
-- 6. Columns (only if no data)
-- Agent: SELECT count(*) FROM information_unit WHERE source_document_ref IS NOT NULL OR import_batch_ref IS NOT NULL;
-- If > 0 → DO NOT drop, warn
ALTER TABLE information_unit DROP COLUMN IF EXISTS source_document_ref, DROP COLUMN IF EXISTS import_batch_ref;
-- 7. Gateway revert (if extended): restore original fn_iu_save signature
-- Agent: only if Step 2E was applied
Verification
preflight_pg_cron_available=AVAILABLE|NOT_AVAILABLE
preflight_pg_cron_installed=INSTALLED|NOT_INSTALLED|ALREADY
preflight_pending_table=NOT_EXISTS|ALREADY_EXISTS
preflight_birth_hook=NOT_EXISTS|ALREADY_EXISTS
preflight_p3d2_triggers_o1=PASS|FAIL
preflight_dot_config=EXISTS_COMPATIBLE|NOT_FOUND|INCOMPATIBLE
preflight_constraint_names_verified=PASS|FAIL
gateway_flow_analyzed=PASS|FAIL
gateway_source_ref_timing=BEFORE_UV|SAME_CALL|UNKNOWN
gateway_extended=PASS|SKIPPED|PARTIAL_REQUIRES_GATEWAY_PACK
batch_rollup_status=FULL|PARTIAL_REQUIRES_GATEWAY_PACK
pg_cron_installed=PASS|FAIL|SKIPPED
schema_source_columns_added=PASS|FAIL
worker_log_table_created=PASS|FAIL
staging_table_created=PASS|FAIL
check_constraints_extended=PASS|FAIL
birth_trigger_created=PASS|FAIL
birth_trigger_hot_path=O(1)
worker_function_created=PASS|FAIL
worker_exception_safe=true
worker_reports_conflicts=true
config_keys_inserted=PASS|FAIL|SKIPPED_NO_DOT_CONFIG
pg_cron_scheduled=PASS|FAIL
pg_cron_idempotent=true
test_T1 through test_T13=PASS|FAIL
test_cleanup=PASS|FAIL
rollback_plan=PASS|FAIL
rollback_executed=NO_ON_SUCCESS|YES_ON_FAIL|USER_APPROVED_DRILL
no_directus_mutation=true
no_nuxt_code=true
no_hermes=true
no_codex_dispatch=true
next_required_pack=P3D4C2_DIRECTUS_NOTIFICATION_BOARD_EXPOSURE
P3D4C1 rev3 | Exception-safe | Gateway-aware | Data-safe rollback | Deterministic tests | NOT yet dispatched | Awaiting GPT/User final review