Skip to content

系统整体架构

本文档描述 UNET-DT 水声网络数字孪生调度平台 自身的完整系统架构,涵盖分层视图、核心数据流转、关键数据通路以及未来演进方向。

本文所述"系统"指毕设项目本体(Python 后端 + React 前端 + 仿真适配层),而非文档站点架构。


1. 系统总体架构图(分层视图)

┌──────────────────────────────────────────────────────────────────────┐
│                         前端层 (Frontend)                            │
│  React 19 + Vite 7 + TailwindCSS 4 + TypeScript SPA                │
│  页面: Dashboard / Scenarios / Runs / Monitor / Config              │
│  通信: REST fetch + WebSocket useWebSocket / useRunPolling          │
└────────────────────────┬───────────────────┬─────────────────────────┘
                         │ HTTP/REST         │ WebSocket
                         ▼                   ▼
┌──────────────────────────────────────────────────────────────────────┐
│                     API 网关层 (API Gateway)                         │
│  FastAPI + Uvicorn + CORS(allow_origins=*) + RequestLoggingMW       │
│  路由组:                                                             │
│    /health      健康检查         /config     运行时配置               │
│    /scenarios   场景 CRUD        /strategies 策略列表                 │
│    /runs        运行生命周期      /ws/runs/{id} WebSocket 实时推送    │
│  依赖注入: get_repo / get_run_service / get_scenario_repo /          │
│            get_trace_repo / get_ws_broadcaster                       │
└────────────────────────┬─────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────┐
│                    业务服务层 (Business Service)                      │
│                                                                      │
│  RunService              场景解析 + 状态机守卫 + Worker 生命周期管理    │
│    ├─ start_run()        创建 ExecutionWorker → 启动后台线程          │
│    ├─ stop_run()         设置 stop_event → 状态过渡 stopping/stopped  │
│    └─ get_run()          查询 + 汇总                                 │
│                                                                      │
│  ExecutionWorker         后台执行线程 (daemon thread)                 │
│    ├─ _run()             调用 executor → 写 DB → 发布 WS 事件        │
│    ├─ _build_step_cb()   构建 step_callback 闭包: DB + WS 双写       │
│    ├─ _persist_traces()  批量写入 traces/metrics 到数据库             │
│    └─ _upsert_summary()  汇总写入 run_summary                       │
│                                                                      │
│  LocationInitializer     运行前节点坐标初始化 (subprocess 调用脚本)    │
└────────────────────────┬─────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────┐
│                   调度引擎层 (Orchestrator Engine)                    │
│                                                                      │
│  runner.run_demo()       入口分发: mock / gateway 模式               │
│    │                                                                 │
│    ├─ run_demo_mock()    Mock 模式: 随机生成 traces → 离线计算        │
│    │    ├─ _generate_mock_traces()     伪随机 TX/RX 生成             │
│    │    ├─ compute_window_metrics()    按步窗口聚合指标               │
│    │    └─ step_callback 模拟推送                                    │
│    │                                                                 │
│    └─ run_demo_gateway() Gateway 模式: 真实水声仿真                   │
│         ├─ run_phy_preflight()    PHY 健康预检 + 地址发现             │
│         ├─ adapter.connect()      建立 fjage Gateway 连接            │
│         ├─ adapter.subscribe_rx() 每节点独立 RX 线程                  │
│         ├─ Step Loop              时间步循环 (核心调度)               │
│         │    ├─ strategy.decide()     调度策略决策                    │
│         │    ├─ quota 检查 + TX 计划  流量规划                       │
│         │    ├─ pack_header()         24字节 DT 协议头编码            │
│         │    ├─ adapter.send_frame()  发送至 UNet 节点               │
│         │    ├─ collector.record_tx() TX 登记                        │
│         │    ├─ rx_settle 等待        声学传播沉淀                    │
│         │    ├─ collector.finalize()  快照 + 指标计算                 │
│         │    └─ step_callback()       DB写入 + WS推送                │
│         └─ Grace Period → 关闭 RX → 导出 CSV + 报告                 │
│                                                                      │
│  TraceCollector          TX/RX 事件配对引擎                           │
│    ├─ record_tx()        以 (run_u32, src, dst, seq) 为 key 登记     │
│    ├─ record_rx()        匹配 TX → 标记 received + rx_time_ms       │
│    └─ finalize()         生成完整 trace 列表 (含丢包推断)             │
│                                                                      │
│  Scheduler               调度策略抽象                                 │
│    ├─ RoundRobinStrategy    轮转: 每步激活 1 个节点                   │
│    └─ AdaptiveQosStrategy   自适应: loss>0.2 时扩展到 2 个活跃节点    │
└────────────────────────┬─────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────┐
│                     适配器层 (Adapter Layer)                          │
│                                                                      │
│  UnetAdapter (ABC)       抽象基类: connect/close/probe/check_phy/    │
│                          subscribe_rx/send_frame/get_time/            │
│                          query_node_address                          │
│                                                                      │
│  GatewayAdapterFjagepy   fjagepy.Gateway 实现 (首选后端)             │
│    ├─ 独立 TX/RX Gateway   避免 request/receive 竞争                 │
│    ├─ DatagramReq → TxFrameReq 降级策略                              │
│    ├─ PHYSICAL/LINK 服务发现 (排除 transport agent)                   │
│    └─ query_node_address()  NODE_INFO ParameterReq 查询              │
│                                                                      │
│  GatewayAdapterArlpy     arlpy.unet.Gateway 实现 (备选后端)          │
│    ├─ TxFrameReq 发送                                                │
│    └─ probe fallback: 从 services 列表推断 PHY 可用性                │
│                                                                      │
│  LogOnlyAdapter          空实现 (无真实通信能力, 仅用于日志)          │
│                                                                      │
│  factory.get_adapter()   自动检测: arlpy → fjagepy → logonly         │
│                                                                      │
│  Gateway Preflight       运行前健康检查                               │
│    ├─ probe + check_phy   验证 PHYSICAL 服务可用                     │
│    ├─ query_node_address  发现 UNet 运行时地址                       │
│    └─ 选择策略: 优先 preferred → 补充 healthy → 截断 required_count  │
│                                                                      │
│  辅助模块:                                                            │
│    rx_parse.py           自适应 RX 消息解析 (兼容多版本 fjage 消息)   │
│    utils.py              best_effort_subscribe / normalize_message    │
│    payload_header.py     24字节小端协议头: run_id(u32) + step_id(u32) │
│                          + seq(u32) + tx_time_ms(u64) + src(u16)     │
│                          + dst(u16) = 24 bytes                       │
└────────────────────────┬─────────────────────────────────────────────┘
                         │ fjage JSON / TCP

