# Unified OA Event Flow Unified OA Event Flow 是 UniDesk 的事件流与统计投影基础设施。它独立部署为主 server 用户服务 `id=oa-event-flow`,为 Pipeline、Code Queue 以及后续 agent runtime 提供统一的事实事件写入、按 tag 订阅、事件表查询和统计中心读接口。Pipeline 专有的流程推进规则仍以 `docs/reference/pipeline-oa-event-flow.md` 为准;本文件只定义跨业务共享的事件存储、订阅和统计中心契约。 ## Architecture Boundary - `oa-event-flow` 必须是独立微服务,运行在主 server Compose 中,只暴露 Docker 内网端口,不开放公网端口;浏览器只能通过 UniDesk frontend 同源 `/api/microservices/oa-event-flow/...` 访问。 - 业务服务只发布事实事件或控制意图,不直接修改其他服务状态;统计中心只消费事件流更新数据库投影,并通过读接口和派生事件暴露统计变化。 - 前端页面不得轮询业务 transcript、JSONL、runner 私有文件或旧 SSE 来推断 STEP/Trace 统计;应订阅 `trace-stats-updated` 等统计变化事件,并按需拉取 `oa_trace_stats` 投影。 - 事件表、投影表和 projection offset 必须写入主 PostgreSQL;`.state/` 只能保存日志、缓存或可重建工件,不能作为事件或统计权威状态。 - 兼容迁移只能在同一重构闭环内存在;交付态不得保留两套互相竞争的 STEP 统计、Trace 刷新或 Pipeline 推进权威来源。 ## Event Envelope 所有写入 `oa_events` 的事件都必须使用统一 envelope: - `eventId`:全局稳定幂等键;重复发布同一事实必须命中同一 `eventId`,不能造成重复统计。 - `type`:事实事件或派生事件类型,例如 `trace-step-created`、`task-updated`、`node-finished`、`trace-stats-updated`。 - `createdAt`:事件发生时间;服务端可补齐但不得覆盖调用方提供的真实事实时间。 - `sourceKind` 与 `sourceId`:事件来源类型和来源实例,例如 `service/code-queue`、`service/pipeline`、`projection/trace-stats`。 - `aggregateType` 与 `aggregateId`:事件所属聚合,例如 `task/`、`pipeline/`、`epoch/`。 - `correlationId` 与可选 `causationId`:跨服务追踪和派生事件回溯链路。 - `tags`:字符串数组,用于 scope、业务标签、订阅和投影筛选。 - `payload`:事件事实数据;只放事实和必要摘要,不放未分页的大 transcript、完整日志或 secret。 ## Tag Contract Tag 是 OA 事件流的订阅和投影索引。所有 tag 必须是稳定字符串,不依赖 UI 文案。 - 共享 tag:`service:`、`trace`、`stats`、`control`、`diagnostics`。 - Code Queue scope:`service:code-queue`、`task:`、`queue:`、`attempt:`。 - Pipeline scope:`service:pipeline`、`pipeline:`、`epoch:`、`node:`、`attempt:`、`procedure:`。 - 订阅接口按 tag 取交集:`tags=service:code-queue,task:` 只返回同时带有这两个 tag 的事件。 - 事件生产者必须自动附加业务 scope tag;前端只传当前页面或当前选择的 tag,不在浏览器里拼接跨业务控制语义。 ## Event API `oa-event-flow` 至少提供以下后端接口: - `GET /health`:返回数据库 ready、事件数、投影数、SSE client 数和服务启动时间。 - `POST /api/events`:批量或单条写入 OA 事件;服务端按 `eventId` 幂等去重,成功写入后同步推进内置投影。 - `GET /api/events?tags=&afterSeq=&limit=`:查询事件表,默认按 `sequence ASC` 返回有界结果。 - `GET /api/events/stream?tags=&afterSeq=`:SSE 订阅事件表;连接建立后先补发 `afterSeq` 之后的有界 backlog,再推送 live event。 - `GET /api/stats/trace?scopeId=` 或 `scopeIds=`:读取 Trace/STEP 统计投影,`scope` 形如 `task:`、`task::attempt:` 或 `pipeline:`。 - `GET /api/diagnostics`:返回事件表、投影表、近期事件类型、近期统计变化和订阅客户端摘要。 ## Statistics Center 统计中心是事件流内置投影,不由前端或业务页面临时重算: - `oa_trace_stats` 是 Trace/STEP 统计权威表,按 `scopeId` 保存 `statsRevision`、`stepCount`、`llmStepCount`、`readCount`、`editCount`、`runCount`、`errorCount`、`traceLineCount`、`outputMaxSeq`、attempt 摘要和 `updatedAt`;其中 `stepCount` 表示 TraceView 可见执行行数,不等同于工具调用数,工具调用数由 `readCount+editCount+runCount` 表达。 - Code Queue 任务级 scope `task:` 只表达整个任务的累计统计;每个执行过程摘要必须使用独立 attempt scope `task::attempt:`。同一个 `trace-step-created` 事实可由统计投影同时写入任务级和 attempt 级 `oa_trace_steps/oa_trace_stats`,但前端 attempt 卡片只能读取自己的 attempt scope,不得复用任务级累计统计。 - 投影只消费已经写入 `oa_events` 的事实事件;不得直接读取 Code Queue transcript、Pipeline `.state` 或 runner JSONL 作为实时统计来源。 - `trace-stats-snapshot` 可以作为迁移、服务重启或事件乱序时的权威种子事件;它仍必须进入事件表,由统计投影消费,不能绕过事件流直接写投影。投影消费 `trace-step-created` 时必须以 `oa_trace_steps` 的幂等行集合更新统计,不能把 snapshot 和 step 事件对同一个 step 重复累加。 - 投影更新后必须发布派生事件 `trace-stats-updated`,带 `stats` payload 和 `stats` tag;前端订阅该事件来更新左侧 STEP、Trace 摘要指标和 OA 可视化页面。 - 统计投影必须按 `eventId` 幂等;重复事实不能重复增加 STEP 或 read/edit/run 计数。 ## Code Queue Integration - Code Queue 不再维护独立 SSE 作为前端 Trace/STEP 刷新权威;任务输出、状态变更、queue 变更和统计快照必须发布到 `oa-event-flow`。 - Code Queue 左侧 task card 的 `STEP`、选中 task 的 Trace Summary 和全局统计只能读取 `oa_trace_stats`;本地 task JSON 中的历史字段只能作为发布 snapshot 事件的输入,不作为前端权威统计来源。统计尚未投影完成时必须明确显示 `statsSource=unavailable` 或 `STEP --`,不得回退到 transcript、本地 `stepCount` 或前端重算。 - 运行中每个新的 TraceView 可见执行行都必须发布 `trace-step-created`,并带 `task:`、`queue:`、`attempt:`、`service:code-queue`、`trace` tag,以及 payload 中的 `scopeId=task:`、`attemptIndex` 和 `attemptScopeId=task::attempt:`;统计中心据此幂等更新任务级累计统计和 attempt 级独立统计,message/error 行增加 STEP 或 error,system 行默认仅保留在任务原始输出/数据库中,不进入 STEP 计数且不伪装为工具调用。 - Trace Summary 顶部执行摘要读取任务级 `task:`;执行过程摘要 `#` 读取 `task::attempt:`。如果 attempt scope 尚未投影完成,必须显示 `statsSource=unavailable` 或 `--`,不得回退到任务级累计统计、transcript 重算或旧本地字段。 - 任务入队、开始、终态、移动 queue、标记已读等状态事实必须发布 `task-updated` 或更具体的事实事件,供事件表和后续审计使用。 - Code Queue judge 每次真正发起 MiniMax LLM 请求前必须发布 `judge-llm-request` 诊断事件,tag 至少包含 `service:code-queue`、`task:`、`attempt:`、`judge`、`diagnostics`,payload 必须包含不带 API key 的最终 request payload/messages、prompt/payload 尺寸和 repairAttempt;随后发布 `judge-llm-response`、`judge-json-parse-error`(如有)和 `judge-result`,用于追溯“最终发给 judge LLM 的 prompt”和安全覆盖结果。 - Code Queue 服务启动后可对 PostgreSQL 中已有任务回放每个 TraceView 可见执行行的 `trace-step-created`,并发布 `trace-stats-snapshot` 事件完成统计中心种子同步;回放必须使用相同 `eventId` 保持幂等,不得阻塞队列恢复。历史回放必须按 attempt start 行推导 `attemptIndex`,确保重建投影时 attempt scope 可独立恢复。 ## Pipeline Integration - Pipeline 运行时的 OA 事实、控制事件和 Gantt 取证事件应写入同一个 `oa-event-flow`,并带 `service:pipeline` 与 Pipeline scope tag。 - 当 Pipeline 后端部署在远端 provider 且尚未直接写入主 server `oa-event-flow` 时,主 server `oa-event-flow` 必须通过 backend-core 内网代理启用单一路径 Pipeline snapshot bridge:只读取 Pipeline 的 `/api/snapshot` 与 `/api/oa-event-flow/diagnostics` 结构化接口,并发布稳定的 `pipeline-run-snapshot` 迁移事件,确保统一事件表中可见 `service:pipeline` 与 `epoch:` 事实。bridge 只做迁移期事件可见性接入,不得再实现“先读 ledger,失败后 snapshot”的双路径 fallback;Pipeline 直接发布到统一事件流后,应删除该 bridge。 - Pipeline 后端可继续向 UniDesk 暴露 Pipeline 专用 snapshot、Gantt DTO、node detail 和 control API,但这些 DTO 的权威来源必须是统一 OA 事件流,而不是旧文件通道或旧 batch 推进状态。 - UniDesk Pipeline 页面可以保留结构化 Pipeline 诊断面板;跨业务事件表、tag 订阅和原始 OA 事件可见性统一进入 `用户服务 / OA Event Flow` 页面。 - Pipeline 接入完成态必须同时满足本文件的共享事件流契约和 `docs/reference/pipeline-oa-event-flow.md` 的 Pipeline 控制流去残留门禁。 ## Frontend Visibility `用户服务 / OA Event Flow` 是事件表和统计中心的可视化入口: - 页面必须显示服务健康、事件总数、最新 sequence、Trace stats scope 数和 live stream 状态。 - 页面必须提供 tag 过滤输入,展示事件表列:sequence、type、source、aggregate、tags、createdAt、payload 摘要和显式 `查看原始JSON`。 - 页面必须展示 Trace stats 表,至少包含 scope、service、subject、STEP、read/edit/run、error、outputMaxSeq、statsRevision 和更新时间。 - 默认界面不得裸铺完整 JSON;完整事件和统计记录只能通过显式原始 JSON 按钮打开。 ## Completion Gates - `config.json`、`docker-compose.yml`、CLI `server rebuild`、`server status/logs` 和 E2E 必须认识 `oa-event-flow`。 - Code Queue 前端事件订阅必须指向 `oa-event-flow` 的 tag stream;不得再依赖 Code Queue 私有 `/api/events` SSE 作为刷新权威。 - Code Queue Trace Summary 和 task list 响应必须带 `traceStats.source=oa-event-flow` 或明确的 `statsSource=unavailable` 状态;unavailable 时不得返回本地重算 STEP 伪装为权威统计。 - Code Queue TraceView 的 attempt 执行摘要必须带 `attemptScopeId`、attempt 级 `traceStats.source=oa-event-flow` 或明确的 `statsSource=unavailable`;前端收到 `task-attempt` 的 `trace-stats-updated` 时只能刷新选中任务 Trace Summary,不能把 attempt 统计覆盖到左侧 task 级 STEP。 - Pipeline bridge 启用时,`/api/diagnostics` 必须暴露 `pipelineBridge.mode=snapshot` 或各 run 的 `bridgeMode=snapshot` 状态,`GET /api/events?tags=service:pipeline` 必须能看到带 `epoch:` 的 Pipeline 事件;远端 Pipeline 后端直接发布到统一服务后,应删除 bridge 作为完成态清理项。 - 新增或修改 OA 事件流页面、Code Queue TraceView 或 Pipeline OA 可视化后,必须重建并验证 `oa-event-flow`、相关业务后端和 `frontend`。