Skip to content

运行状态与数据流

Run 状态机

系统采用有限状态机管理 run 生命周期,状态转换规则定义在 persistence/repo.py 中:

                    ┌────────────┐
                    │  created   │
                    └──┬───┬──┬──┘
                       │   │  │
              ┌────────┘   │  └───────┐
              ▼            ▼          ▼
         ┌─────────┐  ┌────────┐  ┌────────┐
         │ running  │  │stopped │  │ failed │
         └──┬─┬─┬─┬┘  └────────┘  └────────┘
            │ │ │ │
    ┌───────┘ │ │ └──────────┐
    ▼         ▼ ▼            ▼
┌────────┐ ┌────────┐  ┌────────┐
│stopping│ │finished│  │ failed │
└──┬─┬─┬─┘ └────────┘  └────────┘
   │ │ │
   ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│stopped │ │finished│ │ failed │
└────────┘ └────────┘ └────────┘

完整转换规则

当前状态允许转换到
createdrunningstoppedfailed
runningstoppingstoppedfinishedfailed
stoppingstoppedfinishedfailed
stopped(终态)
finished(终态)
failed(终态)

状态转换时间戳

  • createdrunning:写入 started_at
  • 任意 → stopped / finished / failed:写入 finished_at

转换触发源

转换触发者说明
created → runningRunService.start_run()启动 ExecutionWorker
created → stoppedRunService.stop_run()未启动直接停止
created → failedRunService.start_run()LocationInitializer 失败
running → stoppingRunService.stop_run()用户请求停止
running → finishedExecutionWorker._run()正常完成
running → failedExecutionWorker._run()执行异常
stopping → stoppedExecutionWorker._resolve_terminal_status()检测到 stop_event
stopping → finishedExecutionWorker._resolve_terminal_status()停止信号前已完成
stopping → failedExecutionWorker._run()停止过程中异常

Step 数据

每个 step 由 step_callback 实时报告,包含:

  • step_id:当前步序号
  • total_steps:总步数
  • 调度决策:active_nodes、quota_frames 等
  • 窗口指标:throughput_bps、loss_rate、delay_p50_ms、delay_p95_ms、sent_count、recv_count、sync_error_ms、overhead_ms

Step 数据流向:

  1. step_callback → 更新 runs.current_step(DB)
  2. step_callback → 写入 run_metrics 表(实时)
  3. step_callback → 发布 WS 事件(step + schedule + metrics)

落盘与一致性

  • trace 数据在运行结束后从 traces.csv 批量写入 run_traces
  • metrics 数据在每步由 step_callback 实时写入 run_metrics 表;运行结束时从 CSV 兜底补录
  • summary 数据在运行结束时由 _upsert_summary() 写入 run_summary
  • 事件日志通过 repo.append_event() 写入 run_events
  • 状态变更通过 transition_run_status() 保证原子性,非法转换抛出 ValueError