┌──────────────────────────────────────────────────────────────────────┐
│                     外部系统 (External Systems)                       │
│                                                                      │
│  UNet 仿真器集群 (7节点)                                              │
│    远程服务器: 198.144.181.94                                         │
│    端口映射: 1101-1107 (每端口对应一个仿真节点)                       │
│    协议: fjage JSON-over-TCP                                         │
│    服务: PHYSICAL / LINK / NODE_INFO / DATAGRAM(transport)           │
│                                                                      │
│  文件系统                                                             │
│    scenarios/            场景 YAML 定义文件                           │
│    runs/<run_id>/        运行产物目录                                 │
│      ├─ scenario.yaml    运行快照                                    │
│      ├─ strategy.json    策略配置                                    │
│      ├─ versions.json    版本 + 运行元数据                           │
│      ├─ traces.csv       帧级 TX/RX 追踪记录                        │
│      ├─ metrics.csv      步级聚合指标                                │
│      ├─ schedule_decisions.csv  调度决策日志                         │
│      └─ report.md        运行摘要报告                                │
└────────────────────────┬─────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────────┐
│                     持久化层 (Persistence)                            │
│                                                                      │
│  Database                SQLAlchemy Engine + Session 工厂             │
│    ├─ 连接池: pool_size / max_overflow / pool_recycle                │
│    ├─ pool_pre_ping 心跳检测                                         │
│    └─ session_scope() 上下文管理 (自动 commit/rollback)              │
│                                                                      │
│  RunRepository           runs + run_summary + run_events 表操作      │
│    ├─ 反射式表发现 (MetaData autoload)                               │
│    ├─ 状态机过渡: created→running→stopping→stopped/finished/failed   │
│    ├─ _filter_columns() 容忍 schema 差异                             │
│    └─ ensure_tables_with_columns() 自动 ALTER ADD COLUMN             │
│                                                                      │
│  TraceRepository         run_traces + run_metrics 表操作             │
│    ├─ bulk_insert_traces/metrics   批量写入                          │
│    ├─ FK 预检: 验证 run_id 存在再 INSERT                             │
│    ├─ charset 一致性检查 (MySQL utf8mb4 vs utf8mb3)                  │
│    └─ has_traces/has_metrics 快速存在性判断                           │
│                                                                      │
│  ScenarioRepository      场景 DB + YAML 双源同步                     │
│    ├─ sync_to_yaml()     DB → 磁盘 YAML 文件同步                    │
│    └─ CRUD 操作          场景的增删改查                               │
│                                                                      │
│  存储后端: MySQL (远程) / SQLite (本地开发)                           │
│  表结构:                                                              │
│    dt_runs         运行主表 (状态机 + 参数 + 时间戳)                  │
│    dt_run_summary  运行汇总 (avg_throughput / loss / delay)          │
│    dt_run_events   运行事件流 (event_type + payload_json)            │
│    dt_run_traces   帧级追踪 (step_id / seq / tx_time / rx_time)     │
│    dt_run_metrics  步级指标 (throughput / loss / delay_p50/p95)      │
│    dt_scenarios    场景定义 (YAML 解析后的结构化存储)                  │
└──────────────────────────────────────────────────────────────────────┘

