Files
pikasTech-unidesk/docs/reference/oa-event-flow.md
T

12 KiB
Raw Blame History

Unified OA Event Flow

Unified OA Event Flow 是 UniDesk 的事件流与统计投影基础设施。它独立部署为主 server 用户服务 id=oa-event-flow,为 Pipeline、Code Queue 以及后续 agent runtime 提供统一的事实事件写入、按 tag 订阅、事件表查询和统计中心读接口。Pipeline 专有的流程推进规则仍以 docs/reference/pipeline-oa-event-flow.md 为准;本文件只定义跨业务共享的事件存储、订阅和统计中心契约。

Architecture Boundary

  • oa-event-flow 必须是独立微服务,运行在主 server Compose 中,只暴露 Docker 内网端口,不开放公网端口;浏览器只能通过 UniDesk frontend 同源 /api/microservices/oa-event-flow/... 访问。
  • 业务服务只发布事实事件或控制意图,不直接修改其他服务状态;统计中心只消费事件流更新数据库投影,并通过读接口和派生事件暴露统计变化。
  • 前端页面不得轮询业务 transcript、JSONL、runner 私有文件或旧 SSE 来推断 STEP/Trace 统计;应订阅 trace-stats-updated 等统计变化事件,并按需拉取 oa_trace_stats 投影。
  • 事件表、投影表和 projection offset 必须写入主 PostgreSQL.state/ 只能保存日志、缓存或可重建工件,不能作为事件或统计权威状态。
  • 兼容迁移只能在同一重构闭环内存在;交付态不得保留两套互相竞争的 STEP 统计、Trace 刷新或 Pipeline 推进权威来源。

Event Envelope

所有写入 oa_events 的事件都必须使用统一 envelope:

  • eventId:全局稳定幂等键;重复发布同一事实必须命中同一 eventId,不能造成重复统计。
  • type:事实事件或派生事件类型,例如 trace-step-createdtask-updatednode-finishedtrace-stats-updated
  • createdAt:事件发生时间;服务端可补齐但不得覆盖调用方提供的真实事实时间。
  • sourceKindsourceId:事件来源类型和来源实例,例如 service/code-queueservice/pipelineprojection/trace-stats
  • aggregateTypeaggregateId:事件所属聚合,例如 task/<taskId>pipeline/<pipelineId>epoch/<runId>
  • correlationId 与可选 causationId:跨服务追踪和派生事件回溯链路。
  • tags:字符串数组,用于 scope、业务标签、订阅和投影筛选。
  • payload:事件事实数据;只放事实和必要摘要,不放未分页的大 transcript、完整日志或 secret。

Tag Contract

Tag 是 OA 事件流的订阅和投影索引。所有 tag 必须是稳定字符串,不依赖 UI 文案。

  • 共享 tagservice:<id>tracestatscontroldiagnostics
  • Code Queue scopeservice:code-queuetask:<taskId>queue:<queueId>attempt:<index>
  • Pipeline scopeservice:pipelinepipeline:<pipelineId>epoch:<runId>node:<nodeId>attempt:<attemptId>procedure:<procedureRunId>
  • 订阅接口按 tag 取交集:tags=service:code-queue,task:<taskId> 只返回同时带有这两个 tag 的事件。
  • 事件生产者必须自动附加业务 scope tag;前端只传当前页面或当前选择的 tag,不在浏览器里拼接跨业务控制语义。

Event API

oa-event-flow 至少提供以下后端接口:

  • GET /health:返回数据库 ready、事件数、投影数、SSE client 数和服务启动时间。
  • POST /api/events:批量或单条写入 OA 事件;服务端按 eventId 幂等去重,成功写入后同步推进内置投影。
  • GET /api/events?tags=<tag,...>&afterSeq=<n>&limit=<n>:查询事件表,默认按 sequence ASC 返回有界结果。
  • GET /api/events/stream?tags=<tag,...>&afterSeq=<n>:SSE 订阅事件表;连接建立后先补发 afterSeq 之后的有界 backlog,再推送 live event。
  • GET /api/stats/trace?scopeId=<scope>scopeIds=<scope,...>:读取 Trace/STEP 统计投影,scope 形如 task:<taskId>task:<taskId>:attempt:<index>pipeline:<pipelineId>
  • GET /api/diagnostics:返回事件表、投影表、近期事件类型、近期统计变化和订阅客户端摘要。

