Skip to content

运行流程与数据流转说明

本文用大白话说明:执行 python -m unet_dt.orchestrator.runner ... 后,程序先做什么、再做什么、数据怎么一步步流转、最后输出到 runs/<run_id>/ 的哪些文件。便于理解项目运行方式与排错。

Run 目录结构与各文件含义详见 数据模型与落盘规范


入口与两种模式

入口python -m unet_dt.orchestrator.runner 会执行 src/unet_dt/orchestrator/runner.pymain()

你敲完命令后main() 先解析 --scenario--mode--strategy--steps--dt--seed--rx-grace 等参数,然后根据 --mode 选一条路走:

  • --mode mock(默认):走 Demo-0 路径。不连真实网络,程序用随机数在内存里“模拟”出一批收发记录(traces),再按时间窗算指标、按策略导出调度表,最后把 traces、指标、调度表、报告写到 runs/<run_id>/。适合离线验证与答辩兜底。
  • --mode gateway:走 Demo-1 路径。先连上真实 UNET 网关(每个节点一个 TCP 地址),启动后台线程收包;然后按步推进:每步用策略决定“谁发”、按场景流量算“发多少”、通过网关真实发包,并把“发了啥”记进 TraceCollector;收到的包若带本次运行的 header 就与已发的记录配对,标为“收到”。所有步跑完后,等一小段 --rx-grace 让在途包到达,再关掉收包、从 TraceCollector 取出完整 traces,算指标、写调度表与报告到 runs/<run_id>/

最终产物(两种模式都会在 runs/<run_id>/ 下生成):scenario.yamlstrategy.jsonversions.jsontraces.csvmetrics.csvschedule_decisions.csvreport.md。区别在于 traces 的来源:mock 是程序随机生成的,gateway 是真实发收后配对得到的。


Mock 模式(Demo-0)数据流

按执行顺序,每步「输入是什么、产出是什么」:

  1. 读场景load_scenario(scenario_path) 把 YAML 解析并校验成 Scenario 对象,里面有节点列表、流量列表(flows)、以及 run 配置(steps、dt_seconds、seed 等)。
  2. 建输出目录:生成时间戳 run_id,创建 runs/<run_id>/,并写入 scenario.yaml(场景快照)、strategy.json(策略 id)、versions.json(git、Python 版本等)。
  3. 生成 mock traces_generate_mock_traces(seed, steps, dt_seconds, flows) 按每条流的周期与随机丢包/时延,在内存里生成一列 TraceRecord(每条代表一次“发送”,部分标记为“收到”)。没有真实网络。
  4. 算指标compute_window_metrics(traces, step_ms, steps) 按时间窗(每个 step 一个窗)聚合,得到每一步的 throughput_bps、loss_rate、delay_p50_ms、delay_p95_ms 等,得到 metrics 列表。
  5. 导出调度表_export_schedule_from_metrics 对每一步的指标调用策略的 decide(ctx),得到该步的 active_nodes、quota_frames,写入 schedule_decisions.csv
  6. 汇总报告compute_overall_summary(metrics) 得到整次运行的均值/最大 p95 等,写入 report.md

数据流关系(Mermaid):

mermaid
flowchart LR
  subgraph input [Input]
    ScenarioFile[scenario_file]
    CLI[CLI_params]
  end
  subgraph mock [Mock Path]
    Load[load_scenario]
    MockTraces[_generate_mock_traces]
    Metrics[compute_window_metrics]
    Schedule[_export_schedule_from_metrics]
    Summary[compute_overall_summary]
  end
  subgraph output [Output]
    RunDir[runs_run_id]
    TracesCSV[traces.csv]
    MetricsCSV[metrics.csv]
    ScheduleCSV[schedule_decisions.csv]
    ReportMD[report.md]
  end
  ScenarioFile --> Load
  CLI --> Load
  Load --> MockTraces
  MockTraces --> Metrics
  Metrics --> Schedule
  Metrics --> Summary
  Schedule --> ScheduleCSV
  Summary --> ReportMD
  MockTraces --> TracesCSV
  Metrics --> MetricsCSV
  RunDir --> TracesCSV
  RunDir --> MetricsCSV
  RunDir --> ScheduleCSV
  RunDir --> ReportMD

Gateway 模式(Demo-1)数据流