2. 数据流转图

2.1 运行创建与执行主流程

用户 (前端 SPA)

  │ POST /runs  {scenario_id, strategy_id, mode, steps, dt, seed}

API 路由层 (routes/runs.py)

  │ 场景查找: ScenarioRepository(DB) → YAML fallback
  │ 生成 run_id → RunRepository.create_run() → status="created"

  │ POST /runs/{id}/start

RunService.start_run()

  ├─ LocationInitializer.run_before_execution()   # 节点坐标初始化
  │     └─ subprocess → 初始化脚本 → 设置 UNet 节点位置

  ├─ RunRepository.transition_run_status("running")

  └─ ExecutionWorker(daemon thread).start()

       ├─ step_callback = _build_step_callback()
       │     闭包: DB current_step 更新 + metrics 实时写入 + WS 事件发布

       └─ executor(request, stop_event, step_callback)

            └─ runner.run_demo(mode=...)

                 ├─ mode="mock" → run_demo_mock()
                 │     生成伪随机 traces → 计算 metrics → CSV 导出
                 │     模拟步循环: step_callback 逐步推送

                 └─ mode="gateway" → run_demo_gateway()
                       (详见 2.2 节)

2.2 Gateway 模式核心数据流

run_demo_gateway()

  ├─ [1] 初始化阶段
  │    ├─ load_scenario(YAML)              解析场景配置
  │    ├─ get_adapter()                    检测: arlpy → fjagepy → logonly
  │    ├─ 构建 UnetInstance 列表           从 scenario.nodes[].tcp 提取 host:port
  │    ├─ run_phy_preflight()              PHY 健康预检
  │    │    ├─ adapter.connect()           建立连接
  │    │    ├─ adapter.probe()             探查 agents/services
  │    │    ├─ adapter.check_phy()         验证 PHYSICAL 服务存在
  │    │    ├─ adapter.query_node_address() 发现 UNet 运行时地址
  │    │    └─ select(preferred, healthy)  选择满足 required_count 的节点集
  │    ├─ 地址映射构建                      addr_by_node_id: scenario unet_addr > preflight
  │    ├─ MTU 检测                          get_datagram_mtu() → max_user_bytes
  │    └─ adapter.connect() (正式连接)

  ├─ [2] RX 订阅阶段
  │    └─ 每个节点启动独立 daemon 线程:
  │         adapter.subscribe_rx(inst, on_msg, stop_event, ready_event)
  │           └─ 独立 fjagepy.Gateway 连接 (避免 TX/RX 竞争)
  │           └─ best_effort_subscribe() → receive_message() 循环
  │           └─ on_msg → on_rx() 回调
  │                 ├─ parse_rx_message()     自适应解析 payload
  │                 ├─ PayloadHeader.from_bytes()  解码 24 字节头
  │                 ├─ run_id 校验              过滤其他实验的帧
  │                 └─ collector.record_rx()    TX/RX 配对
  │    └─ rx_ready_events 等待 (15s 超时)    确保 RX 就绪后才发送

  ├─ [3] Step Loop (核心调度循环)
  │    for step_id in range(steps):
  │      │
  │      ├─ strategy.decide(StepContext)
  │      │    输入: step_id, t_window, last_window_metrics, queues, link_hints
  │      │    输出: ScheduleDecision(active_nodes, quota_frames, meta)
  │      │
  │      ├─ TX 计划生成
  │      │    遍历 active_flows → 检查 flow_next_due_ms → 构建 tx_plan
  │      │
  │      ├─ TX 执行
  │      │    for (src, dst, bytes, flow_id) in tx_plan:
  │      │      ├─ quota 检查: sent_in_step[src] < quota[src]
  │      │      ├─ cap = min(payload_bytes, max_user_bytes)
  │      │      ├─ pack_header(run_u32, step_id, seq, tx_time_ms, src, dst)
  │      │      │    → 24 字节头 + user_data
  │      │      ├─ dst_addr = addr_by_node_id[dst]   (运行时地址映射)
  │      │      ├─ adapter.send_frame(inst, to=dst_addr, payload)
  │      │      │    → DatagramReq(phy) / TxFrameReq 降级
  │      │      └─ collector.record_tx()
  │      │
  │      ├─ RX 沉淀等待
  │      │    time.sleep(min(2.0, dt * 0.5))   声学传播时间
  │      │
  │      ├─ 指标快照
  │      │    collector.finalize() → compute_window_metrics()
  │      │    last_window_metrics = m[-2]  (用上一步修正后的值)
  │      │
  │      ├─ step_callback(run_id, step_id, decision, metrics)
  │      │    ├─ repo.set_run_fields(current_step=step_id)
  │      │    ├─ trace_repo.bulk_insert_metrics(实时写入本步 metrics)
  │      │    └─ WS publish: step + schedule + metrics 三类事件
  │      │
  │      └─ 步间节拍对齐
  │           target = start_perf + (step_id+1) * dt_seconds
  │           sleep(target - now)

  └─ [4] 收尾阶段
       ├─ Grace Period (rx_grace_s, 默认 5s) 等待在途帧到达
       ├─ stop_event.set() → RX 线程退出
       ├─ adapter.close() 关闭所有 Gateway 连接
       ├─ collector.finalize() → export_traces/metrics/schedule CSV
       ├─ compute_overall_summary() → report.md
       └─ (fallback) Gateway 失败 + 非 strict → 降级为 mock 模式

