Skip to content

模块通信

核心通信路径

Orchestrator → UnetAdapter

  • connect(inst) / close(inst):建立/关闭网关连接
  • probe(inst):探测节点 agent 列表与服务
  • check_phy(inst):检查物理层状态
  • query_node_address(inst):查询节点 UNet 地址
  • send_frame(inst, ...):下发发送机会
  • subscribe_rx(inst, ...):订阅接收通知

Orchestrator → Scheduler

  • StepContextScheduleDecision:每步传入上下文,返回调度决策

Orchestrator → Metrics

  • 写入 trace(trace_collector)
  • 聚合并写入 metrics(compute + export)

RunService → ExecutionWorker

RunService.start_run(run_id)

    ├── LocationInitializer.run_before_execution()  # 可选位置初始化
    ├── repo.transition_run_status("running")       # 状态转换
    ├── ExecutionWorker(repo, request, on_done)      # 创建 worker
    └── worker.start()                               # 启动后台线程
  • RunService 持有 _workers: Dict[str, ExecutionWorker] 映射
  • ExecutionWorker 通过 threading.Event 实现协作式停止
  • worker 完成时通过 on_done 回调通知 RunService 清理

RunService → LocationInitializer

  • start_run() 中调用 run_before_execution(scenario_path)
  • 失败时将 run 标记为 failed 并抛出 RunConflictError
  • 产生 warning 时记录为 location_init_warning 事件

ExecutionWorker → Repository

  • set_run_fields(current_step=...) :实时更新步进度
  • transition_run_status() :结束时设置终态
  • upsert_summary() :写入运行摘要
  • append_event() :记录状态变更事件

ExecutionWorker → TraceRepository

  • bulk_insert_metrics() :每步实时写入指标(via step_callback)
  • bulk_insert_traces() :运行结束后批量写入 trace
  • IntegrityError 容错:单路径失败后自动禁用该路径写入,不影响另一路径

EventPublisher → WS Broadcaster

ExecutionWorker._publish()

    └── RunEventBroadcaster.publish_from_sync()  # 线程安全的同步→异步桥接

            └── asyncio.run_coroutine_threadsafe()

                    └── RunEventBroadcaster.publish()  # 分发到所有订阅连接
  • publish_from_sync() 为同步线程(worker)提供发布能力
  • 每个 WebSocket 连接拥有独立的 asyncio.Queue(128 槽),队列满时丢弃最旧消息

API → ScenarioRepository → DB

  • 场景 CRUD 通过 ScenarioRepository 操作 DB
  • 创建/更新后自动调用 sync_to_yaml() 保持 YAML 文件同步
  • 读取时先查 DB,未找到再查本地 YAML 文件

解耦原则

  • UNET 侧 agent 名称与消息类名不得写死在主流程;通过 probe() 或配置注入 hint。
  • 指标计算基于 payload header/trace 字段,避免依赖 UNET 特定日志格式。
  • Service 层通过依赖注入接受 Repository 和 Adapter,便于测试时替换 mock 实现。
  • WebSocket 广播器通过 bind_loop() 绑定事件循环,worker 线程通过 publish_from_sync() 安全跨线程发布。