Appearance
系统整体架构
本文档描述 UNET-DT 水声网络数字孪生调度平台 自身的完整系统架构,涵盖分层视图、核心数据流转、关键数据通路以及未来演进方向。
本文所述"系统"指毕设项目本体(Python 后端 + React 前端 + 仿真适配层),而非文档站点架构。
1. 系统总体架构图(分层视图)
┌──────────────────────────────────────────────────────────────────────┐
│ 前端层 (Frontend) │
│ React 19 + Vite 7 + TailwindCSS 4 + TypeScript SPA │
│ 页面: Dashboard / Scenarios / Runs / Monitor / Config │
│ 通信: REST fetch + WebSocket useWebSocket / useRunPolling │
└────────────────────────┬───────────────────┬─────────────────────────┘
│ HTTP/REST │ WebSocket
▼ ▼
┌──────────────────────────────────────────────────────────────────────┐
│ API 网关层 (API Gateway) │
│ FastAPI + Uvicorn + CORS(allow_origins=*) + RequestLoggingMW │
│ 路由组: │
│ /health 健康检查 /config 运行时配置 │
│ /scenarios 场景 CRUD /strategies 策略列表 │
│ /runs 运行生命周期 /ws/runs/{id} WebSocket 实时推送 │
│ 依赖注入: get_repo / get_run_service / get_scenario_repo / │
│ get_trace_repo / get_ws_broadcaster │
└────────────────────────┬─────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 业务服务层 (Business Service) │
│ │
│ RunService 场景解析 + 状态机守卫 + Worker 生命周期管理 │
│ ├─ start_run() 创建 ExecutionWorker → 启动后台线程 │
│ ├─ stop_run() 设置 stop_event → 状态过渡 stopping/stopped │
│ └─ get_run() 查询 + 汇总 │
│ │
│ ExecutionWorker 后台执行线程 (daemon thread) │
│ ├─ _run() 调用 executor → 写 DB → 发布 WS 事件 │
│ ├─ _build_step_cb() 构建 step_callback 闭包: DB + WS 双写 │
│ ├─ _persist_traces() 批量写入 traces/metrics 到数据库 │
│ └─ _upsert_summary() 汇总写入 run_summary │
│ │
│ LocationInitializer 运行前节点坐标初始化 (subprocess 调用脚本) │
└────────────────────────┬─────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 调度引擎层 (Orchestrator Engine) │
│ │
│ runner.run_demo() 入口分发: mock / gateway 模式 │
│ │ │
│ ├─ run_demo_mock() Mock 模式: 随机生成 traces → 离线计算 │
│ │ ├─ _generate_mock_traces() 伪随机 TX/RX 生成 │
│ │ ├─ compute_window_metrics() 按步窗口聚合指标 │
│ │ └─ step_callback 模拟推送 │
│ │ │
│ └─ run_demo_gateway() Gateway 模式: 真实水声仿真 │
│ ├─ run_phy_preflight() PHY 健康预检 + 地址发现 │
│ ├─ adapter.connect() 建立 fjage Gateway 连接 │
│ ├─ adapter.subscribe_rx() 每节点独立 RX 线程 │
│ ├─ Step Loop 时间步循环 (核心调度) │
│ │ ├─ strategy.decide() 调度策略决策 │
│ │ ├─ quota 检查 + TX 计划 流量规划 │
│ │ ├─ pack_header() 24字节 DT 协议头编码 │
│ │ ├─ adapter.send_frame() 发送至 UNet 节点 │
│ │ ├─ collector.record_tx() TX 登记 │
│ │ ├─ rx_settle 等待 声学传播沉淀 │
│ │ ├─ collector.finalize() 快照 + 指标计算 │
│ │ └─ step_callback() DB写入 + WS推送 │
│ └─ Grace Period → 关闭 RX → 导出 CSV + 报告 │
│ │
│ TraceCollector TX/RX 事件配对引擎 │
│ ├─ record_tx() 以 (run_u32, src, dst, seq) 为 key 登记 │
│ ├─ record_rx() 匹配 TX → 标记 received + rx_time_ms │
│ └─ finalize() 生成完整 trace 列表 (含丢包推断) │
│ │
│ Scheduler 调度策略抽象 │
│ ├─ RoundRobinStrategy 轮转: 每步激活 1 个节点 │
│ └─ AdaptiveQosStrategy 自适应: loss>0.2 时扩展到 2 个活跃节点 │
└────────────────────────┬─────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 适配器层 (Adapter Layer) │
│ │
│ UnetAdapter (ABC) 抽象基类: connect/close/probe/check_phy/ │
│ subscribe_rx/send_frame/get_time/ │
│ query_node_address │
│ │
│ GatewayAdapterFjagepy fjagepy.Gateway 实现 (首选后端) │
│ ├─ 独立 TX/RX Gateway 避免 request/receive 竞争 │
│ ├─ DatagramReq → TxFrameReq 降级策略 │
│ ├─ PHYSICAL/LINK 服务发现 (排除 transport agent) │
│ └─ query_node_address() NODE_INFO ParameterReq 查询 │
│ │
│ GatewayAdapterArlpy arlpy.unet.Gateway 实现 (备选后端) │
│ ├─ TxFrameReq 发送 │
│ └─ probe fallback: 从 services 列表推断 PHY 可用性 │
│ │
│ LogOnlyAdapter 空实现 (无真实通信能力, 仅用于日志) │
│ │
│ factory.get_adapter() 自动检测: arlpy → fjagepy → logonly │
│ │
│ Gateway Preflight 运行前健康检查 │
│ ├─ probe + check_phy 验证 PHYSICAL 服务可用 │
│ ├─ query_node_address 发现 UNet 运行时地址 │
│ └─ 选择策略: 优先 preferred → 补充 healthy → 截断 required_count │
│ │
│ 辅助模块: │
│ rx_parse.py 自适应 RX 消息解析 (兼容多版本 fjage 消息) │
│ utils.py best_effort_subscribe / normalize_message │
│ payload_header.py 24字节小端协议头: run_id(u32) + step_id(u32) │
│ + seq(u32) + tx_time_ms(u64) + src(u16) │
│ + dst(u16) = 24 bytes │
└────────────────────────┬─────────────────────────────────────────────┘
│ fjage JSON / TCP
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 外部系统 (External Systems) │
│ │
│ UNet 仿真器集群 (7节点) │
│ 远程服务器: 198.144.181.94 │
│ 端口映射: 1101-1107 (每端口对应一个仿真节点) │
│ 协议: fjage JSON-over-TCP │
│ 服务: PHYSICAL / LINK / NODE_INFO / DATAGRAM(transport) │
│ │
│ 文件系统 │
│ scenarios/ 场景 YAML 定义文件 │
│ runs/<run_id>/ 运行产物目录 │
│ ├─ scenario.yaml 运行快照 │
│ ├─ strategy.json 策略配置 │
│ ├─ versions.json 版本 + 运行元数据 │
│ ├─ traces.csv 帧级 TX/RX 追踪记录 │
│ ├─ metrics.csv 步级聚合指标 │
│ ├─ schedule_decisions.csv 调度决策日志 │
│ └─ report.md 运行摘要报告 │
└────────────────────────┬─────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 持久化层 (Persistence) │
│ │
│ Database SQLAlchemy Engine + Session 工厂 │
│ ├─ 连接池: pool_size / max_overflow / pool_recycle │
│ ├─ pool_pre_ping 心跳检测 │
│ └─ session_scope() 上下文管理 (自动 commit/rollback) │
│ │
│ RunRepository runs + run_summary + run_events 表操作 │
│ ├─ 反射式表发现 (MetaData autoload) │
│ ├─ 状态机过渡: created→running→stopping→stopped/finished/failed │
│ ├─ _filter_columns() 容忍 schema 差异 │
│ └─ ensure_tables_with_columns() 自动 ALTER ADD COLUMN │
│ │
│ TraceRepository run_traces + run_metrics 表操作 │
│ ├─ bulk_insert_traces/metrics 批量写入 │
│ ├─ FK 预检: 验证 run_id 存在再 INSERT │
│ ├─ charset 一致性检查 (MySQL utf8mb4 vs utf8mb3) │
│ └─ has_traces/has_metrics 快速存在性判断 │
│ │
│ ScenarioRepository 场景 DB + YAML 双源同步 │
│ ├─ sync_to_yaml() DB → 磁盘 YAML 文件同步 │
│ └─ CRUD 操作 场景的增删改查 │
│ │
│ 存储后端: MySQL (远程) / SQLite (本地开发) │
│ 表结构: │
│ dt_runs 运行主表 (状态机 + 参数 + 时间戳) │
│ dt_run_summary 运行汇总 (avg_throughput / loss / delay) │
│ dt_run_events 运行事件流 (event_type + payload_json) │
│ dt_run_traces 帧级追踪 (step_id / seq / tx_time / rx_time) │
│ dt_run_metrics 步级指标 (throughput / loss / delay_p50/p95) │
│ dt_scenarios 场景定义 (YAML 解析后的结构化存储) │
└──────────────────────────────────────────────────────────────────────┘2. 数据流转图
2.1 运行创建与执行主流程
用户 (前端 SPA)
│
│ POST /runs {scenario_id, strategy_id, mode, steps, dt, seed}
▼
API 路由层 (routes/runs.py)
│
│ 场景查找: ScenarioRepository(DB) → YAML fallback
│ 生成 run_id → RunRepository.create_run() → status="created"
│
│ POST /runs/{id}/start
▼
RunService.start_run()
│
├─ LocationInitializer.run_before_execution() # 节点坐标初始化
│ └─ subprocess → 初始化脚本 → 设置 UNet 节点位置
│
├─ RunRepository.transition_run_status("running")
│
└─ ExecutionWorker(daemon thread).start()
│
├─ step_callback = _build_step_callback()
│ 闭包: DB current_step 更新 + metrics 实时写入 + WS 事件发布
│
└─ executor(request, stop_event, step_callback)
│
└─ runner.run_demo(mode=...)
│
├─ mode="mock" → run_demo_mock()
│ 生成伪随机 traces → 计算 metrics → CSV 导出
│ 模拟步循环: step_callback 逐步推送
│
└─ mode="gateway" → run_demo_gateway()
(详见 2.2 节)2.2 Gateway 模式核心数据流
run_demo_gateway()
│
├─ [1] 初始化阶段
│ ├─ load_scenario(YAML) 解析场景配置
│ ├─ get_adapter() 检测: arlpy → fjagepy → logonly
│ ├─ 构建 UnetInstance 列表 从 scenario.nodes[].tcp 提取 host:port
│ ├─ run_phy_preflight() PHY 健康预检
│ │ ├─ adapter.connect() 建立连接
│ │ ├─ adapter.probe() 探查 agents/services
│ │ ├─ adapter.check_phy() 验证 PHYSICAL 服务存在
│ │ ├─ adapter.query_node_address() 发现 UNet 运行时地址
│ │ └─ select(preferred, healthy) 选择满足 required_count 的节点集
│ ├─ 地址映射构建 addr_by_node_id: scenario unet_addr > preflight
│ ├─ MTU 检测 get_datagram_mtu() → max_user_bytes
│ └─ adapter.connect() (正式连接)
│
├─ [2] RX 订阅阶段
│ └─ 每个节点启动独立 daemon 线程:
│ adapter.subscribe_rx(inst, on_msg, stop_event, ready_event)
│ └─ 独立 fjagepy.Gateway 连接 (避免 TX/RX 竞争)
│ └─ best_effort_subscribe() → receive_message() 循环
│ └─ on_msg → on_rx() 回调
│ ├─ parse_rx_message() 自适应解析 payload
│ ├─ PayloadHeader.from_bytes() 解码 24 字节头
│ ├─ run_id 校验 过滤其他实验的帧
│ └─ collector.record_rx() TX/RX 配对
│ └─ rx_ready_events 等待 (15s 超时) 确保 RX 就绪后才发送
│
├─ [3] Step Loop (核心调度循环)
│ for step_id in range(steps):
│ │
│ ├─ strategy.decide(StepContext)
│ │ 输入: step_id, t_window, last_window_metrics, queues, link_hints
│ │ 输出: ScheduleDecision(active_nodes, quota_frames, meta)
│ │
│ ├─ TX 计划生成
│ │ 遍历 active_flows → 检查 flow_next_due_ms → 构建 tx_plan
│ │
│ ├─ TX 执行
│ │ for (src, dst, bytes, flow_id) in tx_plan:
│ │ ├─ quota 检查: sent_in_step[src] < quota[src]
│ │ ├─ cap = min(payload_bytes, max_user_bytes)
│ │ ├─ pack_header(run_u32, step_id, seq, tx_time_ms, src, dst)
│ │ │ → 24 字节头 + user_data
│ │ ├─ dst_addr = addr_by_node_id[dst] (运行时地址映射)
│ │ ├─ adapter.send_frame(inst, to=dst_addr, payload)
│ │ │ → DatagramReq(phy) / TxFrameReq 降级
│ │ └─ collector.record_tx()
│ │
│ ├─ RX 沉淀等待
│ │ time.sleep(min(2.0, dt * 0.5)) 声学传播时间
│ │
│ ├─ 指标快照
│ │ collector.finalize() → compute_window_metrics()
│ │ last_window_metrics = m[-2] (用上一步修正后的值)
│ │
│ ├─ step_callback(run_id, step_id, decision, metrics)
│ │ ├─ repo.set_run_fields(current_step=step_id)
│ │ ├─ trace_repo.bulk_insert_metrics(实时写入本步 metrics)
│ │ └─ WS publish: step + schedule + metrics 三类事件
│ │
│ └─ 步间节拍对齐
│ target = start_perf + (step_id+1) * dt_seconds
│ sleep(target - now)
│
└─ [4] 收尾阶段
├─ Grace Period (rx_grace_s, 默认 5s) 等待在途帧到达
├─ stop_event.set() → RX 线程退出
├─ adapter.close() 关闭所有 Gateway 连接
├─ collector.finalize() → export_traces/metrics/schedule CSV
├─ compute_overall_summary() → report.md
└─ (fallback) Gateway 失败 + 非 strict → 降级为 mock 模式2.3 前端实时数据推送
ExecutionWorker 线程 FastAPI 主线程 浏览器 SPA
│ │ │
│ step_callback() │ │
│──publisher.publish_from_sync()────>│ │
│ (thread-safe: future.result) │ │
│ │ RunEventBroadcaster │
│ │ ._channels[run_id] │
│ │ → Queue → sender_task │
│ │──ws.send_json(envelope)──>│
│ │ │
│ │ 事件类型: │
│ │ step: {step_id, total} │
│ │ schedule: {active, quota}│
│ │ metrics: {throughput...} │
│ │ state: {status} │
│ │ │
同时: │ │
│ repo.set_run_fields() │ │
│ trace_repo.bulk_insert_metrics() │ │
│ │ GET /runs/{id}/status ──>│
│ │ (轮询降级: 当 WS 不可用) │
│ │<── RunStatusResponse │2.4 场景数据双源同步
┌────────────────┐
│ 用户创建场景 │
└───────┬────────┘
│
┌────────────┴────────────┐
▼ ▼
POST /scenarios (API) 手动编辑 YAML
│ │
▼ ▼
ScenarioRepository scenarios/*.yaml
(dt_scenarios 表) (文件系统)
│ │
└─────── sync ────────────┘
│
▼
RunService._resolve_scenario_path()
1. scenario_repo.sync_to_yaml() DB → YAML
2. glob(scenarios/*.yaml) 扫描匹配
3. load_scenario(path) Pydantic 校验3. 关键数据通路详解
3.1 TX 路径 (发送链路)
Runner Step Loop
│
├─ strategy.decide(ctx) 选择活跃节点 + 配额
│ ctx.last_window_metrics 上一步的信道质量反馈
│
├─ quota 检查 sent_in_step[src] < quota_frames[src]
│
├─ PayloadHeader 编码 24 字节小端固定头
│ run_id:u32 ──── 实验唯一标识 (随机 u32)
│ step_id:u32 ──── 时间步序号
│ seq:u32 ──── 源-目的对内序列号
│ tx_time_ms:u64 ─ 高精度发送时间戳 (perf_counter_ns)
│ src:u16 ──── 逻辑源节点 ID (场景定义)
│ dst:u16 ──── 逻辑目的节点 ID (场景定义)
│
├─ MTU 裁剪 min(payload_bytes, max_user_bytes)
│ max_user_bytes = MTU - 24 (典型: 56 - 24 = 32 bytes)
│
├─ adapter.send_frame(inst, to=dst_addr, payload=header+user_data)
│ dst_addr = addr_by_node_id[dst] 逻辑ID → 运行时地址映射
│ │
│ ├─ [fjagepy] DatagramReq(phy, to, protocol=0, data=[int])
│ │ recipient = agentForService(PHYSICAL/LINK)
│ │ 排除 transport agent (避免可靠传输拥塞)
│ │ gw.request(msg, timeout=5000)
│ │ AGREE → 成功; 其他 → 降级 TxFrameReq
│ │
│ └─ [arlpy] TxFrameReq(to, data)
│ gw.request(msg) 或 gw.send(msg)
│
└─ collector.record_tx(run_u32, step_id, seq, tx_time_ms, src, dst)
以 key=(run_u32, src, dst, seq) 登记到 _tx_by_key3.2 RX 路径 (接收链路)
UNet 仿真节点 (PHY 层)
│
│ 声学帧到达 → fjage notification (DatagramNtf / RxFrameNtf)
▼
fjagepy.Gateway (独立 RX 连接)
│
│ receive_message(gw, timeout=1.0)
▼
normalize_message(msg) 统一消息字典格式
│
▼
on_rx(inst, msg) 回调
│
├─ parse_rx_message(msg) 自适应 payload 提取
│ ├─ _find_payload_bytes() 按 score 排序: data > payload > bytes
│ │ 支持: bytes / bytearray / list[int] / numpy uint8
│ ├─ _find_int_field(src/dst) 兼容 src/source/from/sender 等变体
│ └─ 返回 RxParseResult(payload, src, dst, meta)
│
├─ 长度检查: len(payload) >= HEADER_SIZE (24)
│
├─ PayloadHeader.from_bytes(payload) 解码 DT 协议头
│
├─ run_id 校验: hdr.run_id == run_u32 过滤非本次实验的帧
│
├─ 计算延迟: rx_ms = now_ms(); delay = rx_ms - hdr.tx_time_ms
│
└─ collector.record_rx(header, rx_time_ms)
key = (run_u32, src, dst, seq)
匹配 _tx_by_key[key] → tr.received=True, tr.rx_time_ms=rx_ms
无匹配 → orphan_rx (用于调试)3.3 监控数据通路
Runner Step Loop
│
│ step_callback(run_id, step_id, decision, window_metrics)
▼
ExecutionWorker._build_step_callback() 闭包
│
├─ DB 写入路径 (持久化)
│ ├─ repo.set_run_fields(current_step=step_id) 更新进度
│ │ → UPDATE dt_runs SET current_step=? WHERE id=?
│ │
│ └─ trace_repo.bulk_insert_metrics(run_id, [row]) 实时 metrics
│ → INSERT INTO dt_run_metrics (run_id, step_id, ...)
│ FK 预检: SELECT id FROM dt_runs WHERE id=?
│ IntegrityError → 禁用后续写入 (_metrics_write_disabled)
│
├─ WS 推送路径 (实时)
│ publisher.publish_from_sync()
│ │
│ ├─ 同线程检测: asyncio.get_running_loop() == self._loop?
│ │ 是 → loop.create_task(publish())
│ │ 否 → asyncio.run_coroutine_threadsafe(publish(), loop)
│ │
│ └─ RunEventBroadcaster.publish()
│ → _channels[run_id] → 每个连接的 Queue → sender_task
│ → websocket.send_json(RunEventEnvelope)
│ 事件类型: step / schedule / metrics
│
└─ 前端消费
├─ useWebSocket(/ws/runs/{id}) 优先: 实时推送
│ connect → 初始 state + metrics → 持续 step/schedule/metrics
│
└─ useRunPolling 降级: HTTP 轮询
GET /runs/{id}/status
→ RunStatusResponse(run, summary, current_step, latest_metrics)4. Run 状态机
┌─────────┐
│ created │
└────┬────┘
│ start_run()
▼
┌─────────┐
┌─────│ running │─────┐
│ └────┬────┘ │
│ │ │
stop_run() 完成 异常
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌────────┐
│ stopping │ │ finished │ │ failed │
└────┬─────┘ └──────────┘ └────────┘
│
▼
┌─────────┐
│ stopped │
└─────────┘
终态: stopped / finished / failed (不可再转换)5. 部署架构
5.1 当前部署 (单机开发)
┌─────────────────────────────────────────────────────────┐
│ 开发机 (Windows) │
│ │
│ ┌──────────────┐ ┌──────────────────────────────────┐ │
│ │ Vite Dev │ │ Uvicorn (Python) │ │
│ │ Server │ │ FastAPI app │ │
│ │ :5173 │ │ :8000 │ │
│ │ React SPA │ │ ExecutionWorker threads │ │
│ │ │──│ WebSocket broadcaster │ │
│ │ HMR 热更新 │ │ │ │
│ └──────────────┘ └────────────┬─────────────────────┘ │
│ │ │
│ ┌────────────┴───────────┐ │
│ │ SQLAlchemy Engine │ │
│ │ MySQL / SQLite │ │
│ └────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ TCP (fjage JSON)
▼
┌─────────────────────────────────────────────────────────┐
│ 远程 UNet 仿真服务器: 198.144.181.94 │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │:1101 │ │:1102 │ │:1103 │ │... │ 共 7 个节点 │
│ │node-1│ │node-2│ │node-3│ │:1107 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │
│ UnetStack 仿真器 (Java) │
│ PHYSICAL / LINK / NODE_INFO 服务 │
│ 声学信道建模 + 帧级仿真 │
└─────────────────────────────────────────────────────────┘5.2 数据库连接拓扑
Python 后端
│
├─ MySQL (生产)
│ URL: mysql+pymysql://user:pass@host:3306/dbname
│ 连接池: pool_size=5, max_overflow=10, pool_recycle=3600
│ pool_pre_ping=True (心跳检测断连)
│ charset 一致性: utf8mb4_general_ci
│
└─ SQLite (开发/测试)
URL: sqlite:///local.db
无连接池 (SQLite 单文件)6. 未来架构演进
6.1 短期优化 (当前可行)
目标: 在不改变整体架构的前提下,提升系统稳定性和用户体验。
当前: 目标:
┌─────────────┐ ┌─────────────┐
│ 前端轮询 │ GET /status 每2s │ WebSocket │ 全量 WS 推送
│ useRunPolling│ ──────────────────> │ useWebSocket │ step/metrics
└─────────────┘ └─────────────┘
▲
当前: │ publish_from_sync()
┌─────────────┐ ┌───┴─────────┐
│ 步循环无屏障 │ sleep(dt) 固定节拍 │ barrier/clock│ 仿真时钟同步
│ rx_settle │ │ 激活 │ 精确步边界
└─────────────┘ └─────────────┘具体改进:
- WebSocket 替代轮询: 前端已有
useWebSockethook 和后端RunEventBroadcaster,将useRunPolling中的 HTTP 轮询切换到 WS 实时推送。 - Barrier/Clock 激活: 利用 UNet 仿真器的
ClockSync服务对齐时间步边界,替代当前的time.sleep(dt)固定等待。 - MTU 协商增强: 当前仅在首个节点上检测 MTU,扩展为所有活跃节点的 MTU 共识取最小值。
- Adapter 连接池: 复用 Gateway 连接,避免每次 preflight 后重建连接。
6.2 中期重构 (有一定挑战)
目标: 解耦计算密集的执行层与 I/O 密集的 API 层,支持并发运行。
当前: 单进程 + daemon 线程 目标: 多 Worker + 消息队列
┌──────────┐ ┌──────────┐
│ FastAPI │ │ FastAPI │
│ + Worker │ │ (API only)│
│ in-process│ └─────┬─────┘
└──────────┘ │ 任务投递
▼
┌───────────┐
│ Redis / │
│ RabbitMQ │
└─────┬─────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker-1 │ │ Worker-2 │ │ Worker-N │
│ (独立进程)│ │ (独立进程)│ │ (独立进程)│
└──────────┘ └──────────┘ └──────────┘具体改进:
- 消息队列解耦: 引入 Redis (Celery) 或 RabbitMQ 作为任务队列,API 层只负责入队,Worker 独立消费。
- 多 Worker 并发: 每个 Worker 进程独立运行一个
run_demo_gateway(),支持同时执行多个实验。 - 结果回写: Worker 完成后通过消息队列或 DB 通知 API 层更新状态。
- WebSocket 代理: API 层从 Redis Pub/Sub 订阅 Worker 事件,转发给前端 WS 连接。
- 调度策略插件化: 将
Strategy接口扩展为可动态注册的插件系统,支持运行时加载新策略。
6.3 长期愿景 (架构升级)
目标: 支持大规模、多场景并行实验,集成高保真信道模型。
┌──────────────────────────────────────────────────────────────────┐
│ Kubernetes 集群 │
│ │
│ ┌─────────┐ ┌──────────────┐ ┌──────────────────────────────┐ │
│ │ Ingress │ │ API Pod (x2) │ │ Orchestrator Pod (x N) │ │
│ │ Nginx │→ │ FastAPI │ │ 每 Pod 一个 Runner 实例 │ │
│ │ │ │ 无状态 │ │ 独占一组 UNet 节点 │ │
│ └─────────┘ └──────┬───────┘ └──────────┬───────────────────┘ │
│ │ │ │
│ ┌───────┴──────┐ ┌───────┴──────┐ │
│ │ Redis Cluster│ │ UNet Operator │ │
│ │ 任务队列+缓存│ │ (CRD 管理) │ │
│ └──────────────┘ │ 自动扩缩容 │ │
│ └───────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ MySQL (主从) │ │ 对象存储 │ │
│ │ 运行元数据 │ │ traces/CSV │ │
│ └──────────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ UNet 仿真器集群 (动态扩缩) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Cluster A │ │ Cluster B │ │ Cluster C │ │
│ │ 7-node sim │ │ 7-node sim │ │ 14-node sim │ │
│ │ Bellhop 信道 │ │ 标准信道 │ │ 自定义信道 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘具体改进:
- 分布式 Orchestrator: 多个 Runner 实例并行执行不同实验,通过 Redis 协调任务分配。
- Kubernetes 编排: 将后端和仿真器容器化,利用 K8s HPA 自动扩缩 Worker 数量。
- UNet CRD Operator: 自定义 Kubernetes Operator 管理 UNet 仿真器的生命周期(创建、销毁、健康检查)。
- Bellhop 信道模型集成: 替换 UNet 内置的简化信道模型,集成 Bellhop 射线追踪声学传播模型,提供更高保真的水声仿真。
- 对象存储: 将大量 CSV 产物从本地文件系统迁移到 MinIO / S3,支持跨节点访问和长期归档。
- 时序数据库: 将实时 metrics 从 MySQL 迁移到 InfluxDB / TimescaleDB,优化时间序列查询性能。
6.4 演进阶段对比
| 维度 | 当前 (v1) | 短期 | 中期 | 长期 |
|---|---|---|---|---|
| 进程模型 | 单进程 + daemon 线程 | 同左 | 多进程 Worker | K8s Pod 编排 |
| 实时推送 | WS + HTTP 轮询混合 | 全量 WS | WS + Redis Pub/Sub | WS + 事件总线 |
| 任务队列 | 无 (in-process) | 无 | Redis / RabbitMQ | Redis Cluster |
| 并发实验 | 1 个 | 1 个 | N 个 (多 Worker) | 无上限 (自动扩缩) |
| 仿真集群 | 固定 7 节点 | 同左 | 多组固定集群 | 动态创建/销毁 |
| 信道模型 | UNet 内置 | + Clock 同步 | 同左 | + Bellhop 射线追踪 |
| 存储 | MySQL + 本地文件 | 同左 | MySQL + 对象存储 | 时序DB + 对象存储 |
| 部署 | 开发机 | Docker Compose | Docker Compose | Kubernetes |
7. 关键设计决策记录
7.1 TX/RX 分离 Gateway 连接
问题: fjagepy Gateway 的 request() 和 receive() 共享同一 TCP 连接,RX 线程可能消费掉 TX 路径的 AGREE 响应。
决策: RX 线程使用独立的 fjagepy.Gateway 连接,TX 使用原始连接。代价是每个节点多一条 TCP 连接,但彻底消除竞争。
7.2 逻辑 ID vs 运行时地址
问题: 场景 YAML 定义的 node_id 是逻辑标识(1, 2, 3...),UNet 仿真器分配的 address 可能不同。
决策: addr_by_node_id 映射表在 preflight 阶段通过 NODE_INFO 查询构建。DT 协议头中 src/dst 保持逻辑 ID(用于稳定的 metrics 关联),send_frame(to=) 使用运行时地址。
7.3 反射式表操作
问题: MySQL 表可能由外部工具创建,列集合与 ORM 模型不完全一致。
决策: RunRepository 和 TraceRepository 使用 SQLAlchemy MetaData.reflect() 加载真实表结构,_filter_columns() 自动过滤不存在的列,ensure_tables_with_columns() 自动补充缺失列。
7.4 step_callback 双写容错
问题: 步循环中 DB 写入和 WS 推送需要独立容错,一方失败不应影响另一方。
决策: _build_step_callback() 中 _step_write_disabled 和 _metrics_write_disabled 两个标志独立控制。遇到 IntegrityError 时禁用对应写入路径,但步循环和 WS 推送继续运行。