2.3 前端实时数据推送

ExecutionWorker 线程                  FastAPI 主线程                   浏览器 SPA
        │                                    │                            │
        │  step_callback()                   │                            │
        │──publisher.publish_from_sync()────>│                            │
        │    (thread-safe: future.result)    │                            │
        │                                    │ RunEventBroadcaster        │
        │                                    │   ._channels[run_id]       │
        │                                    │   → Queue → sender_task    │
        │                                    │──ws.send_json(envelope)──>│
        │                                    │                            │
        │                                    │  事件类型:                  │
        │                                    │   step: {step_id, total}   │
        │                                    │   schedule: {active, quota}│
        │                                    │   metrics: {throughput...} │
        │                                    │   state: {status}          │
        │                                    │                            │
 同时:                                       │                            │
        │  repo.set_run_fields()             │                            │
        │  trace_repo.bulk_insert_metrics()  │                            │
        │                                    │  GET /runs/{id}/status ──>│
        │                                    │  (轮询降级: 当 WS 不可用)  │
        │                                    │<── RunStatusResponse       │

2.4 场景数据双源同步

                    ┌────────────────┐
                    │  用户创建场景    │
                    └───────┬────────┘

               ┌────────────┴────────────┐
               ▼                         ▼
    POST /scenarios (API)          手动编辑 YAML
               │                         │
               ▼                         ▼
    ScenarioRepository             scenarios/*.yaml
    (dt_scenarios 表)                (文件系统)
               │                         │
               └─────── sync ────────────┘


           RunService._resolve_scenario_path()
             1. scenario_repo.sync_to_yaml()  DB → YAML
             2. glob(scenarios/*.yaml)         扫描匹配
             3. load_scenario(path)            Pydantic 校验

3. 关键数据通路详解

3.1 TX 路径 (发送链路)

Runner Step Loop

  ├─ strategy.decide(ctx)                选择活跃节点 + 配额
  │    ctx.last_window_metrics           上一步的信道质量反馈

  ├─ quota 检查                          sent_in_step[src] < quota_frames[src]

  ├─ PayloadHeader 编码                  24 字节小端固定头
  │    run_id:u32  ──── 实验唯一标识 (随机 u32)
  │    step_id:u32 ──── 时间步序号
  │    seq:u32     ──── 源-目的对内序列号
  │    tx_time_ms:u64 ─ 高精度发送时间戳 (perf_counter_ns)
  │    src:u16     ──── 逻辑源节点 ID (场景定义)
  │    dst:u16     ──── 逻辑目的节点 ID (场景定义)

  ├─ MTU 裁剪                            min(payload_bytes, max_user_bytes)
  │    max_user_bytes = MTU - 24         (典型: 56 - 24 = 32 bytes)

  ├─ adapter.send_frame(inst, to=dst_addr, payload=header+user_data)
  │    dst_addr = addr_by_node_id[dst]   逻辑ID → 运行时地址映射
  │    │
  │    ├─ [fjagepy] DatagramReq(phy, to, protocol=0, data=[int])
  │    │    recipient = agentForService(PHYSICAL/LINK)
  │    │    排除 transport agent (避免可靠传输拥塞)
  │    │    gw.request(msg, timeout=5000)
  │    │    AGREE → 成功; 其他 → 降级 TxFrameReq
  │    │
  │    └─ [arlpy] TxFrameReq(to, data)
  │         gw.request(msg) 或 gw.send(msg)

  └─ collector.record_tx(run_u32, step_id, seq, tx_time_ms, src, dst)
       以 key=(run_u32, src, dst, seq) 登记到 _tx_by_key

3.2 RX 路径 (接收链路)

UNet 仿真节点 (PHY 层)

  │ 声学帧到达 → fjage notification (DatagramNtf / RxFrameNtf)

fjagepy.Gateway (独立 RX 连接)

  │ receive_message(gw, timeout=1.0)

normalize_message(msg)              统一消息字典格式


on_rx(inst, msg) 回调

  ├─ parse_rx_message(msg)          自适应 payload 提取
  │    ├─ _find_payload_bytes()      按 score 排序: data > payload > bytes
  │    │    支持: bytes / bytearray / list[int] / numpy uint8
  │    ├─ _find_int_field(src/dst)   兼容 src/source/from/sender 等变体
  │    └─ 返回 RxParseResult(payload, src, dst, meta)

  ├─ 长度检查: len(payload) >= HEADER_SIZE (24)

  ├─ PayloadHeader.from_bytes(payload)   解码 DT 协议头

  ├─ run_id 校验: hdr.run_id == run_u32  过滤非本次实验的帧

  ├─ 计算延迟: rx_ms = now_ms(); delay = rx_ms - hdr.tx_time_ms

  └─ collector.record_rx(header, rx_time_ms)
       key = (run_u32, src, dst, seq)
       匹配 _tx_by_key[key] → tr.received=True, tr.rx_time_ms=rx_ms
       无匹配 → orphan_rx (用于调试)

3.3 监控数据通路

Runner Step Loop

  │ step_callback(run_id, step_id, decision, window_metrics)

ExecutionWorker._build_step_callback() 闭包

  ├─ DB 写入路径 (持久化)
  │    ├─ repo.set_run_fields(current_step=step_id)    更新进度
  │    │    → UPDATE dt_runs SET current_step=? WHERE id=?
  │    │
  │    └─ trace_repo.bulk_insert_metrics(run_id, [row]) 实时 metrics
  │         → INSERT INTO dt_run_metrics (run_id, step_id, ...)
  │         FK 预检: SELECT id FROM dt_runs WHERE id=?
  │         IntegrityError → 禁用后续写入 (_metrics_write_disabled)

  ├─ WS 推送路径 (实时)
  │    publisher.publish_from_sync()
  │      │
  │      ├─ 同线程检测: asyncio.get_running_loop() == self._loop?
  │      │    是 → loop.create_task(publish())
  │      │    否 → asyncio.run_coroutine_threadsafe(publish(), loop)
  │      │
  │      └─ RunEventBroadcaster.publish()
  │           → _channels[run_id] → 每个连接的 Queue → sender_task
  │           → websocket.send_json(RunEventEnvelope)
  │           事件类型: step / schedule / metrics

  └─ 前端消费
       ├─ useWebSocket(/ws/runs/{id})    优先: 实时推送
       │    connect → 初始 state + metrics → 持续 step/schedule/metrics

       └─ useRunPolling                   降级: HTTP 轮询
            GET /runs/{id}/status
            → RunStatusResponse(run, summary, current_step, latest_metrics)

4. Run 状态机

                    ┌─────────┐
                    │ created │
                    └────┬────┘
                         │ start_run()

                    ┌─────────┐
              ┌─────│ running │─────┐
              │     └────┬────┘     │
              │          │          │
         stop_run()    完成      异常
              │          │          │
              ▼          ▼          ▼
         ┌──────────┐ ┌──────────┐ ┌────────┐
         │ stopping │ │ finished │ │ failed │
         └────┬─────┘ └──────────┘ └────────┘


         ┌─────────┐
         │ stopped │
         └─────────┘

终态: stopped / finished / failed (不可再转换)

5. 部署架构

5.1 当前部署 (单机开发)

┌─────────────────────────────────────────────────────────┐
│  开发机 (Windows)                                        │
│                                                          │
│  ┌──────────────┐  ┌──────────────────────────────────┐ │
│  │ Vite Dev     │  │ Uvicorn (Python)                 │ │
│  │ Server       │  │   FastAPI app                    │ │
│  │ :5173        │  │   :8000                          │ │
│  │ React SPA    │  │   ExecutionWorker threads        │ │
│  │              │──│   WebSocket broadcaster          │ │
│  │ HMR 热更新   │  │                                  │ │
│  └──────────────┘  └────────────┬─────────────────────┘ │
│                                 │                        │
│                    ┌────────────┴───────────┐            │
│                    │ SQLAlchemy Engine      │            │
│                    │ MySQL / SQLite         │            │
│                    └────────────────────────┘            │
└─────────────────────────────────────────────────────────┘
                         │ TCP (fjage JSON)

┌─────────────────────────────────────────────────────────┐
│  远程 UNet 仿真服务器: 198.144.181.94                    │
│                                                          │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐                   │
│  │:1101 │ │:1102 │ │:1103 │ │...   │  共 7 个节点       │
│  │node-1│ │node-2│ │node-3│ │:1107 │                    │
│  └──────┘ └──────┘ └──────┘ └──────┘                    │
│                                                          │
│  UnetStack 仿真器 (Java)                                 │
│    PHYSICAL / LINK / NODE_INFO 服务                      │
│    声学信道建模 + 帧级仿真                                │
└─────────────────────────────────────────────────────────┘

5.2 数据库连接拓扑

Python 后端

  ├─ MySQL (生产)
  │    URL: mysql+pymysql://user:pass@host:3306/dbname
  │    连接池: pool_size=5, max_overflow=10, pool_recycle=3600
  │    pool_pre_ping=True (心跳检测断连)
  │    charset 一致性: utf8mb4_general_ci

  └─ SQLite (开发/测试)
       URL: sqlite:///local.db
       无连接池 (SQLite 单文件)

6. 未来架构演进

6.1 短期优化 (当前可行)

目标: 在不改变整体架构的前提下,提升系统稳定性和用户体验。

当前:                                    目标:
┌─────────────┐                         ┌─────────────┐
│ 前端轮询     │  GET /status 每2s       │ WebSocket    │  全量 WS 推送
│ useRunPolling│ ──────────────────>     │ useWebSocket │  step/metrics
└─────────────┘                         └─────────────┘

当前:                                       │ publish_from_sync()
┌─────────────┐                         ┌───┴─────────┐
│ 步循环无屏障 │  sleep(dt) 固定节拍      │ barrier/clock│ 仿真时钟同步
│ rx_settle    │                         │ 激活         │ 精确步边界
└─────────────┘                         └─────────────┘

具体改进:

  • WebSocket 替代轮询: 前端已有 useWebSocket hook 和后端 RunEventBroadcaster,将 useRunPolling 中的 HTTP 轮询切换到 WS 实时推送。
  • Barrier/Clock 激活: 利用 UNet 仿真器的 ClockSync 服务对齐时间步边界,替代当前的 time.sleep(dt) 固定等待。
  • MTU 协商增强: 当前仅在首个节点上检测 MTU,扩展为所有活跃节点的 MTU 共识取最小值。
  • Adapter 连接池: 复用 Gateway 连接,避免每次 preflight 后重建连接。

6.2 中期重构 (有一定挑战)

目标: 解耦计算密集的执行层与 I/O 密集的 API 层,支持并发运行。

当前: 单进程 + daemon 线程                  目标: 多 Worker + 消息队列

┌──────────┐                              ┌──────────┐
│ FastAPI   │                              │ FastAPI   │
│ + Worker  │                              │ (API only)│
│ in-process│                              └─────┬─────┘
└──────────┘                                     │ 任务投递

                                           ┌───────────┐
                                           │  Redis /   │
                                           │  RabbitMQ  │
                                           └─────┬─────┘

                                    ┌────────────┼────────────┐
                                    ▼            ▼            ▼
                              ┌──────────┐ ┌──────────┐ ┌──────────┐
                              │ Worker-1 │ │ Worker-2 │ │ Worker-N │
                              │ (独立进程)│ │ (独立进程)│ │ (独立进程)│
                              └──────────┘ └──────────┘ └──────────┘

具体改进:

  • 消息队列解耦: 引入 Redis (Celery) 或 RabbitMQ 作为任务队列,API 层只负责入队,Worker 独立消费。
  • 多 Worker 并发: 每个 Worker 进程独立运行一个 run_demo_gateway(),支持同时执行多个实验。
  • 结果回写: Worker 完成后通过消息队列或 DB 通知 API 层更新状态。
  • WebSocket 代理: API 层从 Redis Pub/Sub 订阅 Worker 事件,转发给前端 WS 连接。
  • 调度策略插件化: 将 Strategy 接口扩展为可动态注册的插件系统,支持运行时加载新策略。

6.3 长期愿景 (架构升级)

目标: 支持大规模、多场景并行实验,集成高保真信道模型。

┌──────────────────────────────────────────────────────────────────┐
│                     Kubernetes 集群                               │
│                                                                   │
│  ┌─────────┐  ┌──────────────┐  ┌──────────────────────────────┐ │
│  │ Ingress │  │ API Pod (x2) │  │ Orchestrator Pod (x N)       │ │
│  │ Nginx   │→ │ FastAPI      │  │   每 Pod 一个 Runner 实例     │ │
│  │         │  │ 无状态       │  │   独占一组 UNet 节点          │ │
│  └─────────┘  └──────┬───────┘  └──────────┬───────────────────┘ │
│                       │                      │                    │
│               ┌───────┴──────┐       ┌───────┴──────┐            │
│               │ Redis Cluster│       │ UNet Operator │            │
│               │ 任务队列+缓存│       │ (CRD 管理)    │            │
│               └──────────────┘       │ 自动扩缩容    │            │
│                                      └───────────────┘            │
│  ┌──────────────┐  ┌──────────────┐                              │
│  │ MySQL (主从) │  │ 对象存储      │                              │
│  │ 运行元数据   │  │ traces/CSV   │                              │
│  └──────────────┘  └──────────────┘                              │
└──────────────────────────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│  UNet 仿真器集群 (动态扩缩)                                       │
│                                                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Cluster A    │  │ Cluster B    │  │ Cluster C    │           │
│  │ 7-node sim   │  │ 7-node sim   │  │ 14-node sim  │           │
│  │ Bellhop 信道 │  │ 标准信道     │  │ 自定义信道   │            │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└──────────────────────────────────────────────────────────────────┘

具体改进:

  • 分布式 Orchestrator: 多个 Runner 实例并行执行不同实验,通过 Redis 协调任务分配。
  • Kubernetes 编排: 将后端和仿真器容器化,利用 K8s HPA 自动扩缩 Worker 数量。
  • UNet CRD Operator: 自定义 Kubernetes Operator 管理 UNet 仿真器的生命周期(创建、销毁、健康检查)。
  • Bellhop 信道模型集成: 替换 UNet 内置的简化信道模型,集成 Bellhop 射线追踪声学传播模型,提供更高保真的水声仿真。
  • 对象存储: 将大量 CSV 产物从本地文件系统迁移到 MinIO / S3,支持跨节点访问和长期归档。
  • 时序数据库: 将实时 metrics 从 MySQL 迁移到 InfluxDB / TimescaleDB,优化时间序列查询性能。

6.4 演进阶段对比

维度当前 (v1)短期中期长期
进程模型单进程 + daemon 线程同左多进程 WorkerK8s Pod 编排
实时推送WS + HTTP 轮询混合全量 WSWS + Redis Pub/SubWS + 事件总线
任务队列无 (in-process)Redis / RabbitMQRedis Cluster
并发实验1 个1 个N 个 (多 Worker)无上限 (自动扩缩)
仿真集群固定 7 节点同左多组固定集群动态创建/销毁
信道模型UNet 内置+ Clock 同步同左+ Bellhop 射线追踪
存储MySQL + 本地文件同左MySQL + 对象存储时序DB + 对象存储
部署开发机Docker ComposeDocker ComposeKubernetes

7. 关键设计决策记录

7.1 TX/RX 分离 Gateway 连接

问题: fjagepy Gateway 的 request()receive() 共享同一 TCP 连接,RX 线程可能消费掉 TX 路径的 AGREE 响应。

决策: RX 线程使用独立的 fjagepy.Gateway 连接,TX 使用原始连接。代价是每个节点多一条 TCP 连接,但彻底消除竞争。

7.2 逻辑 ID vs 运行时地址

问题: 场景 YAML 定义的 node_id 是逻辑标识(1, 2, 3...),UNet 仿真器分配的 address 可能不同。

决策: addr_by_node_id 映射表在 preflight 阶段通过 NODE_INFO 查询构建。DT 协议头中 src/dst 保持逻辑 ID(用于稳定的 metrics 关联),send_frame(to=) 使用运行时地址。

7.3 反射式表操作

问题: MySQL 表可能由外部工具创建,列集合与 ORM 模型不完全一致。

决策: RunRepositoryTraceRepository 使用 SQLAlchemy MetaData.reflect() 加载真实表结构,_filter_columns() 自动过滤不存在的列,ensure_tables_with_columns() 自动补充缺失列。

7.4 step_callback 双写容错

问题: 步循环中 DB 写入和 WS 推送需要独立容错,一方失败不应影响另一方。

决策: _build_step_callback()_step_write_disabled_metrics_write_disabled 两个标志独立控制。遇到 IntegrityError 时禁用对应写入路径,但步循环和 WS 推送继续运行。