12 KiB
12 KiB
Unified OA Event Flow
Unified OA Event Flow 是 UniDesk 的事件流与统计投影基础设施。它独立部署为主 server 用户服务 id=oa-event-flow,为 Pipeline、Code Queue 以及后续 agent runtime 提供统一的事实事件写入、按 tag 订阅、事件表查询和统计中心读接口。Pipeline 专有的流程推进规则仍以 docs/reference/pipeline-oa-event-flow.md 为准;本文件只定义跨业务共享的事件存储、订阅和统计中心契约。
Architecture Boundary
oa-event-flow必须是独立微服务,运行在主 server Compose 中,只暴露 Docker 内网端口,不开放公网端口;浏览器只能通过 UniDesk frontend 同源/api/microservices/oa-event-flow/...访问。- 业务服务只发布事实事件或控制意图,不直接修改其他服务状态;统计中心只消费事件流更新数据库投影,并通过读接口和派生事件暴露统计变化。
- 前端页面不得轮询业务 transcript、JSONL、runner 私有文件或旧 SSE 来推断 STEP/Trace 统计;应订阅
trace-stats-updated等统计变化事件,并按需拉取oa_trace_stats投影。 - 事件表、投影表和 projection offset 必须写入主 PostgreSQL;
.state/只能保存日志、缓存或可重建工件,不能作为事件或统计权威状态。 - 兼容迁移只能在同一重构闭环内存在;交付态不得保留两套互相竞争的 STEP 统计、Trace 刷新或 Pipeline 推进权威来源。
Event Envelope
所有写入 oa_events 的事件都必须使用统一 envelope:
eventId:全局稳定幂等键;重复发布同一事实必须命中同一eventId,不能造成重复统计。type:事实事件或派生事件类型,例如trace-step-created、task-updated、node-finished、trace-stats-updated。createdAt:事件发生时间;服务端可补齐但不得覆盖调用方提供的真实事实时间。sourceKind与sourceId:事件来源类型和来源实例,例如service/code-queue、service/pipeline、projection/trace-stats。aggregateType与aggregateId:事件所属聚合,例如task/<taskId>、pipeline/<pipelineId>、epoch/<runId>。correlationId与可选causationId:跨服务追踪和派生事件回溯链路。tags:字符串数组,用于 scope、业务标签、订阅和投影筛选。payload:事件事实数据;只放事实和必要摘要,不放未分页的大 transcript、完整日志或 secret。
Tag Contract
Tag 是 OA 事件流的订阅和投影索引。所有 tag 必须是稳定字符串,不依赖 UI 文案。
- 共享 tag:
service:<id>、trace、stats、control、diagnostics。 - Code Queue scope:
service:code-queue、task:<taskId>、queue:<queueId>、attempt:<index>。 - Pipeline scope:
service:pipeline、pipeline:<pipelineId>、epoch:<runId>、node:<nodeId>、attempt:<attemptId>、procedure:<procedureRunId>。 - 订阅接口按 tag 取交集:
tags=service:code-queue,task:<taskId>只返回同时带有这两个 tag 的事件。 - 事件生产者必须自动附加业务 scope tag;前端只传当前页面或当前选择的 tag,不在浏览器里拼接跨业务控制语义。
Event API
oa-event-flow 至少提供以下后端接口:
GET /health:返回数据库 ready、事件数、投影数、SSE client 数和服务启动时间。POST /api/events:批量或单条写入 OA 事件;服务端按eventId幂等去重,成功写入后同步推进内置投影。GET /api/events?tags=<tag,...>&afterSeq=<n>&limit=<n>:查询事件表,默认按sequence ASC返回有界结果。GET /api/events/stream?tags=<tag,...>&afterSeq=<n>:SSE 订阅事件表;连接建立后先补发afterSeq之后的有界 backlog,再推送 live event。GET /api/stats/trace?scopeId=<scope>或scopeIds=<scope,...>:读取 Trace/STEP 统计投影,scope形如task:<taskId>、task:<taskId>:attempt:<index>或pipeline:<pipelineId>。GET /api/diagnostics:返回事件表、投影表、近期事件类型、近期统计变化和订阅客户端摘要。
Statistics Center
统计中心是事件流内置投影,不由前端或业务页面临时重算:
oa_trace_stats是 Trace/STEP 统计权威表,按scopeId保存statsRevision、stepCount、llmStepCount、readCount、editCount、runCount、errorCount、traceLineCount、outputMaxSeq、attempt 摘要和updatedAt;其中stepCount表示 TraceView 可见执行行数,不等同于工具调用数,工具调用数由readCount+editCount+runCount表达。- Code Queue 任务级 scope
task:<taskId>只表达整个任务的累计统计;每个执行过程摘要必须使用独立 attempt scopetask:<taskId>:attempt:<index>。同一个trace-step-created事实可由统计投影同时写入任务级和 attempt 级oa_trace_steps/oa_trace_stats,但前端 attempt 卡片只能读取自己的 attempt scope,不得复用任务级累计统计。 - 投影只消费已经写入
oa_events的事实事件;不得直接读取 Code Queue transcript、Pipeline.state或 runner JSONL 作为实时统计来源。 trace-stats-snapshot可以作为迁移、服务重启或事件乱序时的权威种子事件;它仍必须进入事件表,由统计投影消费,不能绕过事件流直接写投影。投影消费trace-step-created时必须以oa_trace_steps的幂等行集合更新统计,不能把 snapshot 和 step 事件对同一个 step 重复累加。- 投影更新后必须发布派生事件
trace-stats-updated,带statspayload 和statstag;前端订阅该事件来更新左侧 STEP、Trace 摘要指标和 OA 可视化页面。 - 统计投影必须按
eventId幂等;重复事实不能重复增加 STEP 或 read/edit/run 计数。
Code Queue Integration
- Code Queue 不再维护独立 SSE 作为前端 Trace/STEP 刷新权威;任务输出、状态变更、queue 变更和统计快照必须发布到
oa-event-flow。 - Code Queue 左侧 task card 的
STEP、选中 task 的 Trace Summary 和全局统计只能读取oa_trace_stats;本地 task JSON 中的历史字段只能作为发布 snapshot 事件的输入,不作为前端权威统计来源。统计尚未投影完成时必须明确显示statsSource=unavailable或STEP --,不得回退到 transcript、本地stepCount或前端重算。 - 运行中每个新的 TraceView 可见执行行都必须发布
trace-step-created,并带task:<taskId>、queue:<queueId>、attempt:<index>、service:code-queue、tracetag,以及 payload 中的scopeId=task:<taskId>、attemptIndex和attemptScopeId=task:<taskId>:attempt:<index>;统计中心据此幂等更新任务级累计统计和 attempt 级独立统计,message/error 行增加 STEP 或 error,system 行默认仅保留在任务原始输出/数据库中,不进入 STEP 计数且不伪装为工具调用。 - Trace Summary 顶部执行摘要读取任务级
task:<taskId>;执行过程摘要#<index>读取task:<taskId>:attempt:<index>。如果 attempt scope 尚未投影完成,必须显示statsSource=unavailable或--,不得回退到任务级累计统计、transcript 重算或旧本地字段。 - 任务入队、开始、终态、移动 queue、标记已读等状态事实必须发布
task-updated或更具体的事实事件,供事件表和后续审计使用。 - Code Queue judge 每次真正发起 MiniMax LLM 请求前必须发布
judge-llm-request诊断事件,tag 至少包含service:code-queue、task:<taskId>、attempt:<index>、judge、diagnostics,payload 必须包含不带 API key 的最终 request payload/messages、prompt/payload 尺寸和 repairAttempt;随后发布judge-llm-response、judge-json-parse-error(如有)和judge-result,用于追溯“最终发给 judge LLM 的 prompt”和安全覆盖结果。 - Code Queue 服务启动后可对 PostgreSQL 中已有任务回放每个 TraceView 可见执行行的
trace-step-created,并发布trace-stats-snapshot事件完成统计中心种子同步;回放必须使用相同eventId保持幂等,不得阻塞队列恢复。历史回放必须按 attempt start 行推导attemptIndex,确保重建投影时 attempt scope 可独立恢复。
Pipeline Integration
- Pipeline 运行时的 OA 事实、控制事件和 Gantt 取证事件应写入同一个
oa-event-flow,并带service:pipeline与 Pipeline scope tag。 - 当 Pipeline 后端部署在远端 provider 且尚未直接写入主 server
oa-event-flow时,主 serveroa-event-flow必须通过 backend-core 内网代理启用单一路径 Pipeline snapshot bridge:只读取 Pipeline 的/api/snapshot与/api/oa-event-flow/diagnostics结构化接口,并发布稳定的pipeline-run-snapshot迁移事件,确保统一事件表中可见service:pipeline与epoch:<runId>事实。bridge 只做迁移期事件可见性接入,不得再实现“先读 ledger,失败后 snapshot”的双路径 fallback;Pipeline 直接发布到统一事件流后,应删除该 bridge。 - Pipeline 后端可继续向 UniDesk 暴露 Pipeline 专用 snapshot、Gantt DTO、node detail 和 control API,但这些 DTO 的权威来源必须是统一 OA 事件流,而不是旧文件通道或旧 batch 推进状态。
- UniDesk Pipeline 页面可以保留结构化 Pipeline 诊断面板;跨业务事件表、tag 订阅和原始 OA 事件可见性统一进入
用户服务 / OA Event Flow页面。 - Pipeline 接入完成态必须同时满足本文件的共享事件流契约和
docs/reference/pipeline-oa-event-flow.md的 Pipeline 控制流去残留门禁。
Frontend Visibility
用户服务 / OA Event Flow 是事件表和统计中心的可视化入口:
- 页面必须显示服务健康、事件总数、最新 sequence、Trace stats scope 数和 live stream 状态。
- 页面必须提供 tag 过滤输入,展示事件表列:sequence、type、source、aggregate、tags、createdAt、payload 摘要和显式
查看原始JSON。 - 页面必须展示 Trace stats 表,至少包含 scope、service、subject、STEP、read/edit/run、error、outputMaxSeq、statsRevision 和更新时间。
- 默认界面不得裸铺完整 JSON;完整事件和统计记录只能通过显式原始 JSON 按钮打开。
Completion Gates
config.json、docker-compose.yml、CLIserver rebuild、server status/logs和 E2E 必须认识oa-event-flow。- Code Queue 前端事件订阅必须指向
oa-event-flow的 tag stream;不得再依赖 Code Queue 私有/api/eventsSSE 作为刷新权威。 - Code Queue Trace Summary 和 task list 响应必须带
traceStats.source=oa-event-flow或明确的statsSource=unavailable状态;unavailable 时不得返回本地重算 STEP 伪装为权威统计。 - Code Queue TraceView 的 attempt 执行摘要必须带
attemptScopeId、attempt 级traceStats.source=oa-event-flow或明确的statsSource=unavailable;前端收到task-attempt的trace-stats-updated时只能刷新选中任务 Trace Summary,不能把 attempt 统计覆盖到左侧 task 级 STEP。 - Pipeline bridge 启用时,
/api/diagnostics必须暴露pipelineBridge.mode=snapshot或各 run 的bridgeMode=snapshot状态,GET /api/events?tags=service:pipeline必须能看到带epoch:<runId>的 Pipeline 事件;远端 Pipeline 后端直接发布到统一服务后,应删除 bridge 作为完成态清理项。 - 新增或修改 OA 事件流页面、Code Queue TraceView 或 Pipeline OA 可视化后,必须重建并验证
oa-event-flow、相关业务后端和frontend。