Skip to content

持久层

数据库选型

系统支持双数据库后端,通过环境变量切换:

环境数据库连接串示例
开发/本地SQLitesqlite+pysqlite:///./unet_dt.db
生产/部署MySQL 8.xmysql+pymysql://user:pass@host:3306/db?charset=utf8mb4

MySQL 连接池调优

MySQL 模式下额外配置:

  • pool_size=10:基础连接池大小
  • max_overflow=20:溢出连接上限
  • pool_recycle=3600:连接回收时间(防止 MySQL wait_timeout 断连)
  • pool_pre_ping=True:每次取连接前验活
  • connect_timeout=10:连接超时

数据表结构(6 表)

runs

主表,存储运行记录。

列名类型约束说明
idVARCHAR(128)PK运行 ID,格式 run-YYYYMMDD-HHMMSS-{hex8}
scenario_idVARCHAR(128)NOT NULL关联场景 ID
strategy_idVARCHAR(64)NOT NULL调度策略 ID
modeVARCHAR(32)NOT NULL, DEFAULT 'mock'运行模式(mock/gateway/logonly)
statusVARCHAR(32)NOT NULL, DEFAULT 'created'状态机状态
stepsINTEGERNULLABLE总步数
current_stepINTEGERNULLABLE, DEFAULT 0当前执行步
dt_secondsFLOATNULLABLE步间隔(秒)
seedINTEGERNULLABLE随机种子
created_atDATETIMENOT NULL创建时间
started_atDATETIMENULLABLE启动时间
finished_atDATETIMENULLABLE结束时间
output_dirVARCHAR(512)NULLABLE产物目录路径
error_messageTEXTNULLABLE错误信息

run_summary

运行摘要,一对一关联 runs。

列名类型约束说明
run_idVARCHAR(128)PK, FK→runs.id (CASCADE)运行 ID
avg_throughput_bpsFLOATNULLABLE平均吞吐量
avg_loss_rateFLOATNULLABLE平均丢包率
p95_delay_msFLOATNULLABLEP95 延迟
trace_countINTEGERNULLABLE总 trace 数
received_countINTEGERNULLABLE成功接收数
updated_atDATETIMENOT NULL最后更新时间

run_events

运行事件日志。

列名类型约束说明
idINTEGERPK, AUTO_INCREMENT自增主键
run_idVARCHAR(128)FK→runs.id (CASCADE), INDEX运行 ID
event_typeVARCHAR(64)NOT NULL事件类型(state/location_init_warning 等)
step_idINTEGERNULLABLE关联步序号
payload_jsonTEXTNULLABLEJSON 载荷
created_atDATETIMENOT NULL事件时间

scenarios

场景定义,支持 DB + YAML 双源。

列名类型约束说明
idVARCHAR(128)PK场景 ID
nameVARCHAR(256)NOT NULL场景名称
nodes_jsonTEXTNOT NULL, DEFAULT '[]'节点定义 JSON
traffic_jsonTEXTNOT NULL, DEFAULT '[]'流量定义 JSON
run_config_jsonTEXTNOT NULL, DEFAULT '{}'运行配置 JSON
unet_config_jsonTEXTNULLABLEUNET 配置 JSON
created_atDATETIMENOT NULL创建时间
updated_atDATETIMENOT NULL更新时间

run_traces

原始 trace 记录,运行结束后从 CSV 批量导入。

列名类型约束说明
idINTEGERPK, AUTO_INCREMENT自增主键
run_idVARCHAR(128)FK→runs.id (CASCADE), INDEX运行 ID
step_idINTEGERNOT NULL步序号
seqINTEGERNOT NULL帧序号
tx_time_msINTEGERNOT NULL发送时间(ms)
rx_time_msINTEGERNULLABLE接收时间(ms)
srcINTEGERNOT NULL源节点地址
dstINTEGERNOT NULL目标节点地址
payload_bytesINTEGERNOT NULL, DEFAULT 0载荷字节数
receivedBOOLEANNOT NULL, DEFAULT FALSE是否接收成功
sourceVARCHAR(32)NOT NULL, DEFAULT 'unknown'数据来源