Statistics Center

统计中心是事件流内置投影,不由前端或业务页面临时重算:

  • oa_trace_stats 是 Trace/STEP 统计权威表,按 scopeId 保存 statsRevisionstepCountllmStepCountreadCounteditCountrunCounterrorCounttraceLineCountoutputMaxSeq、attempt 摘要和 updatedAt;其中 stepCount 表示 TraceView 可见执行行数,不等同于工具调用数,工具调用数由 readCount+editCount+runCount 表达。
  • Code Queue 任务级 scope task:<taskId> 只表达整个任务的累计统计;每个执行过程摘要必须使用独立 attempt scope task:<taskId>:attempt:<index>。同一个 trace-step-created 事实可由统计投影同时写入任务级和 attempt 级 oa_trace_steps/oa_trace_stats,但前端 attempt 卡片只能读取自己的 attempt scope,不得复用任务级累计统计。
  • 投影只消费已经写入 oa_events 的事实事件;不得直接读取 Code Queue transcript、Pipeline .state 或 runner JSONL 作为实时统计来源。
  • trace-stats-snapshot 可以作为迁移、服务重启或事件乱序时的权威种子事件;它仍必须进入事件表,由统计投影消费,不能绕过事件流直接写投影。投影消费 trace-step-created 时必须以 oa_trace_steps 的幂等行集合更新统计,不能把 snapshot 和 step 事件对同一个 step 重复累加。
  • 投影更新后必须发布派生事件 trace-stats-updated,带 stats payload 和 stats tag;前端订阅该事件来更新左侧 STEP、Trace 摘要指标和 OA 可视化页面。
  • 统计投影必须按 eventId 幂等;重复事实不能重复增加 STEP 或 read/edit/run 计数。

Code Queue Integration

  • Code Queue 不再维护独立 SSE 作为前端 Trace/STEP 刷新权威;任务输出、状态变更、queue 变更和统计快照必须发布到 oa-event-flow
  • Code Queue 左侧 task card 的 STEP、选中 task 的 Trace Summary 和全局统计只能读取 oa_trace_stats;本地 task JSON 中的历史字段只能作为发布 snapshot 事件的输入,不作为前端权威统计来源。统计尚未投影完成时必须明确显示 statsSource=unavailableSTEP --,不得回退到 transcript、本地 stepCount 或前端重算。
  • 运行中每个新的 TraceView 可见执行行都必须发布 trace-step-created,并带 task:<taskId>queue:<queueId>attempt:<index>service:code-queuetrace tag,以及 payload 中的 scopeId=task:<taskId>attemptIndexattemptScopeId=task:<taskId>:attempt:<index>;统计中心据此幂等更新任务级累计统计和 attempt 级独立统计,message/error 行增加 STEP 或 errorsystem 行默认仅保留在任务原始输出/数据库中,不进入 STEP 计数且不伪装为工具调用。
  • Trace Summary 顶部执行摘要读取任务级 task:<taskId>;执行过程摘要 #<index> 读取 task:<taskId>:attempt:<index>。如果 attempt scope 尚未投影完成,必须显示 statsSource=unavailable--,不得回退到任务级累计统计、transcript 重算或旧本地字段。
  • 任务入队、开始、终态、移动 queue、标记已读等状态事实必须发布 task-updated 或更具体的事实事件,供事件表和后续审计使用。
  • Code Queue judge 每次真正发起 MiniMax LLM 请求前必须发布 judge-llm-request 诊断事件,tag 至少包含 service:code-queuetask:<taskId>attempt:<index>judgediagnosticspayload 必须包含不带 API key 的最终 request payload/messages、prompt/payload 尺寸和 repairAttempt;随后发布 judge-llm-responsejudge-json-parse-error(如有)和 judge-result,用于追溯“最终发给 judge LLM 的 prompt”和安全覆盖结果。
  • Code Queue 服务启动后可对 PostgreSQL 中已有任务回放每个 TraceView 可见执行行的 trace-step-created,并发布 trace-stats-snapshot 事件完成统计中心种子同步;回放必须使用相同 eventId 保持幂等,不得阻塞队列恢复。历史回放必须按 attempt start 行推导 attemptIndex,确保重建投影时 attempt scope 可独立恢复。

