Appearance
运行流程与数据流转说明
本文用大白话说明:执行 python -m unet_dt.orchestrator.runner ... 后,程序先做什么、再做什么、数据怎么一步步流转、最后输出到 runs/<run_id>/ 的哪些文件。便于理解项目运行方式与排错。
Run 目录结构与各文件含义详见 数据模型与落盘规范。
入口与两种模式
入口:python -m unet_dt.orchestrator.runner 会执行 src/unet_dt/orchestrator/runner.py 的 main()。
你敲完命令后:main() 先解析 --scenario、--mode、--strategy、--steps、--dt、--seed、--rx-grace 等参数,然后根据 --mode 选一条路走:
--mode mock(默认):走 Demo-0 路径。不连真实网络,程序用随机数在内存里“模拟”出一批收发记录(traces),再按时间窗算指标、按策略导出调度表,最后把 traces、指标、调度表、报告写到runs/<run_id>/。适合离线验证与答辩兜底。--mode gateway:走 Demo-1 路径。先连上真实 UNET 网关(每个节点一个 TCP 地址),启动后台线程收包;然后按步推进:每步用策略决定“谁发”、按场景流量算“发多少”、通过网关真实发包,并把“发了啥”记进 TraceCollector;收到的包若带本次运行的 header 就与已发的记录配对,标为“收到”。所有步跑完后,等一小段--rx-grace让在途包到达,再关掉收包、从 TraceCollector 取出完整 traces,算指标、写调度表与报告到runs/<run_id>/。
最终产物(两种模式都会在 runs/<run_id>/ 下生成):scenario.yaml、strategy.json、versions.json、traces.csv、metrics.csv、schedule_decisions.csv、report.md。区别在于 traces 的来源:mock 是程序随机生成的,gateway 是真实发收后配对得到的。
Mock 模式(Demo-0)数据流
按执行顺序,每步「输入是什么、产出是什么」:
- 读场景:
load_scenario(scenario_path)把 YAML 解析并校验成Scenario对象,里面有节点列表、流量列表(flows)、以及 run 配置(steps、dt_seconds、seed 等)。 - 建输出目录:生成时间戳
run_id,创建runs/<run_id>/,并写入scenario.yaml(场景快照)、strategy.json(策略 id)、versions.json(git、Python 版本等)。 - 生成 mock traces:
_generate_mock_traces(seed, steps, dt_seconds, flows)按每条流的周期与随机丢包/时延,在内存里生成一列TraceRecord(每条代表一次“发送”,部分标记为“收到”)。没有真实网络。 - 算指标:
compute_window_metrics(traces, step_ms, steps)按时间窗(每个 step 一个窗)聚合,得到每一步的 throughput_bps、loss_rate、delay_p50_ms、delay_p95_ms 等,得到metrics列表。 - 导出调度表:
_export_schedule_from_metrics对每一步的指标调用策略的decide(ctx),得到该步的 active_nodes、quota_frames,写入schedule_decisions.csv。 - 汇总报告:
compute_overall_summary(metrics)得到整次运行的均值/最大 p95 等,写入report.md。
数据流关系(Mermaid):
mermaid
flowchart LR
subgraph input [Input]
ScenarioFile[scenario_file]
CLI[CLI_params]
end
subgraph mock [Mock Path]
Load[load_scenario]
MockTraces[_generate_mock_traces]
Metrics[compute_window_metrics]
Schedule[_export_schedule_from_metrics]
Summary[compute_overall_summary]
end
subgraph output [Output]
RunDir[runs_run_id]
TracesCSV[traces.csv]
MetricsCSV[metrics.csv]
ScheduleCSV[schedule_decisions.csv]
ReportMD[report.md]
end
ScenarioFile --> Load
CLI --> Load
Load --> MockTraces
MockTraces --> Metrics
Metrics --> Schedule
Metrics --> Summary
Schedule --> ScheduleCSV
Summary --> ReportMD
MockTraces --> TracesCSV
Metrics --> MetricsCSV
RunDir --> TracesCSV
RunDir --> MetricsCSV
RunDir --> ScheduleCSV
RunDir --> ReportMDGateway 模式(Demo-1)数据流
按执行顺序,谁在采集、谁在发送、数据怎么进 TraceCollector、最后怎么变成文件:
- 读场景:同上,且要求每个 node 有
tcp地址(如tcp://localhost:1101),用于连接 UNET 网关。 - 建输出目录:同上;
versions.json会多出mode=gateway、backend(arlpy/fjagepy)、run_u32、unet_instances(各节点 host/port)等。 - 连网关:
get_adapter()探测得到后端(arlpy 或 fjagepy);对每个节点的 tcp 调用adapter.connect(inst),得到inst_by_node_id,后续发收都通过这些连接。 - 地址与 MTU(fjagepy):若后端是 fjagepy,会解析各 node 的 UNET 运行时地址
addr_by_node_id(用于 Tx 时填“发给谁”),以及 Datagram MTU;payload 会被裁剪到 MTU-24(24 字节给 DT header),避免超长包。 - TraceCollector 与 RX 线程:创建
TraceCollector;为每个节点起一个后台线程执行adapter.subscribe_rx(..., on_rx)。当网关收到消息时回调on_rx:解析 payload,若长度够且带 DT header、且run_id等于本次run_u32,则collector.record_rx(header, rx_time_ms),把之前record_tx里同 (run_u32, src, dst, seq) 的那条标为 received、填上 rx_time_ms。 - 按步循环(每一步):
- 用
strategy.decide(ctx)得到本步 active_nodes 与 quota; - 按场景 traffic 与周期算出本步要发的
tx_plan(src, dst, payload_bytes, flow_id); - 对 tx_plan 中每条:若 src 在 active 且未超 quota,则组包(pack_header + user_data),
adapter.send_frame(inst, to=addr, payload),然后collector.record_tx(...); - 本步结束前
collector.finalize()得到当前快照,用compute_window_metrics(snap, ...)得到到本步为止的 metrics,用于下一步的last_window_metrics; - 按
dt做 real-time sleep,进入下一步。
- 用
- 收尾:
rx_grace_s内再等一会(在途包到达),停止 RX 线程、关闭 adapter;traces = collector.finalize(),再一次性compute_window_metrics(traces, ...)得到完整 metrics;写出traces.csv(source=gateway)、metrics.csv、schedule_decisions.csv(循环里已攒的 schedule_rows)、report.md。
数据流关系(Mermaid):
mermaid
flowchart TB
subgraph input_gw [Input]
ScenarioFileGW[scenario_file]
CLIGW[CLI_params]
end
subgraph gateway_flow [Gateway Path]
LoadGW[load_scenario]
GetAdapter[get_adapter]
Connect[adapter.connect]
AddrMtu[addr_by_node_id_MTU]
Collector[TraceCollector]
RXThreads[subscribe_rx_threads]
Loop[per_step_loop]
StrategyDecide[strategy.decide]
TxPlan[tx_plan]
SendFrame[adapter.send_frame]
RecordTx[collector.record_tx]
RecordRx[on_rx_record_rx]
Finalize[collector.finalize]
MetricsGW[compute_window_metrics]
Grace[rx_grace_sleep]
end
subgraph output_gw [Output]
RunDirGW[runs_run_id]
TracesCSVGW[traces.csv]
MetricsCSVGW[metrics.csv]
ScheduleCSVGW[schedule_decisions.csv]
ReportMDGW[report.md]
end
ScenarioFileGW --> LoadGW
CLIGW --> LoadGW
LoadGW --> GetAdapter
GetAdapter --> Connect
Connect --> AddrMtu
AddrMtu --> Collector
AddrMtu --> RXThreads
Collector --> RXThreads
RXThreads --> RecordRx
RXThreads --> Loop
Loop --> StrategyDecide
StrategyDecide --> TxPlan
TxPlan --> SendFrame
SendFrame --> RecordTx
RecordRx --> Collector
Loop --> Finalize
Finalize --> MetricsGW
Loop --> Grace
Grace --> Finalize
MetricsGW --> TracesCSVGW
MetricsGW --> MetricsCSVGW
Finalize --> TracesCSVGW
Loop --> ScheduleCSVGW
MetricsGW --> ReportMDGW
RunDirGW --> TracesCSVGW
RunDirGW --> MetricsCSVGW
RunDirGW --> ScheduleCSVGW
RunDirGW --> ReportMDGW关键产物对应关系
| 产物 | Mock 模式来源 | Gateway 模式来源 |
|---|---|---|
traces.csv | _generate_mock_traces(...) 生成的 TraceRecord 列表 | TraceCollector.finalize():每步 record_tx + 回调里 record_rx 配对后的列表 |
metrics.csv | compute_window_metrics(traces, step_ms, steps) 按窗聚合 | 同上,最后用完整 traces 再算一遍 |
schedule_decisions.csv | _export_schedule_from_metrics(metrics):对每步指标调用策略 decide 得到 | 循环里每步 strategy.decide(ctx) 结果写入 schedule_rows,最后一次性写出 |
两种模式下,scenario.yaml、strategy.json、versions.json、report.md 的写法一致;versions.json 在 gateway 下会多出 mode、backend、run_u32、unet_instances 等字段。
想观察每步数据变化,可在命令中加 --debug,程序会在关键阶段向 stderr 打印摘要(场景、traces 条数、每步发送/接收数、最终 summary 等)。详见 验收与演示脚本 中的说明。