Skip to content

内部模块接口文档 (Internal Interfaces)

本文档描述系统内部关键模块的接口契约,供开发者理解模块间边界和扩展点。


1. UnetAdapter 接口

定义src/unet_dt/unet_adapter/base.py

UnetAdapter 是与 UNet 水声通信仿真器交互的抽象适配层。所有具体实现(fjagepy、arlpy、logonly)均需实现此接口。

数据类型

python
@dataclass
class UnetInstance:
    name: str               # 节点名称 (如 "A", "B")
    host: str               # 节点主机地址
    port: int               # 节点端口
    meta: Dict[str, Any]    # 附加元数据(scenario 注入)

抽象方法

connect(inst: UnetInstance) -> None

建立与指定 UNet 节点的网关连接。

  • 参数inst — 目标节点实例信息
  • 行为:创建并缓存底层通信连接(如 fjagepy 的 Gateway 对象)
  • 异常:连接失败时抛出 ConnectionError 或具体实现的异常

close(inst: UnetInstance) -> None

关闭与指定节点的连接并释放资源。

  • 参数inst — 目标节点实例信息
  • 行为:清理连接资源,从缓存中移除

probe(inst: UnetInstance) -> Dict[str, Any]

探测节点状态,获取运行中的 Agent 和 Service 信息。

  • 参数inst — 目标节点实例信息
  • 返回值
    python
    {
        "ok": True,
        "agents": 5,              # Agent 数量
        "services": ["phy", ...], # 可用服务列表
        "detail": None             # 或错误详情字符串
    }
  • 用途:健康检查、服务发现

check_phy(inst: UnetInstance, probe: Optional[Dict] = None) -> Dict[str, Any]

检查节点的物理层 (PHY) 状态。

  • 参数
    • inst — 目标节点
    • probe — 可选的 probe 缓存结果(避免重复探测)
  • 返回值
    python
    {
        "phy_ok": True,     # PHY 是否可用
        "detail": None      # 或错误详情
    }

subscribe_rx(inst, on_msg, stop_event, timeout_s, ready_event) -> None