Pipeline Integration

  • Pipeline 运行时的 OA 事实、控制事件和 Gantt 取证事件应写入同一个 oa-event-flow,并带 service:pipeline 与 Pipeline scope tag。
  • 当 Pipeline 后端部署在远端 provider 且尚未直接写入主 server oa-event-flow 时,主 server oa-event-flow 必须通过 backend-core 内网代理启用单一路径 Pipeline snapshot bridge:只读取 Pipeline 的 /api/snapshot/api/oa-event-flow/diagnostics 结构化接口,并发布稳定的 pipeline-run-snapshot 迁移事件,确保统一事件表中可见 service:pipelineepoch:<runId> 事实。bridge 只做迁移期事件可见性接入,不得再实现“先读 ledger,失败后 snapshot”的双路径 fallbackPipeline 直接发布到统一事件流后,应删除该 bridge。
  • Pipeline 后端可继续向 UniDesk 暴露 Pipeline 专用 snapshot、Gantt DTO、node detail 和 control API,但这些 DTO 的权威来源必须是统一 OA 事件流,而不是旧文件通道或旧 batch 推进状态。
  • UniDesk Pipeline 页面可以保留结构化 Pipeline 诊断面板;跨业务事件表、tag 订阅和原始 OA 事件可见性统一进入 用户服务 / OA Event Flow 页面。
  • Pipeline 接入完成态必须同时满足本文件的共享事件流契约和 docs/reference/pipeline-oa-event-flow.md 的 Pipeline 控制流去残留门禁。

Frontend Visibility

用户服务 / OA Event Flow 是事件表和统计中心的可视化入口:

  • 页面必须显示服务健康、事件总数、最新 sequence、Trace stats scope 数和 live stream 状态。
  • 页面必须提供 tag 过滤输入,展示事件表列:sequence、type、source、aggregate、tags、createdAt、payload 摘要和显式 查看原始JSON
  • 页面必须展示 Trace stats 表,至少包含 scope、service、subject、STEP、read/edit/run、error、outputMaxSeq、statsRevision 和更新时间。
  • 默认界面不得裸铺完整 JSON;完整事件和统计记录只能通过显式原始 JSON 按钮打开。

Completion Gates

  • config.jsondocker-compose.yml、CLI server rebuildserver status/logs 和 E2E 必须认识 oa-event-flow
  • Code Queue 前端事件订阅必须指向 oa-event-flow 的 tag stream;不得再依赖 Code Queue 私有 /api/events SSE 作为刷新权威。
  • Code Queue Trace Summary 和 task list 响应必须带 traceStats.source=oa-event-flow 或明确的 statsSource=unavailable 状态;unavailable 时不得返回本地重算 STEP 伪装为权威统计。
  • Code Queue TraceView 的 attempt 执行摘要必须带 attemptScopeId、attempt 级 traceStats.source=oa-event-flow 或明确的 statsSource=unavailable;前端收到 task-attempttrace-stats-updated 时只能刷新选中任务 Trace Summary,不能把 attempt 统计覆盖到左侧 task 级 STEP。
  • Pipeline bridge 启用时,/api/diagnostics 必须暴露 pipelineBridge.mode=snapshot 或各 run 的 bridgeMode=snapshot 状态,GET /api/events?tags=service:pipeline 必须能看到带 epoch:<runId> 的 Pipeline 事件;远端 Pipeline 后端直接发布到统一服务后,应删除 bridge 作为完成态清理项。
  • 新增或修改 OA 事件流页面、Code Queue TraceView 或 Pipeline OA 可视化后,必须重建并验证 oa-event-flow、相关业务后端和 frontend