Appearance
内部模块接口文档 (Internal Interfaces)
本文档描述系统内部关键模块的接口契约,供开发者理解模块间边界和扩展点。
1. UnetAdapter 接口
定义:src/unet_dt/unet_adapter/base.py
UnetAdapter 是与 UNet 水声通信仿真器交互的抽象适配层。所有具体实现(fjagepy、arlpy、logonly)均需实现此接口。
数据类型
python
@dataclass
class UnetInstance:
name: str # 节点名称 (如 "A", "B")
host: str # 节点主机地址
port: int # 节点端口
meta: Dict[str, Any] # 附加元数据(scenario 注入)抽象方法
connect(inst: UnetInstance) -> None
建立与指定 UNet 节点的网关连接。
- 参数:
inst— 目标节点实例信息 - 行为:创建并缓存底层通信连接(如 fjagepy 的 Gateway 对象)
- 异常:连接失败时抛出
ConnectionError或具体实现的异常
close(inst: UnetInstance) -> None
关闭与指定节点的连接并释放资源。
- 参数:
inst— 目标节点实例信息 - 行为:清理连接资源,从缓存中移除
probe(inst: UnetInstance) -> Dict[str, Any]
探测节点状态,获取运行中的 Agent 和 Service 信息。
- 参数:
inst— 目标节点实例信息 - 返回值:python
{ "ok": True, "agents": 5, # Agent 数量 "services": ["phy", ...], # 可用服务列表 "detail": None # 或错误详情字符串 } - 用途:健康检查、服务发现
check_phy(inst: UnetInstance, probe: Optional[Dict] = None) -> Dict[str, Any]
检查节点的物理层 (PHY) 状态。
- 参数:
inst— 目标节点probe— 可选的 probe 缓存结果(避免重复探测)
- 返回值:python
{ "phy_ok": True, # PHY 是否可用 "detail": None # 或错误详情 }
subscribe_rx(inst, on_msg, stop_event, timeout_s, ready_event) -> None
订阅节点的接收帧事件,阻塞式监听直到 stop_event 触发。
- 参数:
inst: UnetInstance— 监听节点on_msg: Callable[[Dict], None]— 收到帧时的回调函数stop_event: Optional[Any]— 停止事件信号(通常为threading.Event)timeout_s: float— 单次等待超时(秒),默认 1.0ready_event: Optional[Any]— 订阅就绪后触发的事件
- 行为:在循环中等待接收帧,每收到一帧调用
on_msg(frame_dict),直到stop_event被设置 - 回调参数格式:python
{ "rx_time": 1234.5, # 接收时间(仿真时钟) "from": 1, # 源节点地址 "to": 2, # 目标节点地址 "data": b"...", # 原始载荷 "protocol": 0, # 协议号 }
send_frame(inst, to, payload, meta) -> Dict[str, Any]
通过指定节点发送数据帧。
- 参数:
inst: UnetInstance— 发送节点to: int— 目标节点地址payload: bytes— 载荷数据meta: Optional[Dict]— 可选发送元数据
- 返回值:python
{ "ok": True, "tx_time": 5000.0, # 发送时间戳 "detail": None }
get_time(inst: UnetInstance) -> float
获取节点当前的仿真时钟时间(毫秒)。
query_node_address(inst: UnetInstance) -> Optional[int]
查询节点的动态 UNet 地址(通过 NODE_INFO 服务)。
- 返回值:节点地址(整数)或
None(不支持时) - 备注:此方法有默认实现(返回 None),子类可按需覆盖
已实现的适配器
| 实现 | 模块 | 说明 |
|---|---|---|
FjagepyAdapter | unet_adapter/fjagepy_impl.py | 基于 fjagepy 库的生产级实现,连接真实 UNet 网关 |
ArlpyAdapter | unet_adapter/arlpy_impl.py | 基于 arlpy 的模拟实现(用于本地开发) |
LogOnlyAdapter | unet_adapter/logonly_impl.py | 仅记录日志的 mock 实现(用于 CI/测试) |
2. Strategy 接口
定义:src/unet_dt/scheduler/base.py
Strategy 是调度策略的抽象基类,决定每个仿真步中哪些流量需要发送。
数据类型
python
@dataclass
class StepContext:
step_id: int # 当前步序号
dt_seconds: float # 时间步长
nodes: List[NodeInfo] # 可用节点列表
traffic_flows: List[TrafficFlow] # 流量配置
metrics_history: List[MetricItem] # 历史指标python
@dataclass
class ScheduleDecision:
sends: List[SendAction] # 本步需要执行的发送动作列表抽象方法
decide(ctx: StepContext) -> ScheduleDecision
根据当前上下文决定本步的调度动作。
- 参数:
ctx— 当前步的完整上下文信息 - 返回值:
ScheduleDecision,包含需要执行的发送动作列表 - 契约:实现必须是无副作用的纯函数(不修改 ctx,不依赖外部状态)
已实现的策略
| 策略 ID | 类名 | 说明 |
|---|---|---|
round_robin | RoundRobinStrategy | 简单轮询:按配置顺序依次发送所有流量 |
adaptive_qos | AdaptiveQoSStrategy | 自适应 QoS:根据历史指标动态调整发送优先级和频率 |
3. TraceCollector 接口
定义:集成在 src/unet_dt/persistence/trace_repo.py
TraceCollector 负责收集和持久化仿真过程中的通信追踪记录。
主要方法
insert_trace(record: TraceRecord) -> None
写入单条追踪记录到数据库。
insert_traces_batch(records: List[TraceRecord]) -> None
批量写入追踪记录(性能优化路径)。
get_traces(run_id: str, limit: int, offset: int) -> Tuple[List[TraceRecord], int]
分页查询追踪记录,返回 (记录列表, 总数)。
TraceRecord 字段
| 字段 | 类型 | 说明 |
|---|---|---|
run_id | str | 所属运行 ID |
step_id | int | 步序号 |
seq | int | 序列号 |
tx_time_ms | float | 发送时间 |
src | int | 源节点 |
dst | int | 目标节点 |
rx_time_ms | float? | 接收时间 |
payload_bytes | int | 载荷大小 |
received | bool | 是否成功接收 |
4. MetricsCompute 接口
定义:集成在 src/unet_dt/orchestrator/runner.py 中的指标计算逻辑
MetricsCompute 负责在每个仿真步结束后计算性能指标。
计算指标
| 指标 | 计算方式 |
|---|---|
throughput_bps | 窗口内成功接收的总字节数 / 窗口时长 |
loss_rate | 1 - (接收数 / 发送数) |
delay_p50_ms | 窗口内延迟的第 50 百分位 |
delay_p95_ms | 窗口内延迟的第 95 百分位 |
sync_error_ms | 发送时刻的期望值与实际值之差 |
overhead_ms | 调度+发送+同步的总开销 |
输出
每步计算完成后生成 MetricItem 对象:
- 写入 CSV 文件(
runs/{run_id}/metrics.csv) - 通过
EventPublisher推送 WebSocket 事件 - 供
RunStatusResponse.latest_metrics查询
5. EventPublisher 接口
定义:src/unet_dt/service/execution_worker.py 中的 Protocol
python
class EventPublisher(Protocol):
def publish_from_sync(
self,
*,
event_type: str,
run_id: str,
payload: Optional[Dict[str, Any]] = None,
ts_ms: Optional[int] = None,
timeout: float = 3.0,
) -> Any: ...用于从同步执行线程向异步 WebSocket 广播器发送事件。由 RunEventBroadcaster 实现。
6. RunRepository 接口
定义:src/unet_dt/persistence/repo.py
管理 dt_runs、dt_run_summary、dt_run_events 三张表的 CRUD 操作。
主要方法
| 方法 | 说明 |
|---|---|
create_run(record) | 创建运行记录 |
get_run(run_id) | 获取运行记录(不存在抛 KeyError) |
list_runs(limit, offset) | 分页查询运行列表 |
update_run(run_id, **fields) | 更新运行字段 |
get_summary(run_id) | 获取运行摘要 |
upsert_summary(record) | 插入或更新运行摘要 |
insert_event(record) | 插入运行事件 |
get_events(run_id, limit, offset) | 分页查询事件 |