按执行顺序,谁在采集、谁在发送、数据怎么进 TraceCollector、最后怎么变成文件:

  1. 读场景:同上,且要求每个 node 有 tcp 地址(如 tcp://localhost:1101),用于连接 UNET 网关。
  2. 建输出目录:同上;versions.json 会多出 mode=gatewaybackend(arlpy/fjagepy)、run_u32unet_instances(各节点 host/port)等。
  3. 连网关get_adapter() 探测得到后端(arlpy 或 fjagepy);对每个节点的 tcp 调用 adapter.connect(inst),得到 inst_by_node_id,后续发收都通过这些连接。
  4. 地址与 MTU(fjagepy):若后端是 fjagepy,会解析各 node 的 UNET 运行时地址 addr_by_node_id(用于 Tx 时填“发给谁”),以及 Datagram MTU;payload 会被裁剪到 MTU-24(24 字节给 DT header),避免超长包。
  5. TraceCollector 与 RX 线程:创建 TraceCollector;为每个节点起一个后台线程执行 adapter.subscribe_rx(..., on_rx)。当网关收到消息时回调 on_rx:解析 payload,若长度够且带 DT header、且 run_id 等于本次 run_u32,则 collector.record_rx(header, rx_time_ms),把之前 record_tx 里同 (run_u32, src, dst, seq) 的那条标为 received、填上 rx_time_ms。
  6. 按步循环(每一步)
    • strategy.decide(ctx) 得到本步 active_nodes 与 quota;
    • 按场景 traffic 与周期算出本步要发的 tx_plan(src, dst, payload_bytes, flow_id);
    • 对 tx_plan 中每条:若 src 在 active 且未超 quota,则组包(pack_header + user_data),adapter.send_frame(inst, to=addr, payload),然后 collector.record_tx(...)
    • 本步结束前 collector.finalize() 得到当前快照,用 compute_window_metrics(snap, ...) 得到到本步为止的 metrics,用于下一步的 last_window_metrics
    • dt 做 real-time sleep,进入下一步。
  7. 收尾rx_grace_s 内再等一会(在途包到达),停止 RX 线程、关闭 adapter;traces = collector.finalize(),再一次性 compute_window_metrics(traces, ...) 得到完整 metrics;写出 traces.csv(source=gateway)、metrics.csvschedule_decisions.csv(循环里已攒的 schedule_rows)、report.md

数据流关系(Mermaid):

mermaid
flowchart TB
  subgraph input_gw [Input]
    ScenarioFileGW[scenario_file]
    CLIGW[CLI_params]
  end
  subgraph gateway_flow [Gateway Path]
    LoadGW[load_scenario]
    GetAdapter[get_adapter]
    Connect[adapter.connect]
    AddrMtu[addr_by_node_id_MTU]
    Collector[TraceCollector]
    RXThreads[subscribe_rx_threads]
    Loop[per_step_loop]
    StrategyDecide[strategy.decide]
    TxPlan[tx_plan]
    SendFrame[adapter.send_frame]
    RecordTx[collector.record_tx]
    RecordRx[on_rx_record_rx]
    Finalize[collector.finalize]
    MetricsGW[compute_window_metrics]
    Grace[rx_grace_sleep]
  end
  subgraph output_gw [Output]
    RunDirGW[runs_run_id]
    TracesCSVGW[traces.csv]
    MetricsCSVGW[metrics.csv]
    ScheduleCSVGW[schedule_decisions.csv]
    ReportMDGW[report.md]
  end
  ScenarioFileGW --> LoadGW
  CLIGW --> LoadGW
  LoadGW --> GetAdapter
  GetAdapter --> Connect
  Connect --> AddrMtu
  AddrMtu --> Collector
  AddrMtu --> RXThreads
  Collector --> RXThreads
  RXThreads --> RecordRx
  RXThreads --> Loop
  Loop --> StrategyDecide
  StrategyDecide --> TxPlan
  TxPlan --> SendFrame
  SendFrame --> RecordTx
  RecordRx --> Collector
  Loop --> Finalize
  Finalize --> MetricsGW
  Loop --> Grace
  Grace --> Finalize
  MetricsGW --> TracesCSVGW
  MetricsGW --> MetricsCSVGW
  Finalize --> TracesCSVGW
  Loop --> ScheduleCSVGW
  MetricsGW --> ReportMDGW
  RunDirGW --> TracesCSVGW
  RunDirGW --> MetricsCSVGW
  RunDirGW --> ScheduleCSVGW
  RunDirGW --> ReportMDGW

关键产物对应关系

产物Mock 模式来源Gateway 模式来源
traces.csv_generate_mock_traces(...) 生成的 TraceRecord 列表TraceCollector.finalize():每步 record_tx + 回调里 record_rx 配对后的列表
metrics.csvcompute_window_metrics(traces, step_ms, steps) 按窗聚合同上,最后用完整 traces 再算一遍
schedule_decisions.csv_export_schedule_from_metrics(metrics):对每步指标调用策略 decide 得到循环里每步 strategy.decide(ctx) 结果写入 schedule_rows,最后一次性写出

两种模式下,scenario.yamlstrategy.jsonversions.jsonreport.md 的写法一致;versions.json 在 gateway 下会多出 modebackendrun_u32unet_instances 等字段。

想观察每步数据变化,可在命令中加 --debug,程序会在关键阶段向 stderr 打印摘要(场景、traces 条数、每步发送/接收数、最终 summary 等)。详见 验收与演示脚本 中的说明。