93 lines
12 KiB
Markdown
93 lines
12 KiB
Markdown
# Unified OA Event Flow
|
||
|
||
Unified OA Event Flow 是 UniDesk 的事件流与统计投影基础设施。它独立部署为主 server 用户服务 `id=oa-event-flow`,为 Pipeline、Code Queue 以及后续 agent runtime 提供统一的事实事件写入、按 tag 订阅、事件表查询和统计中心读接口。Pipeline 专有的流程推进规则仍以 `docs/reference/pipeline-oa-event-flow.md` 为准;本文件只定义跨业务共享的事件存储、订阅和统计中心契约。
|
||
|
||
## Architecture Boundary
|
||
|
||
- `oa-event-flow` 必须是独立微服务,运行在主 server Compose 中,只暴露 Docker 内网端口,不开放公网端口;浏览器只能通过 UniDesk frontend 同源 `/api/microservices/oa-event-flow/...` 访问。
|
||
- 业务服务只发布事实事件或控制意图,不直接修改其他服务状态;统计中心只消费事件流更新数据库投影,并通过读接口和派生事件暴露统计变化。
|
||
- 前端页面不得轮询业务 transcript、JSONL、runner 私有文件或旧 SSE 来推断 STEP/Trace 统计;应订阅 `trace-stats-updated` 等统计变化事件,并按需拉取 `oa_trace_stats` 投影。
|
||
- 事件表、投影表和 projection offset 必须写入主 PostgreSQL;`.state/` 只能保存日志、缓存或可重建工件,不能作为事件或统计权威状态。
|
||
- 兼容迁移只能在同一重构闭环内存在;交付态不得保留两套互相竞争的 STEP 统计、Trace 刷新或 Pipeline 推进权威来源。
|
||
|
||
## Event Envelope
|
||
|
||
所有写入 `oa_events` 的事件都必须使用统一 envelope:
|
||
|
||
- `eventId`:全局稳定幂等键;重复发布同一事实必须命中同一 `eventId`,不能造成重复统计。
|
||
- `type`:事实事件或派生事件类型,例如 `trace-step-created`、`task-updated`、`node-finished`、`trace-stats-updated`。
|
||
- `createdAt`:事件发生时间;服务端可补齐但不得覆盖调用方提供的真实事实时间。
|
||
- `sourceKind` 与 `sourceId`:事件来源类型和来源实例,例如 `service/code-queue`、`service/pipeline`、`projection/trace-stats`。
|
||
- `aggregateType` 与 `aggregateId`:事件所属聚合,例如 `task/<taskId>`、`pipeline/<pipelineId>`、`epoch/<runId>`。
|
||
- `correlationId` 与可选 `causationId`:跨服务追踪和派生事件回溯链路。
|
||
- `tags`:字符串数组,用于 scope、业务标签、订阅和投影筛选。
|
||
- `payload`:事件事实数据;只放事实和必要摘要,不放未分页的大 transcript、完整日志或 secret。
|
||
|
||
## Tag Contract
|
||
|
||
Tag 是 OA 事件流的订阅和投影索引。所有 tag 必须是稳定字符串,不依赖 UI 文案。
|
||
|
||
- 共享 tag:`service:<id>`、`trace`、`stats`、`control`、`diagnostics`。
|
||
- Code Queue scope:`service:code-queue`、`task:<taskId>`、`queue:<queueId>`、`attempt:<index>`。
|
||
- Pipeline scope:`service:pipeline`、`pipeline:<pipelineId>`、`epoch:<runId>`、`node:<nodeId>`、`attempt:<attemptId>`、`procedure:<procedureRunId>`。
|
||
- 订阅接口按 tag 取交集:`tags=service:code-queue,task:<taskId>` 只返回同时带有这两个 tag 的事件。
|
||
- 事件生产者必须自动附加业务 scope tag;前端只传当前页面或当前选择的 tag,不在浏览器里拼接跨业务控制语义。
|
||
|
||
## Event API
|
||
|
||
`oa-event-flow` 至少提供以下后端接口:
|
||
|
||
- `GET /health`:返回数据库 ready、事件数、投影数、SSE client 数和服务启动时间。
|
||
- `POST /api/events`:批量或单条写入 OA 事件;服务端按 `eventId` 幂等去重,成功写入后同步推进内置投影。
|
||
- `GET /api/events?tags=<tag,...>&afterSeq=<n>&limit=<n>`:查询事件表,默认按 `sequence ASC` 返回有界结果。
|
||
- `GET /api/events/stream?tags=<tag,...>&afterSeq=<n>`:SSE 订阅事件表;连接建立后先补发 `afterSeq` 之后的有界 backlog,再推送 live event。
|
||
- `GET /api/stats/trace?scopeId=<scope>` 或 `scopeIds=<scope,...>`:读取 Trace/STEP 统计投影,`scope` 形如 `task:<taskId>`、`task:<taskId>:attempt:<index>` 或 `pipeline:<pipelineId>`。
|
||
- `GET /api/diagnostics`:返回事件表、投影表、近期事件类型、近期统计变化和订阅客户端摘要。
|
||
|
||
## Statistics Center
|
||
|
||
统计中心是事件流内置投影,不由前端或业务页面临时重算:
|
||
|
||
- `oa_trace_stats` 是 Trace/STEP 统计权威表,按 `scopeId` 保存 `statsRevision`、`stepCount`、`llmStepCount`、`readCount`、`editCount`、`runCount`、`errorCount`、`traceLineCount`、`outputMaxSeq`、attempt 摘要和 `updatedAt`;其中 `stepCount` 表示 TraceView 可见执行行数,不等同于工具调用数,工具调用数由 `readCount+editCount+runCount` 表达。
|
||
- Code Queue 任务级 scope `task:<taskId>` 只表达整个任务的累计统计;每个执行过程摘要必须使用独立 attempt scope `task:<taskId>:attempt:<index>`。同一个 `trace-step-created` 事实可由统计投影同时写入任务级和 attempt 级 `oa_trace_steps/oa_trace_stats`,但前端 attempt 卡片只能读取自己的 attempt scope,不得复用任务级累计统计。
|
||
- 投影只消费已经写入 `oa_events` 的事实事件;不得直接读取 Code Queue transcript、Pipeline `.state` 或 runner JSONL 作为实时统计来源。
|
||
- `trace-stats-snapshot` 可以作为迁移、服务重启或事件乱序时的权威种子事件;它仍必须进入事件表,由统计投影消费,不能绕过事件流直接写投影。投影消费 `trace-step-created` 时必须以 `oa_trace_steps` 的幂等行集合更新统计,不能把 snapshot 和 step 事件对同一个 step 重复累加。
|
||
- 投影更新后必须发布派生事件 `trace-stats-updated`,带 `stats` payload 和 `stats` tag;前端订阅该事件来更新左侧 STEP、Trace 摘要指标和 OA 可视化页面。
|
||
- 统计投影必须按 `eventId` 幂等;重复事实不能重复增加 STEP 或 read/edit/run 计数。
|
||
|
||
## Code Queue Integration
|
||
|
||
- Code Queue 不再维护独立 SSE 作为前端 Trace/STEP 刷新权威;任务输出、状态变更、queue 变更和统计快照必须发布到 `oa-event-flow`。
|
||
- Code Queue 左侧 task card 的 `STEP`、选中 task 的 Trace Summary 和全局统计只能读取 `oa_trace_stats`;本地 task JSON 中的历史字段只能作为发布 snapshot 事件的输入,不作为前端权威统计来源。统计尚未投影完成时必须明确显示 `statsSource=unavailable` 或 `STEP --`,不得回退到 transcript、本地 `stepCount` 或前端重算。
|
||
- 运行中每个新的 TraceView 可见执行行都必须发布 `trace-step-created`,并带 `task:<taskId>`、`queue:<queueId>`、`attempt:<index>`、`service:code-queue`、`trace` tag,以及 payload 中的 `scopeId=task:<taskId>`、`attemptIndex` 和 `attemptScopeId=task:<taskId>:attempt:<index>`;统计中心据此幂等更新任务级累计统计和 attempt 级独立统计,message/error 行增加 STEP 或 error,system 行默认仅保留在任务原始输出/数据库中,不进入 STEP 计数且不伪装为工具调用。
|
||
- Trace Summary 顶部执行摘要读取任务级 `task:<taskId>`;执行过程摘要 `#<index>` 读取 `task:<taskId>:attempt:<index>`。如果 attempt scope 尚未投影完成,必须显示 `statsSource=unavailable` 或 `--`,不得回退到任务级累计统计、transcript 重算或旧本地字段。
|
||
- 任务入队、开始、终态、移动 queue、标记已读等状态事实必须发布 `task-updated` 或更具体的事实事件,供事件表和后续审计使用。
|
||
- Code Queue judge 每次真正发起 MiniMax LLM 请求前必须发布 `judge-llm-request` 诊断事件,tag 至少包含 `service:code-queue`、`task:<taskId>`、`attempt:<index>`、`judge`、`diagnostics`,payload 必须包含不带 API key 的最终 request payload/messages、prompt/payload 尺寸和 repairAttempt;随后发布 `judge-llm-response`、`judge-json-parse-error`(如有)和 `judge-result`,用于追溯“最终发给 judge LLM 的 prompt”和安全覆盖结果。
|
||
- Code Queue 服务启动后可对 PostgreSQL 中已有任务回放每个 TraceView 可见执行行的 `trace-step-created`,并发布 `trace-stats-snapshot` 事件完成统计中心种子同步;回放必须使用相同 `eventId` 保持幂等,不得阻塞队列恢复。历史回放必须按 attempt start 行推导 `attemptIndex`,确保重建投影时 attempt scope 可独立恢复。
|
||
|
||
## Pipeline Integration
|
||
|
||
- Pipeline 运行时的 OA 事实、控制事件和 Gantt 取证事件应写入同一个 `oa-event-flow`,并带 `service:pipeline` 与 Pipeline scope tag。
|
||
- 当 Pipeline 后端部署在远端 provider 且尚未直接写入主 server `oa-event-flow` 时,主 server `oa-event-flow` 必须通过 backend-core 内网代理启用单一路径 Pipeline snapshot bridge:只读取 Pipeline 的 `/api/snapshot` 与 `/api/oa-event-flow/diagnostics` 结构化接口,并发布稳定的 `pipeline-run-snapshot` 迁移事件,确保统一事件表中可见 `service:pipeline` 与 `epoch:<runId>` 事实。bridge 只做迁移期事件可见性接入,不得再实现“先读 ledger,失败后 snapshot”的双路径 fallback;Pipeline 直接发布到统一事件流后,应删除该 bridge。
|
||
- Pipeline 后端可继续向 UniDesk 暴露 Pipeline 专用 snapshot、Gantt DTO、node detail 和 control API,但这些 DTO 的权威来源必须是统一 OA 事件流,而不是旧文件通道或旧 batch 推进状态。
|
||
- UniDesk Pipeline 页面可以保留结构化 Pipeline 诊断面板;跨业务事件表、tag 订阅和原始 OA 事件可见性统一进入 `用户服务 / OA Event Flow` 页面。
|
||
- Pipeline 接入完成态必须同时满足本文件的共享事件流契约和 `docs/reference/pipeline-oa-event-flow.md` 的 Pipeline 控制流去残留门禁。
|
||
|
||
## Frontend Visibility
|
||
|
||
`用户服务 / OA Event Flow` 是事件表和统计中心的可视化入口:
|
||
|
||
- 页面必须显示服务健康、事件总数、最新 sequence、Trace stats scope 数和 live stream 状态。
|
||
- 页面必须提供 tag 过滤输入,展示事件表列:sequence、type、source、aggregate、tags、createdAt、payload 摘要和显式 `查看原始JSON`。
|
||
- 页面必须展示 Trace stats 表,至少包含 scope、service、subject、STEP、read/edit/run、error、outputMaxSeq、statsRevision 和更新时间。
|
||
- 默认界面不得裸铺完整 JSON;完整事件和统计记录只能通过显式原始 JSON 按钮打开。
|
||
|
||
## Completion Gates
|
||
|
||
- `config.json`、`docker-compose.yml`、CLI `server rebuild`、`server status/logs` 和 E2E 必须认识 `oa-event-flow`。
|
||
- Code Queue 前端事件订阅必须指向 `oa-event-flow` 的 tag stream;不得再依赖 Code Queue 私有 `/api/events` SSE 作为刷新权威。
|
||
- Code Queue Trace Summary 和 task list 响应必须带 `traceStats.source=oa-event-flow` 或明确的 `statsSource=unavailable` 状态;unavailable 时不得返回本地重算 STEP 伪装为权威统计。
|
||
- Code Queue TraceView 的 attempt 执行摘要必须带 `attemptScopeId`、attempt 级 `traceStats.source=oa-event-flow` 或明确的 `statsSource=unavailable`;前端收到 `task-attempt` 的 `trace-stats-updated` 时只能刷新选中任务 Trace Summary,不能把 attempt 统计覆盖到左侧 task 级 STEP。
|
||
- Pipeline bridge 启用时,`/api/diagnostics` 必须暴露 `pipelineBridge.mode=snapshot` 或各 run 的 `bridgeMode=snapshot` 状态,`GET /api/events?tags=service:pipeline` 必须能看到带 `epoch:<runId>` 的 Pipeline 事件;远端 Pipeline 后端直接发布到统一服务后,应删除 bridge 作为完成态清理项。
|
||
- 新增或修改 OA 事件流页面、Code Queue TraceView 或 Pipeline OA 可视化后,必须重建并验证 `oa-event-flow`、相关业务后端和 `frontend`。
|