订阅节点的接收帧事件,阻塞式监听直到 stop_event 触发。

  • 参数
    • inst: UnetInstance — 监听节点
    • on_msg: Callable[[Dict], None] — 收到帧时的回调函数
    • stop_event: Optional[Any] — 停止事件信号(通常为 threading.Event
    • timeout_s: float — 单次等待超时(秒),默认 1.0
    • ready_event: Optional[Any] — 订阅就绪后触发的事件
  • 行为:在循环中等待接收帧,每收到一帧调用 on_msg(frame_dict),直到 stop_event 被设置
  • 回调参数格式
    python
    {
        "rx_time": 1234.5,         # 接收时间(仿真时钟)
        "from": 1,                 # 源节点地址
        "to": 2,                   # 目标节点地址
        "data": b"...",            # 原始载荷
        "protocol": 0,             # 协议号
    }

send_frame(inst, to, payload, meta) -> Dict[str, Any]

通过指定节点发送数据帧。

  • 参数
    • inst: UnetInstance — 发送节点
    • to: int — 目标节点地址
    • payload: bytes — 载荷数据
    • meta: Optional[Dict] — 可选发送元数据
  • 返回值
    python
    {
        "ok": True,
        "tx_time": 5000.0,    # 发送时间戳
        "detail": None
    }

get_time(inst: UnetInstance) -> float

获取节点当前的仿真时钟时间(毫秒)。

query_node_address(inst: UnetInstance) -> Optional[int]

查询节点的动态 UNet 地址(通过 NODE_INFO 服务)。

  • 返回值:节点地址(整数)或 None(不支持时)
  • 备注:此方法有默认实现(返回 None),子类可按需覆盖

已实现的适配器

实现模块说明
FjagepyAdapterunet_adapter/fjagepy_impl.py基于 fjagepy 库的生产级实现,连接真实 UNet 网关
ArlpyAdapterunet_adapter/arlpy_impl.py基于 arlpy 的模拟实现(用于本地开发)
LogOnlyAdapterunet_adapter/logonly_impl.py仅记录日志的 mock 实现(用于 CI/测试)

2. Strategy 接口

定义src/unet_dt/scheduler/base.py

Strategy 是调度策略的抽象基类,决定每个仿真步中哪些流量需要发送。

数据类型

python
@dataclass
class StepContext:
    step_id: int                    # 当前步序号
    dt_seconds: float               # 时间步长
    nodes: List[NodeInfo]           # 可用节点列表
    traffic_flows: List[TrafficFlow] # 流量配置
    metrics_history: List[MetricItem] # 历史指标
python
@dataclass
class ScheduleDecision:
    sends: List[SendAction]   # 本步需要执行的发送动作列表

抽象方法

decide(ctx: StepContext) -> ScheduleDecision

根据当前上下文决定本步的调度动作。

  • 参数ctx — 当前步的完整上下文信息
  • 返回值ScheduleDecision,包含需要执行的发送动作列表
  • 契约:实现必须是无副作用的纯函数(不修改 ctx,不依赖外部状态)

已实现的策略

策略 ID类名说明
round_robinRoundRobinStrategy简单轮询:按配置顺序依次发送所有流量
adaptive_qosAdaptiveQoSStrategy自适应 QoS:根据历史指标动态调整发送优先级和频率

3. TraceCollector 接口

定义:集成在 src/unet_dt/persistence/trace_repo.py

TraceCollector 负责收集和持久化仿真过程中的通信追踪记录。

主要方法

insert_trace(record: TraceRecord) -> None

写入单条追踪记录到数据库。

insert_traces_batch(records: List[TraceRecord]) -> None

批量写入追踪记录(性能优化路径)。

get_traces(run_id: str, limit: int, offset: int) -> Tuple[List[TraceRecord], int]

分页查询追踪记录,返回 (记录列表, 总数)。

TraceRecord 字段

字段类型说明
run_idstr所属运行 ID
step_idint步序号
seqint序列号
tx_time_msfloat发送时间
srcint源节点
dstint目标节点
rx_time_msfloat?接收时间
payload_bytesint载荷大小
receivedbool是否成功接收

4. MetricsCompute 接口

定义:集成在 src/unet_dt/orchestrator/runner.py 中的指标计算逻辑

MetricsCompute 负责在每个仿真步结束后计算性能指标。

计算指标

指标计算方式
throughput_bps窗口内成功接收的总字节数 / 窗口时长
loss_rate1 - (接收数 / 发送数)
delay_p50_ms窗口内延迟的第 50 百分位
delay_p95_ms窗口内延迟的第 95 百分位
sync_error_ms发送时刻的期望值与实际值之差
overhead_ms调度+发送+同步的总开销

输出

每步计算完成后生成 MetricItem 对象:

  • 写入 CSV 文件(runs/{run_id}/metrics.csv
  • 通过 EventPublisher 推送 WebSocket 事件
  • RunStatusResponse.latest_metrics 查询

5. EventPublisher 接口

定义src/unet_dt/service/execution_worker.py 中的 Protocol

python
class EventPublisher(Protocol):
    def publish_from_sync(
        self,
        *,
        event_type: str,
        run_id: str,
        payload: Optional[Dict[str, Any]] = None,
        ts_ms: Optional[int] = None,
        timeout: float = 3.0,
    ) -> Any: ...

用于从同步执行线程向异步 WebSocket 广播器发送事件。由 RunEventBroadcaster 实现。


6. RunRepository 接口

定义src/unet_dt/persistence/repo.py

管理 dt_runsdt_run_summarydt_run_events 三张表的 CRUD 操作。

主要方法

方法说明
create_run(record)创建运行记录
get_run(run_id)获取运行记录(不存在抛 KeyError)
list_runs(limit, offset)分页查询运行列表
update_run(run_id, **fields)更新运行字段
get_summary(run_id)获取运行摘要
upsert_summary(record)插入或更新运行摘要
insert_event(record)插入运行事件
get_events(run_id, limit, offset)分页查询事件