索引:ix_run_traces_run_step_seq (run_id, step_id, seq)

run_metrics

聚合指标,每步一行,由 step_callback 实时写入。

列名类型约束说明
idINTEGERPK, AUTO_INCREMENT自增主键
run_idVARCHAR(128)FK→runs.id (CASCADE), INDEX运行 ID
step_idINTEGERNOT NULL步序号
t_window_start_msINTEGERNOT NULL, DEFAULT 0窗口起始时间
t_window_end_msINTEGERNOT NULL, DEFAULT 0窗口结束时间
throughput_bpsFLOATNOT NULL, DEFAULT 0.0吞吐量
loss_rateFLOATNOT NULL, DEFAULT 0.0丢包率
delay_p50_msFLOATNOT NULL, DEFAULT 0.0P50 延迟
delay_p95_msFLOATNOT NULL, DEFAULT 0.0P95 延迟
sent_countINTEGERNOT NULL, DEFAULT 0发送帧数
recv_countINTEGERNOT NULL, DEFAULT 0接收帧数
sync_error_msFLOATNOT NULL, DEFAULT 0.0同步误差
overhead_msFLOATNOT NULL, DEFAULT 0.0调度开销

索引:ix_run_metrics_run_step (run_id, step_id)

ER 关系图

┌──────────┐     1:1     ┌─────────────┐
│  runs    │────────────▶│ run_summary  │
│          │             └─────────────┘
│          │     1:N     ┌─────────────┐
│          │────────────▶│ run_events   │
│          │             └─────────────┘
│          │     1:N     ┌─────────────┐
│          │────────────▶│ run_traces   │
│          │             └─────────────┘
│          │     1:N     ┌─────────────┐
│          │────────────▶│ run_metrics  │
└──────────┘             └─────────────┘

┌──────────┐
│scenarios │  (独立表,通过 scenario_id 逻辑关联 runs)
└──────────┘

Repository 层

RunRepository (repo.py)

职责:runs / run_summary / run_events 三表操作

  • prepare_schema():自动建表 + 补列(via ensure_tables_with_columns
  • create_run() / get_run() / list_runs():基础 CRUD
  • transition_run_status():带状态机校验的状态转换
  • set_run_fields():通用字段更新
  • upsert_summary():INSERT or UPDATE 摘要
  • append_event() / list_events():事件日志
  • ping() / table_status():健康检查支持
  • 使用 Table 反射(autoload_with)适配可能存在的表结构差异

TraceRepository (trace_repo.py)

职责:run_traces / run_metrics 两表操作

  • bulk_insert_traces():批量写入 trace
  • bulk_insert_metrics():批量写入指标
  • has_traces() / has_metrics():判断是否有 DB 数据
  • get_traces() / get_metrics():分页查询
  • get_latest_metric():获取最新一条指标(供轮询端点使用)

ScenarioRepository (scenario_repo.py)

职责:scenarios 表 + YAML 文件同步

  • create_scenario() / get_scenario() / update_scenario() / delete_scenario()
  • list_scenarios_merged():合并 DB 和 YAML 两种来源
  • import_from_yaml() / export_to_yaml() / sync_to_yaml():双向同步

自动 Schema 迁移

db.py 中的 ensure_table_with_columns() 实现轻量级自动迁移:

  1. 表不存在 → CREATE TABLE
  2. 表存在但缺列 → ALTER TABLE ADD COLUMN
  3. MySQL 的 TEXT/BLOB 类型列添加为 NULL(不设 DEFAULT)
  4. 其他类型根据 SQLAlchemy model 的 default 值自动设置

DbTableMapping

RuntimeSettings 中的 DbTableMapping 支持自定义表名:

python
class DbTableMapping(BaseModel):
    runs: str = "runs"
    run_summary: str = "run_summary"
    run_events: str = "run_events"
    scenarios: str = "scenarios"
    run_traces: str = "run_traces"
    run_metrics: str = "run_metrics"

通过环境变量 UNET_DT_DB_TABLE_{RUNS|RUN_SUMMARY|...} 可分别覆盖每张表的物理表名。