Appearance
模块通信
核心通信路径
Orchestrator → UnetAdapter
connect(inst)/close(inst):建立/关闭网关连接probe(inst):探测节点 agent 列表与服务check_phy(inst):检查物理层状态query_node_address(inst):查询节点 UNet 地址send_frame(inst, ...):下发发送机会subscribe_rx(inst, ...):订阅接收通知
Orchestrator → Scheduler
StepContext→ScheduleDecision:每步传入上下文,返回调度决策
Orchestrator → Metrics
- 写入 trace(trace_collector)
- 聚合并写入 metrics(compute + export)
RunService → ExecutionWorker
RunService.start_run(run_id)
│
├── LocationInitializer.run_before_execution() # 可选位置初始化
├── repo.transition_run_status("running") # 状态转换
├── ExecutionWorker(repo, request, on_done) # 创建 worker
└── worker.start() # 启动后台线程RunService持有_workers: Dict[str, ExecutionWorker]映射ExecutionWorker通过threading.Event实现协作式停止- worker 完成时通过
on_done回调通知RunService清理
RunService → LocationInitializer
- 在
start_run()中调用run_before_execution(scenario_path) - 失败时将 run 标记为
failed并抛出RunConflictError - 产生 warning 时记录为
location_init_warning事件
ExecutionWorker → Repository
set_run_fields(current_step=...):实时更新步进度transition_run_status():结束时设置终态upsert_summary():写入运行摘要append_event():记录状态变更事件
ExecutionWorker → TraceRepository
bulk_insert_metrics():每步实时写入指标(via step_callback)bulk_insert_traces():运行结束后批量写入 trace- IntegrityError 容错:单路径失败后自动禁用该路径写入,不影响另一路径
EventPublisher → WS Broadcaster
ExecutionWorker._publish()
│
└── RunEventBroadcaster.publish_from_sync() # 线程安全的同步→异步桥接
│
└── asyncio.run_coroutine_threadsafe()
│
└── RunEventBroadcaster.publish() # 分发到所有订阅连接publish_from_sync()为同步线程(worker)提供发布能力- 每个 WebSocket 连接拥有独立的
asyncio.Queue(128 槽),队列满时丢弃最旧消息
API → ScenarioRepository → DB
- 场景 CRUD 通过
ScenarioRepository操作 DB - 创建/更新后自动调用
sync_to_yaml()保持 YAML 文件同步 - 读取时先查 DB,未找到再查本地 YAML 文件
解耦原则
- UNET 侧 agent 名称与消息类名不得写死在主流程;通过
probe()或配置注入 hint。 - 指标计算基于 payload header/trace 字段,避免依赖 UNET 特定日志格式。
- Service 层通过依赖注入接受 Repository 和 Adapter,便于测试时替换 mock 实现。
- WebSocket 广播器通过
bind_loop()绑定事件循环,worker 线程通过publish_from_sync()安全跨线程发布。