KB-26E4 rev 3

23-P3D4C1 — Staging Outbox + Worker Notification Implementation Prompt (rev3)


Date: 2026-05-08
Status: PROMPT rev3, awaiting final GPT/User review. NOT yet dispatched.
Report: knowledge/dev/laws/dieu44-trien-khai/reports/23-p3d4c1-staging-outbox-worker-notification-implementation-report.md
Scope: PG implementation only (staging table, birth trigger, worker function, pg_cron). NO Directus/Nuxt.
Rev2→Rev3: fixes 3 blockers (gateway populate timing, test debounce clamp, rollback-on-success), plus constraint-name verification and detailed worker reporting.


Hard Boundaries

  • ❌ Do NOT write Nuxt code
  • ❌ Do NOT mutate Directus config
  • ❌ Do NOT start Hermes
  • ❌ Do NOT dispatch Codex (requires user approval)
  • ❌ Do NOT do heavy computation on the hot path
  • ❌ Do NOT use an external scheduler; pg_cron only
  • ❌ Do NOT write new tools/services
  • ❌ Do NOT expose body content
  • ❌ Do NOT run heavy backfills
  • ❌ Do NOT claim batch grouping is complete if the stable ref is not populated before the birth hook fires
  • ✅ PG schema changes (reviewed, with data-safe rollback)
  • ✅ pg_cron extension install (admin action, with preflight)

Architecture

AI writes a piece → fn_iu_save(..., p_source_document_ref, p_import_batch_ref) → PG (fast)
  → IU created with source/batch ref populated
  → UV(seq=1) created → birth trigger O(1) → iu_notification_pending (stable key available)
  → no further computation

pg_cron worker (every ~2 minutes)
  → advisory lock (exception-safe)
  → read pending rows whose debounce window has passed
  → group into batches by COALESCE(source_document_ref, import_batch_ref)
  → emit durable events → iu_notification_event
  → mark processed → log → unlock

Directus/Nuxt → DEFERRED (P3D4C2)

Step 0: Preflight Checks

0A. pg_cron

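SELECT name, installed_version FROM pg_available_extensions WHERE name = 'pg_cron';
-- No row (NOT_AVAILABLE) → STOP (need container config)
-- AVAILABLE but not installed → proceed to Step 1
-- Already installed → skip Step 1
SHOW shared_preload_libraries;
-- Must include pg_cron; if not → STOP, admin action (changing it requires a server restart)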
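The 0A outcomes, together with the shared_preload_libraries condition noted in Step 1, reduce to a small decision table. A hedged Python sketch; the action codes are hypothetical labels, and `shared_preload_libraries` is assumed to be the raw output of `SHOW shared_preload_libraries`.

```python
from typing import Optional, Tuple


def pg_cron_preflight(row: Optional[Tuple[str, Optional[str]]],
                      shared_preload_libraries: str) -> Tuple[str, str]:
    """Return (preflight_pg_cron_available, next_action) from the 0A results.

    row: the (name, installed_version) row, or None when the query returned nothing.
    """
    if row is None:
        return "NOT_AVAILABLE", "STOP_NEED_CONTAINER_CONFIG"
    _, installed_version = row
    if installed_version is not None:
        return "AVAILABLE", "SKIP_STEP_1"
    # SHOW returns a comma-separated list; entries may carry spaces or quotes.
    preloaded = [lib.strip().strip('"') for lib in shared_preload_libraries.split(",")]
    if "pg_cron" not in preloaded:
        return "AVAILABLE", "STOP_ADMIN_PRELOAD"
    return "AVAILABLE", "RUN_STEP_1"
```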

0B. Schema state

SELECT 1 FROM information_schema.tables WHERE table_name = 'iu_notification_pending';
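SELECT tgname FROM pg_trigger WHERE tgname ILIKE '%notif%birth%' OR tgname ILIKE '%birth%notif%';
-- trg_aa_iu_notif_birth has 'notif' before 'birth', so '%birth%notif%' alone would miss it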
SELECT prosrc FROM pg_proc WHERE proname IN ('fn_iu_notif_version','fn_iu_notif_draft','fn_iu_notif_comment');
-- Verify O(1): no COUNT/JOIN/aggregation in source
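One way to make the "no COUNT/JOIN/aggregation" check repeatable is to scan each `prosrc` returned by 0B for heavy constructs. This is a heuristic sketch in Python, not a SQL parser: it can miss work hidden in called functions and can flag harmless text; `heavy_constructs` is a hypothetical helper.

```python
import re

# Heuristic markers of non-O(1) work in a plpgsql body.
_HEAVY = re.compile(
    r"\b(?:count|sum|avg|min|max|array_agg|string_agg)\s*\(|\bjoin\b|\bgroup\s+by\b|\bloop\b",
    re.IGNORECASE,
)


def heavy_constructs(prosrc: str) -> list:
    """Return suspicious snippets in a function body; an empty list means 'looks O(1)'."""
    without_comments = re.sub(r"--[^\n]*", "", prosrc)  # ignore SQL line comments
    return [m.group(0).lower() for m in _HEAVY.finditer(without_comments)]
```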

0C. Config table

SELECT column_name, data_type FROM information_schema.columns
WHERE table_name = 'dot_config' ORDER BY ordinal_position;
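-- Expect key, value, description columns, plus a UNIQUE or PK constraint on key (Step 5 uses ON CONFLICT (key))
-- NOT_FOUND or INCOMPATIBLE → STOP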
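The 0C result can be classified mechanically. A minimal sketch, assuming the required columns are exactly those Step 5 writes (`key`, `value`, `description`); a UNIQUE or PK constraint on `key` is not visible in this query and still has to be checked separately. `classify_dot_config` is a hypothetical helper.

```python
# Assumption: inferred from the Step 5 INSERT column list.
REQUIRED = {"key", "value", "description"}


def classify_dot_config(columns):
    """columns: list of (column_name, data_type) rows returned by the 0C query."""
    if not columns:
        return "NOT_FOUND"
    names = {name for name, _ in columns}
    return "EXISTS_COMPATIBLE" if REQUIRED <= names else "INCOMPATIBLE"
```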

0D. Gateway flow inventory (CRITICAL — Blocker 1)

-- Read fn_iu_save signature and body
SELECT proname, pg_get_function_arguments(oid), prosrc
FROM pg_proc WHERE proname = 'fn_iu_save';

Agent must determine:

  1. Does fn_iu_save create information_unit AND unit_version(seq=1) in same call?
  2. Is there a window between IU creation and UV creation where source_document_ref could be set?
  3. Does fn_iu_save currently accept source/batch params?

Decision tree:

  • If fn_iu_save creates IU then UV in same function → source_document_ref MUST be passed as param to fn_iu_save → extend gateway (Step 2E)
  • If IU and UV are created in separate calls → source_document_ref can be set on IU before UV → no gateway change needed
  • If gateway extension is too risky → mark batch_rollup_status=PARTIAL_REQUIRES_GATEWAY_PACK and emit only piece-level events until gateway is updated

Report: gateway_flow_analyzed=PASS, gateway_source_ref_timing=BEFORE_UV|SAME_CALL|UNKNOWN
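The 0D decision tree maps onto the report fields roughly as follows. A hedged Python sketch: `gateway_plan` is hypothetical, treating UNKNOWN timing like the risky case is an assumption (it follows the boundary against claiming complete batch grouping), and PASS assumes the Step 2E change itself succeeds.

```python
def gateway_plan(timing: str, extension_risky: bool = False) -> dict:
    """Map the 0D finding (BEFORE_UV | SAME_CALL | UNKNOWN) to report values."""
    if timing not in {"BEFORE_UV", "SAME_CALL", "UNKNOWN"}:
        raise ValueError(f"unexpected timing: {timing}")
    if timing == "BEFORE_UV":
        # Refs can be set on the IU before the UV insert: no gateway change needed.
        return {"gateway_extended": "SKIPPED", "batch_rollup_status": "FULL"}
    if timing == "SAME_CALL" and not extension_risky:
        # Refs must arrive as fn_iu_save params (Step 2E).
        return {"gateway_extended": "PASS", "batch_rollup_status": "FULL"}
    # Too risky, or timing not established: piece-level events only.
    partial = "PARTIAL_REQUIRES_GATEWAY_PACK"
    return {"gateway_extended": partial, "batch_rollup_status": partial}
```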

0E. CHECK constraint names

SELECT conname, pg_get_constraintdef(oid) FROM pg_constraint
WHERE conrelid = 'iu_notification_event'::regclass
AND conname IN ('chk_notif_event_type', 'chk_notif_event_type_stream');
-- Verify exact names before ALTER; save both current definitions for the rollback in Step 8

Step 1: Install pg_cron (if needed)

CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT extname, extversion FROM pg_extension WHERE extname = 'pg_cron';

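If CREATE EXTENSION fails because pg_cron is not in shared_preload_libraries, or because this database is not the one named by cron.database_name (default: postgres) → STOP, admin action.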


Step 2: Schema Additions

2A. Add columns to information_unit

ALTER TABLE information_unit
  ADD COLUMN IF NOT EXISTS source_document_ref text,
  ADD COLUMN IF NOT EXISTS import_batch_ref text;
COMMENT ON COLUMN information_unit.source_document_ref IS 'Opaque ref to source document for batch notification grouping. Nullable.';
COMMENT ON COLUMN information_unit.import_batch_ref IS 'Opaque ref to import batch/job. Nullable.';

2B. Create worker log table (BEFORE worker function)

CREATE TABLE iu_notification_worker_log (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  run_at          timestamptz NOT NULL DEFAULT now(),
  pending_pre     int NOT NULL DEFAULT 0,
  pending_post    int NOT NULL DEFAULT 0,
  groups_emitted  int NOT NULL DEFAULT 0,
  pieces_emitted  int NOT NULL DEFAULT 0,
  conflicts_skipped int NOT NULL DEFAULT 0,
  rows_marked     int NOT NULL DEFAULT 0,
  duration_ms     numeric,
  error_count     int NOT NULL DEFAULT 0,
  error_text      text
);

2C. Create staging table

CREATE TABLE iu_notification_pending (
  id                  uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  event_kind          text NOT NULL DEFAULT 'piece_birth',
  unit_id             uuid NOT NULL,
  canonical_address   text NOT NULL,
  source_document_ref text,
  import_batch_ref    text,
  actor_ref           text NOT NULL,
  ref_id              uuid NOT NULL,
  created_at          timestamptz NOT NULL DEFAULT now(),
  processed_at        timestamptz
);
CREATE INDEX idx_notif_pending_unprocessed ON iu_notification_pending (created_at)
  WHERE processed_at IS NULL;
CREATE INDEX idx_notif_pending_stable_key ON iu_notification_pending (source_document_ref, import_batch_ref)
  WHERE processed_at IS NULL;

2D. Extend CHECK constraints (verify names first per 0E)

ALTER TABLE iu_notification_event
  DROP CONSTRAINT chk_notif_event_type,
  ADD CONSTRAINT chk_notif_event_type CHECK (
    event_type IN ('comment_added','draft_created','version_applied','new_piece_created','document_imported')
  );
ALTER TABLE iu_notification_event
  DROP CONSTRAINT chk_notif_event_type_stream,
  ADD CONSTRAINT chk_notif_event_type_stream CHECK (
    (event_type='comment_added' AND event_stream='comment') OR
    (event_type='draft_created' AND event_stream='review') OR
    (event_type='version_applied' AND event_stream='update') OR
    (event_type='new_piece_created' AND event_stream='update') OR
    (event_type='document_imported' AND event_stream='update')
  );
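The two CHECK constraints together accept exactly five (event_type, event_stream) pairs. A small Python mirror of them, useful for reviewing payloads before they reach the table; `violated_constraint` is a hypothetical helper and reports the first failing constraint in the order they are listed above.

```python
ALLOWED_PAIRS = {
    ("comment_added", "comment"),
    ("draft_created", "review"),
    ("version_applied", "update"),
    ("new_piece_created", "update"),
    ("document_imported", "update"),
}
ALLOWED_TYPES = {event_type for event_type, _ in ALLOWED_PAIRS}


def violated_constraint(event_type, event_stream):
    """Return the name of the first CHECK the pair would fail, or None if accepted."""
    if event_type not in ALLOWED_TYPES:
        return "chk_notif_event_type"
    if (event_type, event_stream) not in ALLOWED_PAIRS:
        return "chk_notif_event_type_stream"
    return None
```

Note that the staging `event_kind` value `piece_birth` is not an accepted `event_type`; only the worker's translated types reach `iu_notification_event`.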

2E. Extend fn_iu_save gateway (only if 0D confirms SAME_CALL)

If fn_iu_save creates IU+UV in the same function, add 2 optional nullable params:

-- NON-EXECUTABLE SKETCH: exact ALTER depends on the 0D inventory
-- Add optional params: p_source_document_ref text DEFAULT NULL, p_import_batch_ref text DEFAULT NULL
-- Inside the function: pass them to INSERT INTO information_unit(..., source_document_ref, import_batch_ref)
-- O(1) change, backward-compatible (existing callers pass nothing → NULL)
-- Changing the argument list creates a NEW overload instead of replacing the old one: DROP the old
-- signature in the same transaction (and re-apply its GRANTs), otherwise calls with the original
-- arguments fail as ambiguous
-- Agent must read the actual fn_iu_save body and apply the minimal change

If gateway extension is too risky or complex → do not extend it; report batch_rollup_status=PARTIAL_REQUIRES_GATEWAY_PACK and continue with piece-level events only.


Step 3: Birth Hook (O(1) trigger)

CREATE OR REPLACE FUNCTION fn_iu_notif_birth()
RETURNS trigger LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$
BEGIN
  IF NEW.version_seq <> 1 THEN RETURN NEW; END IF;
  INSERT INTO iu_notification_pending
    (event_kind, unit_id, canonical_address, source_document_ref, import_batch_ref, actor_ref, ref_id)
  SELECT 'piece_birth', NEW.unit_id, iu.canonical_address,
         iu.source_document_ref, iu.import_batch_ref, NEW.created_by, NEW.id
  FROM information_unit iu WHERE iu.id = NEW.unit_id;
  RETURN NEW;
END; $$;
REVOKE ALL ON FUNCTION fn_iu_notif_birth() FROM PUBLIC;

CREATE TRIGGER trg_aa_iu_notif_birth
  AFTER INSERT ON unit_version FOR EACH ROW
  WHEN (NEW.version_seq = 1)
  EXECUTE FUNCTION fn_iu_notif_birth();
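The birth hook copies the refs from `information_unit` at the moment UV(seq=1) is inserted, which is why 0D's timing question decides whether batch rollup can work. A minimal Python model of that snapshot behaviour; the `InformationUnit` dataclass and `birth_row` are hypothetical stand-ins for the table row and the trigger.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class InformationUnit:
    id: str
    canonical_address: str
    source_document_ref: Optional[str] = None
    import_batch_ref: Optional[str] = None


def birth_row(iu: InformationUnit, version_seq: int, created_by: str, version_id: str) -> Optional[dict]:
    """Model of fn_iu_notif_birth: one PK lookup (iu) and one staged row, only for seq = 1."""
    if version_seq != 1:
        return None
    # Refs are copied as they are at UV insert time; later IU updates are not seen.
    return {"event_kind": "piece_birth", "unit_id": iu.id,
            "canonical_address": iu.canonical_address,
            "source_document_ref": iu.source_document_ref,
            "import_batch_ref": iu.import_batch_ref,
            "actor_ref": created_by, "ref_id": version_id}


# Ref populated before the seq=1 insert (BEFORE_UV, or SAME_CALL with the new params).
early = birth_row(InformationUnit("u1", "A/1", source_document_ref="doc-7"), 1, "ai", "v1")

# Ref populated only after the seq=1 insert: the staged row already holds NULL.
late_iu = InformationUnit("u2", "A/2")
late = birth_row(late_iu, 1, "ai", "v2")
late_iu.source_document_ref = "doc-7"
```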

Step 4: Worker Function (exception-safe, detailed reporting)

CREATE OR REPLACE FUNCTION fn_iu_notification_worker_tick()
RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS $$
DECLARE
  v_lock boolean;
  v_debounce int;
  v_threshold int;
  v_cutoff timestamptz;
  v_pre int;
  v_post int;
  v_groups int := 0;
  v_pieces int := 0;
  v_conflicts int := 0;
  v_marked int := 0;
  v_start timestamptz := clock_timestamp();
  v_result jsonb;
BEGIN
  v_lock := pg_try_advisory_lock(hashtext('iu_notification_worker'));
  IF NOT v_lock THEN RETURN '{"status":"skipped","reason":"lock_held"}'::jsonb; END IF;

  BEGIN
    -- Config with clamping
    SELECT LEAST(300, GREATEST(60, COALESCE(
      (SELECT value::int FROM dot_config WHERE key='notification.debounce_seconds'),90))) INTO v_debounce;
    SELECT LEAST(50, GREATEST(2, COALESCE(
      (SELECT value::int FROM dot_config WHERE key='notification.batch_piece_threshold'),2))) INTO v_threshold;
    v_cutoff := now() - (v_debounce || ' seconds')::interval;

    SELECT count(*) INTO v_pre FROM iu_notification_pending WHERE processed_at IS NULL AND created_at <= v_cutoff;
    IF v_pre = 0 THEN
      PERFORM pg_advisory_unlock(hashtext('iu_notification_worker'));
      RETURN jsonb_build_object('status','idle','pending_pre',0);
    END IF;

    -- Step A: Identify rollup groups
    CREATE TEMP TABLE _worker_eligible AS
      SELECT id, unit_id, canonical_address,
             COALESCE(source_document_ref, import_batch_ref) AS stable_key,
             actor_ref, ref_id, created_at
      FROM iu_notification_pending
      WHERE processed_at IS NULL AND created_at <= v_cutoff
      FOR UPDATE SKIP LOCKED;

    -- Representative fields all come from ONE row (earliest created_at);
    -- independent min() per column could mix values from different rows
    CREATE TEMP TABLE _worker_rollup_keys AS
      SELECT stable_key, count(*) AS piece_count,
             (array_agg(unit_id ORDER BY created_at, id))[1] AS first_unit_id,
             (array_agg(canonical_address ORDER BY created_at, id))[1] AS first_address,
             (array_agg(ref_id ORDER BY created_at, id))[1] AS first_ref_id,
             (array_agg(actor_ref ORDER BY created_at, id))[1] AS first_actor
      FROM _worker_eligible
      WHERE stable_key IS NOT NULL
      GROUP BY stable_key
      HAVING count(*) >= v_threshold;

    -- Step B: Emit rollup events
    WITH emitted AS (
      INSERT INTO iu_notification_event
        (event_type, event_stream, unit_id, canonical_address, ref_id, actor_ref, source, payload)
      SELECT 'document_imported', 'update', first_unit_id, first_address, first_ref_id,
             first_actor, 'worker', jsonb_build_object('piece_count', piece_count)
      FROM _worker_rollup_keys
      ON CONFLICT DO NOTHING
      RETURNING 1
    ) SELECT count(*) INTO v_groups FROM emitted;
    -- Count conflicts
    v_conflicts := (SELECT count(*) FROM _worker_rollup_keys) - v_groups;

    -- Step C: Mark rollup rows processed
    UPDATE iu_notification_pending p SET processed_at = now()
    FROM _worker_eligible e
    WHERE p.id = e.id AND e.stable_key IN (SELECT stable_key FROM _worker_rollup_keys);
    GET DIAGNOSTICS v_marked = ROW_COUNT;

    -- Step D: Emit piece-level for remaining
    WITH emitted AS (
      INSERT INTO iu_notification_event
        (event_type, event_stream, unit_id, canonical_address, ref_id, actor_ref, source)
      SELECT 'new_piece_created', 'update', e.unit_id, e.canonical_address, e.ref_id, e.actor_ref, 'worker'
      FROM _worker_eligible e
      WHERE NOT EXISTS (SELECT 1 FROM _worker_rollup_keys rk WHERE rk.stable_key = e.stable_key)
      ON CONFLICT DO NOTHING
      RETURNING 1
    ) SELECT count(*) INTO v_pieces FROM emitted;

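    -- Step E: Mark remaining processed (ROW_COUNT is only readable via GET DIAGNOSTICS,
    -- so count the updated rows through RETURNING instead)
    WITH marked AS (
      UPDATE iu_notification_pending SET processed_at = now()
      WHERE id IN (SELECT id FROM _worker_eligible e
        WHERE NOT EXISTS (SELECT 1 FROM _worker_rollup_keys rk WHERE rk.stable_key = e.stable_key))
      RETURNING 1
    ) SELECT v_marked + count(*) INTO v_marked FROM marked;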

    DROP TABLE IF EXISTS _worker_rollup_keys;
    DROP TABLE IF EXISTS _worker_eligible;

    SELECT count(*) INTO v_post FROM iu_notification_pending WHERE processed_at IS NULL;

    -- Duration: epoch * 1000 is total elapsed ms; extract(milliseconds ...) would drop whole minutes
    INSERT INTO iu_notification_worker_log
      (pending_pre, pending_post, groups_emitted, pieces_emitted, conflicts_skipped, rows_marked, duration_ms)
    VALUES (v_pre, v_post, v_groups, v_pieces, v_conflicts, v_marked,
            extract(epoch from clock_timestamp() - v_start) * 1000);

    v_result := jsonb_build_object('status','processed','pending_pre',v_pre,'pending_post',v_post,
      'groups_emitted',v_groups,'pieces_emitted',v_pieces,'conflicts_skipped',v_conflicts,
      'rows_marked',v_marked,'duration_ms',extract(epoch from clock_timestamp() - v_start) * 1000);

  EXCEPTION WHEN OTHERS THEN
    -- The inner block's writes (events, processed_at marks) were rolled back, so log 0 emitted
    DROP TABLE IF EXISTS _worker_rollup_keys;
    DROP TABLE IF EXISTS _worker_eligible;
    INSERT INTO iu_notification_worker_log
      (pending_pre,pending_post,groups_emitted,pieces_emitted,duration_ms,error_count,error_text)
    VALUES (COALESCE(v_pre,0),-1,0,0,
            extract(epoch from clock_timestamp()-v_start) * 1000,1,SQLERRM);
    v_result := jsonb_build_object('status','error','error',SQLERRM);
  END;

  PERFORM pg_advisory_unlock(hashtext('iu_notification_worker'));
  RETURN v_result;
END; $$;
REVOKE ALL ON FUNCTION fn_iu_notification_worker_tick() FROM PUBLIC;
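The worker's Steps A-E can be checked against a pure in-memory model before touching the database. A hedged Python sketch: rows are plain dicts, and the `emitted` set stands in for whatever unique index backs `ON CONFLICT DO NOTHING` on `iu_notification_event` (the (type, unit_id) key is an assumption, since the prompt does not name that index). The representative fields of a rollup come from the earliest row in the group; `worker_tick` and `demo_rows` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def worker_tick(pending, now, debounce_s, threshold, emitted):
    """Model of Steps A-E; mutates processed_at on the eligible rows."""
    cutoff = now - timedelta(seconds=debounce_s)
    eligible = [r for r in pending if r["processed_at"] is None and r["created_at"] <= cutoff]
    if not eligible:
        return {"status": "idle", "pending_pre": 0}, []

    # Step A: group by COALESCE(source_document_ref, import_batch_ref); NULL keys never group.
    groups = defaultdict(list)
    for r in eligible:
        key = r["source_document_ref"] if r["source_document_ref"] is not None else r["import_batch_ref"]
        if key is not None:
            groups[key].append(r)
    rollups = {k: rows for k, rows in groups.items() if len(rows) >= threshold}

    events, conflicts = [], 0
    # Step B: one rollup event per group, fields taken from the earliest row.
    for rows in rollups.values():
        first = min(rows, key=lambda r: (r["created_at"], r["id"]))
        ev_key = ("document_imported", first["unit_id"])
        if ev_key in emitted:  # ON CONFLICT DO NOTHING
            conflicts += 1
            continue
        emitted.add(ev_key)
        events.append({"type": "document_imported", "unit_id": first["unit_id"], "piece_count": len(rows)})

    # Step D: piece-level events for everything not rolled up.
    rolled = {r["id"] for rows in rollups.values() for r in rows}
    pieces = 0
    for r in eligible:
        ev_key = ("new_piece_created", r["unit_id"])
        if r["id"] in rolled or ev_key in emitted:
            continue
        emitted.add(ev_key)
        events.append({"type": "new_piece_created", "unit_id": r["unit_id"]})
        pieces += 1

    # Steps C and E: every eligible row is marked, even when its event conflicted.
    for r in eligible:
        r["processed_at"] = now
    summary = {"status": "processed", "pending_pre": len(eligible),
               "groups_emitted": len(rollups) - conflicts, "pieces_emitted": pieces,
               "conflicts_skipped": conflicts, "rows_marked": len(eligible)}
    return summary, events


T0 = datetime(2026, 5, 8, 10, 0, 0)
NOW = T0 + timedelta(seconds=120)


def demo_rows():
    """Five rows past a 90 s debounce, plus one (id 6) that is too recent."""
    def row(i, doc=None, batch=None, at=T0):
        return {"id": i, "unit_id": f"u{i}", "source_document_ref": doc,
                "import_batch_ref": batch, "created_at": at, "processed_at": None}
    return [row(1, doc="doc-1"), row(2, doc="doc-1", at=T0 + timedelta(seconds=1)),
            row(3, doc="doc-1", at=T0 + timedelta(seconds=2)), row(4),
            row(5, batch="job-9"), row(6, doc="doc-1", at=NOW - timedelta(seconds=10))]


rows = demo_rows()
summary, events = worker_tick(rows, NOW, 90, 2, set())
```

A second tick over the same rows returns idle because every eligible row was marked, and a pre-existing rollup event is counted in `conflicts_skipped` while its rows are still marked processed.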

Step 5: Config Keys (only if dot_config exists per 0C)

INSERT INTO dot_config (key, value, description) VALUES
  ('notification.debounce_seconds','90','Clamped 60-300.'),
  ('notification.batch_piece_threshold','2','Clamped 2-50.')
ON CONFLICT (key) DO NOTHING;
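The worker reads these keys through `LEAST(hi, GREATEST(lo, COALESCE(value::int, default)))`, so out-of-range values are clamped rather than rejected. The same rule in Python, as a hypothetical helper `clamp_config`; it also shows why the tests backdate rows instead of setting debounce to 0.

```python
from typing import Optional


def clamp_config(raw: Optional[str], default: int, lo: int, hi: int) -> int:
    """Python equivalent of LEAST(hi, GREATEST(lo, COALESCE(raw::int, default)))."""
    # A missing key behaves like NULL; int() raises on junk, much as ::int would.
    value = default if raw is None else int(raw)
    return min(hi, max(lo, value))
```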

Step 6: pg_cron Schedule (idempotent)

-- Check first. Distinct dollar-quote tags are required: a bare $$ inside DO $$ ... $$ would end the DO body early
DO $do$ BEGIN
  IF NOT EXISTS (SELECT 1 FROM cron.job WHERE jobname = 'iu-notification-worker') THEN
    PERFORM cron.schedule('iu-notification-worker','*/2 * * * *',
      $cmd$SELECT fn_iu_notification_worker_tick()$cmd$);
  END IF;
END $do$;
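With a 90-second debounce and a tick every 2 minutes, a staged row waits between 90 and just under 210 seconds before it is processed. A small Python sketch of that bound, assuming ticks fall exactly on 120-second boundaries starting at t=0, that `now()` equals the tick time, and that run time is negligible; `first_processing_tick` is a hypothetical helper.

```python
def first_processing_tick(created_s: int, debounce_s: int = 90, interval_s: int = 120) -> int:
    """Time (s) of the first tick whose cutoff covers a row created at created_s."""
    tick = 0
    # A row is eligible only when created_at <= tick - debounce.
    while created_s > tick - debounce_s:
        tick += interval_s
    return tick
```

The worst case is a row created one second after a tick's cutoff: it misses that tick and waits a full extra interval.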

Step 7: Tests (deterministic: backdate rows instead of setting debounce=0, which the 60-300 clamp would raise to 60)

All tests use backdated timestamps to get past the debounce window. No wall-clock waits.

| # | Test | Method | Expected | Cleanup |
|---|------|--------|----------|---------|
| T1 | Single piece birth | fn_iu_save → verify pending row created. UPDATE pending SET created_at = now() - interval '120 seconds'. Manual tick. | 1 new_piece_created in event | DELETE test IU + pending + event |
| T2 | Batch rollup | 3x fn_iu_save with the same source_document_ref (if gateway extended). Backdate pending. Tick. | 1 document_imported (piece_count=3) | DELETE test data |
| T3 | Unrelated pieces | 3x fn_iu_save with different source_ref. Backdate. Tick. | 3 new_piece_created | DELETE |
| T4 | No stable key | fn_iu_save without source_ref. Backdate. Tick. | 1 new_piece_created | DELETE |
| T5 | Creator self-read | After T1 (before T1 cleanup): fn_iu_notification_board(creator) | implicit_self | None |
| T6 | P3D2 comment | fn_iu_comment → event written immediately (NOT via pending) | comment_added in event | DELETE |
| T7 | P3D2 draft | Create draft → draft_created immediately | OK | DELETE |
| T8 | P3D2 version | Apply draft (seq>1) → version_applied immediately | OK; no pending row (birth hook fires only for seq=1) | DELETE |
| T9 | Hot path O(1) | Inspect fn_iu_notif_birth source | 1 INSERT + 1 PK lookup | None |
| T10 | Advisory lock | Session A: SELECT pg_advisory_lock(hashtext('iu_notification_worker')). Session B: tick. | Session B returns skipped | pg_advisory_unlock in session A |
| T11 | Debounce | Insert pending with created_at = now(). Tick with default debounce=90. | Pending NOT processed (too recent) | DELETE |
| T12 | Rollback plan | Verify rollback script syntax + dependencies. DO NOT EXECUTE on success. | rollback_plan=PASS | None |
| T13 | Error handling | Force a worker error, e.g. set notification.debounce_seconds to a non-numeric value so value::int fails (a NULL actor_ref cannot be staged: pending.actor_ref is NOT NULL). Tick. | Worker logs error, lock released (next tick not skipped), no crash | Restore config value |

Rollback policy:

rollback_plan=PASS|FAIL
rollback_executed=NO_ON_SUCCESS|YES_ON_FAIL|USER_APPROVED_DRILL

Rollback ONLY executes if implementation FAILS or User explicitly approves a drill run.
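The policy above reduces to two report values. A hedged Python sketch with a hypothetical `rollback_decision` helper; it assumes a failed implementation always triggers the rollback, regardless of any drill approval.

```python
def rollback_decision(implementation_ok: bool, user_approved_drill: bool, plan_verified: bool):
    """Return (rollback_plan, rollback_executed) per the rollback policy."""
    plan = "PASS" if plan_verified else "FAIL"
    if not implementation_ok:
        return plan, "YES_ON_FAIL"
    if user_approved_drill:
        return plan, "USER_APPROVED_DRILL"
    # Success without an approved drill: the script is verified (T12) but never run.
    return plan, "NO_ON_SUCCESS"
```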


Step 8: Rollback Script (data-safe, conditional)

-- 1. Unschedule cron
DO $$ BEGIN PERFORM cron.unschedule('iu-notification-worker'); EXCEPTION WHEN OTHERS THEN NULL; END $$;

-- 2. Drop trigger + functions
DROP TRIGGER IF EXISTS trg_aa_iu_notif_birth ON unit_version;
DROP FUNCTION IF EXISTS fn_iu_notification_worker_tick();
DROP FUNCTION IF EXISTS fn_iu_notif_birth();

-- 3. Drop tables (check unprocessed first)
-- Agent: SELECT count(*) FROM iu_notification_pending WHERE processed_at IS NULL;
-- If > 0 → WARN, do not drop without user approval
DROP TABLE IF EXISTS iu_notification_worker_log;
DROP TABLE IF EXISTS iu_notification_pending;

-- 4. Config
DELETE FROM dot_config WHERE key IN ('notification.debounce_seconds','notification.batch_piece_threshold');

-- 5. CHECK constraints (only if no new event types in production)
-- Agent: SELECT count(*) FROM iu_notification_event WHERE event_type IN ('new_piece_created','document_imported');
-- If > 0 → DO NOT revert, warn user
-- If = 0:
ALTER TABLE iu_notification_event DROP CONSTRAINT chk_notif_event_type,
  ADD CONSTRAINT chk_notif_event_type CHECK (event_type IN ('comment_added','draft_created','version_applied'));
ALTER TABLE iu_notification_event DROP CONSTRAINT chk_notif_event_type_stream,
  ADD CONSTRAINT chk_notif_event_type_stream CHECK (
    (event_type='comment_added' AND event_stream='comment') OR
    (event_type='draft_created' AND event_stream='review') OR
    (event_type='version_applied' AND event_stream='update'));

-- 6. Gateway revert (if extended): restore the original fn_iu_save signature
-- Agent: only if Step 2E was applied. Revert BEFORE dropping the columns: the extended fn_iu_save writes them

-- 7. Columns (only if no data)
-- Agent: SELECT count(*) FROM information_unit WHERE source_document_ref IS NOT NULL OR import_batch_ref IS NOT NULL;
-- If > 0 → DO NOT drop, warn
ALTER TABLE information_unit DROP COLUMN IF EXISTS source_document_ref, DROP COLUMN IF EXISTS import_batch_ref;
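Each guard in Step 8 either keeps or drops one object, so the script's effective plan depends on four counts. A Python sketch of that planning logic; `rollback_steps` and the step names are hypothetical labels for the numbered sections, and the gateway revert is placed before the column drop because an extended fn_iu_save still writes those columns.

```python
def rollback_steps(unprocessed_pending: int, new_type_events: int,
                   populated_ref_rows: int, gateway_extended: bool):
    """Return (steps_to_run, warnings) following the data-safety guards."""
    steps = ["unschedule_cron", "drop_trigger_and_functions"]
    warnings = []
    if unprocessed_pending > 0:
        warnings.append("unprocessed pending rows: keep tables until user approves")
    else:
        steps.append("drop_tables")
    steps.append("delete_config_keys")
    if new_type_events > 0:
        warnings.append("new event types in use: keep extended CHECK constraints")
    else:
        steps.append("revert_check_constraints")
    if gateway_extended:
        steps.append("revert_gateway")  # must precede the column drop
    if populated_ref_rows > 0:
        warnings.append("source/batch refs populated: keep columns")
    else:
        steps.append("drop_ref_columns")
    return steps, warnings
```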

Verification

preflight_pg_cron_available=AVAILABLE|NOT_AVAILABLE
preflight_pg_cron_installed=INSTALLED|NOT_INSTALLED|ALREADY
preflight_pending_table=NOT_EXISTS|ALREADY_EXISTS
preflight_birth_hook=NOT_EXISTS|ALREADY_EXISTS
preflight_p3d2_triggers_o1=PASS|FAIL
preflight_dot_config=EXISTS_COMPATIBLE|NOT_FOUND|INCOMPATIBLE
preflight_constraint_names_verified=PASS|FAIL
gateway_flow_analyzed=PASS|FAIL
gateway_source_ref_timing=BEFORE_UV|SAME_CALL|UNKNOWN
gateway_extended=PASS|SKIPPED|PARTIAL_REQUIRES_GATEWAY_PACK
batch_rollup_status=FULL|PARTIAL_REQUIRES_GATEWAY_PACK
pg_cron_installed=PASS|FAIL|SKIPPED
schema_source_columns_added=PASS|FAIL
worker_log_table_created=PASS|FAIL
staging_table_created=PASS|FAIL
check_constraints_extended=PASS|FAIL
birth_trigger_created=PASS|FAIL
birth_trigger_hot_path=O(1)
worker_function_created=PASS|FAIL
worker_exception_safe=true
worker_reports_conflicts=true
config_keys_inserted=PASS|FAIL|SKIPPED_NO_DOT_CONFIG
pg_cron_scheduled=PASS|FAIL
pg_cron_idempotent=true
test_T1 through test_T13=PASS|FAIL
test_cleanup=PASS|FAIL
rollback_plan=PASS|FAIL
rollback_executed=NO_ON_SUCCESS|YES_ON_FAIL|USER_APPROVED_DRILL
no_directus_mutation=true
no_nuxt_code=true
no_hermes=true
no_codex_dispatch=true
next_required_pack=P3D4C2_DIRECTUS_NOTIFICATION_BOARD_EXPOSURE
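Because the report is plain `key=value` lines, it can be checked against the allowed values listed above. A minimal Python sketch covering a subset of keys (extend `ALLOWED` for the rest); `validate_report` and `SAMPLE` are hypothetical, and lines without `=` are ignored.

```python
# Subset of the Verification keys and their allowed values.
ALLOWED = {
    "preflight_pg_cron_available": {"AVAILABLE", "NOT_AVAILABLE"},
    "gateway_source_ref_timing": {"BEFORE_UV", "SAME_CALL", "UNKNOWN"},
    "batch_rollup_status": {"FULL", "PARTIAL_REQUIRES_GATEWAY_PACK"},
    "birth_trigger_hot_path": {"O(1)"},
    "rollback_executed": {"NO_ON_SUCCESS", "YES_ON_FAIL", "USER_APPROVED_DRILL"},
    "no_directus_mutation": {"true"},
}


def validate_report(text: str) -> dict:
    """Return {key: problem} for missing keys or values outside the allowed set."""
    values = {}
    for line in text.splitlines():
        if "=" in line:
            key, _, value = line.strip().partition("=")
            values[key.strip()] = value.strip()
    problems = {}
    for key, allowed in ALLOWED.items():
        if key not in values:
            problems[key] = "missing"
        elif values[key] not in allowed:
            problems[key] = f"unexpected {values[key]!r}"
    return problems


SAMPLE = """\
preflight_pg_cron_available=AVAILABLE
gateway_source_ref_timing=SAME_CALL
batch_rollup_status=FULL
birth_trigger_hot_path=O(1)
rollback_executed=NO_ON_SUCCESS
no_directus_mutation=true
"""
```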

P3D4C1 rev3 | Exception-safe | Gateway-aware | Data-safe rollback | Deterministic tests | NOT yet dispatched | Awaiting final GPT/User review

knowledge/dev/laws/dieu44-trien-khai/prompts/23-p3d4c1-staging-outbox-worker-notification-implementation-prompt.md