Skip to content

资源动态分配技术分析

概述

水声信道的带宽极为有限(典型值仅数 kbps),且信道质量随时间和空间变化显著。资源动态分配技术旨在根据实时信道状态调整各节点的发送机会与帧配额,在有限带宽约束下最大化网络利用率。本章从调度策略框架设计、两种策略实现、配额控制机制三个维度进行分析。

调度策略框架设计

核心抽象

系统采用策略模式(Strategy Pattern)定义调度框架,核心数据结构定义于 scheduler/base.py:1-33

StepContext — 调度上下文:

python
# src/unet_dt/scheduler/base.py:9-15
@dataclass
class StepContext:
    step_id: int                           # 当前步序号
    t_start_ms: int                        # 步起始时刻 (ms)
    t_end_ms: int                          # 步结束时刻 (ms)
    last_window_metrics: Dict[str, Any]    # 上一步的窗口指标
    queues: Dict[int, int]                 # 各节点队列深度 (预留)
    link_hints: Dict[str, Any]             # 链路提示信息 (预留)

ScheduleDecision — 调度决策:

python
# src/unet_dt/scheduler/base.py:19-23
@dataclass
class ScheduleDecision:
    step_id: int                           # 决策对应的步序号
    active_nodes: List[int]                # 本步允许发送的节点列表
    quota_frames: Dict[int, int]           # 各节点的帧配额
    meta: Dict[str, Any]                   # 策略元信息 (可扩展)

Strategy — 策略抽象基类:

python
# src/unet_dt/scheduler/base.py:26-32
class Strategy(ABC):
    id: str
    name: str

    @abstractmethod
    def decide(self, ctx: StepContext) -> ScheduleDecision:
        raise NotImplementedError

框架伪代码

算法: 调度策略框架
输入: strategy (策略实例), steps (总步数), dt (步长), flows (流量定义)
输出: 每步的调度决策序列

FOR step_id = 0 TO steps-1:
    metrics = 获取上一步的窗口指标
    ctx = StepContext(step_id, step_id*dt, (step_id+1)*dt, metrics, {}, {})
    decision = strategy.decide(ctx)

    FOR each flow IN flows:
        IF flow.src NOT IN decision.active_nodes THEN
            SKIP
        IF sent_count[flow.src] >= decision.quota_frames[flow.src] THEN
            SKIP  -- 配额耗尽
        执行帧发送
        sent_count[flow.src] += 1

此框架的关键设计决策:

  1. 策略与执行分离Strategy.decide() 仅产生决策,不直接操作网络;runner.py 负责执行
  2. 反馈闭环last_window_metrics 将上一步的实际指标反馈给策略,支持自适应行为
  3. 可扩展接口queueslink_hints 为未来的队列感知和链路自适应调度预留了接口

RoundRobin 策略

算法描述

RoundRobin(scheduler/round_robin.py:11-30)实现最简单的轮询调度:每步仅激活一个节点,按固定顺序循环。

算法: RoundRobin 调度
输入: node_ids (节点列表), step_id (当前步)
输出: ScheduleDecision

1. index = step_id MOD len(node_ids)
2. active = [node_ids[index]]
3. quota = {active[0]: 1}
4. RETURN ScheduleDecision(step_id, active, quota, {"strategy": "round_robin"})

实现代码

python
# src/unet_dt/scheduler/round_robin.py:18-30
def decide(self, ctx: StepContext) -> ScheduleDecision:
    if not self._node_ids:
        active = []
    else:
        active = [self._node_ids[ctx.step_id % len(self._node_ids)]]
    quota = {node_id: 1 for node_id in active}
    return ScheduleDecision(
        step_id=ctx.step_id,
        active_nodes=active,
        quota_frames=quota,
        meta={"strategy": self.id},
    )

特性分析

维度评价
公平性严格公平——每个节点获得完全相同的发送机会
信道适应性无——不考虑信道质量,即使某节点链路中断也照常调度
计算开销O(1)——仅一次取模运算
适用场景基线对比、信道质量均匀的稳态场景

AdaptiveQoS 策略

算法描述

AdaptiveQoS(scheduler/adaptive_qos.py:11-39)根据上一步的丢包率动态扩展活跃节点数量。

算法: AdaptiveQoS 自适应调度
输入: node_ids, step_id, last_window_metrics
输出: ScheduleDecision

1. loss_rate = last_window_metrics["loss_rate"] (默认 0.0)
2. IF loss_rate > 0.2 THEN
     max_active = 2    -- 高丢包时扩展为双节点并发
   ELSE
     max_active = 1    -- 正常情况单节点
3. start = step_id MOD len(node_ids)
4. active = []
5. FOR i = 0 TO max_active-1:
     active.append(node_ids[(start + i) MOD len(node_ids)])
6. quota = {node: 1 FOR node IN active}
7. RETURN ScheduleDecision(step_id, active, quota, {"strategy": "adaptive_qos", "loss_rate": loss_rate})

实现代码

python
# src/unet_dt/scheduler/adaptive_qos.py:18-39
def decide(self, ctx: StepContext) -> ScheduleDecision:
    if not self._node_ids:
        return ScheduleDecision(ctx.step_id, [], {}, {"strategy": self.id})

    loss_rate = float(ctx.last_window_metrics.get("loss_rate", 0.0))
    max_active = 2 if loss_rate > 0.2 else 1
    start = ctx.step_id % len(self._node_ids)
    active = []
    for i in range(max_active):
        active.append(self._node_ids[(start + i) % len(self._node_ids)])

    quota = {node_id: 1 for node_id in active}
    return ScheduleDecision(
        step_id=ctx.step_id,
        active_nodes=active,
        quota_frames=quota,
        meta={"strategy": self.id, "loss_rate": loss_rate},
    )

自适应机制分析

      loss_rate
         |
    1.0 -|
         |
    0.2 -|----------+
         |          | max_active = 2
         |          |
    0.0 -|----------+-----> max_active = 1
         +----------------------------> time
  • 阈值 0.2:当丢包率超过 20% 时,系统判断当前活跃节点可能处于恶劣信道,通过增加第二个节点分散风险
  • 单一维度反馈:策略仅使用 loss_rate 一个指标,不考虑延迟、吞吐量或能耗
  • 滞后性:决策基于上一步(甚至更早,参见 runner.py:766-769 中的 m[-2] 回溯)的指标,存在一步延迟

quota_frames 配额控制

配额机制在 runner.py:689-693 实现,限制每个节点在每步内的最大发送帧数:

python
# src/unet_dt/orchestrator/runner.py:689-693
q = quota.get(src)
if q is not None and sent_in_step[src] >= int(q):
    logger.debug("tx skip: quota reached src=%d (sent=%d, quota=%d)",
                 src, sent_in_step[src], int(q))
    continue

关键行为

  • 配额在每步开始时由策略决定,当前两种策略均设定 quota = 1
  • 超出配额的帧被静默跳过(logged at DEBUG level)
  • 配额在发送尝试时即消耗(runner.py:723),无论成功或失败,防止失败帧的无限重试
python
# src/unet_dt/orchestrator/runner.py:721-723
# Always consume quota regardless of send success/failure to
# prevent unlimited retries within a single step.
sent_in_step[src] += 1

指标反馈机制

策略决策依赖 compute_window_metrics()metrics/compute.py:20-68)产生的窗口指标:

指标字段含义计算方式
throughput_bps窗口内吞吐量(received_bytes * 8) / (step_ms / 1000)
loss_rate丢包率1 - (received / sent)
delay_p50_ms延迟中位数np.percentile(delays, 50)
delay_p95_ms延迟 95 分位np.percentile(delays, 95)
sent_count发送帧数窗口内 TX 记录总数
recv_count接收帧数窗口内 received=True 的记录数
sync_error_ms同步误差当前硬编码 0.0
overhead_ms调度开销当前硬编码 0.0

gateway 模式中,策略使用的 last_window_metrics 特意取倒数第二步的指标(runner.py:766-769),因为最新步的 RX 尚未完全到达:

python
# src/unet_dt/orchestrator/runner.py:766-769
if len(m) >= 2:
    last_window_metrics = m[-2]
else:
    last_window_metrics = current_step_metrics

局限性分析

L-1: 配额固定为 1 帧

两种策略的 quota_frames 均硬编码为 {node_id: 1},无法根据信道容量或业务需求动态调整。在高带宽窗口期浪费了发送机会。

StepContext 中的 queueslink_hints 字段在 runner.py 中始终传入空字典(runner.py:649-650),策略无法获取队列积压状态或链路质量提示。

L-3: 单一维度自适应

AdaptiveQoS 仅依据 loss_rate 一个指标做决策,未综合考虑延迟、吞吐量、能耗等多维 QoS 指标。阈值 0.2 为硬编码常量,缺乏自动调优机制。

L-4: overhead_ms 未真实计量

调度开销指标 overhead_ms 硬编码为 0(metrics/compute.py:65),无法量化策略决策和帧构造的实际耗时。

改进方向

短期:动态 quota 计算

算法: 动态 Quota 计算
输入: link_capacity_bps, frame_size_bytes, dt_seconds
输出: quota (帧/步)

available_bits = link_capacity_bps * dt_seconds
available_frames = available_bits / (frame_size_bytes * 8)
quota = max(1, floor(available_frames * utilization_target))

中期:多维 QoS 策略

引入加权效用函数综合多维指标:

算法: 多维 QoS 调度
输入: metrics (包含 loss_rate, delay_p95, throughput), weights (w_l, w_d, w_t)
输出: 调度决策

FOR each node n:
    utility(n) = w_t * normalized_throughput(n)
                - w_l * loss_rate(n)
                - w_d * normalized_delay(n)

active_nodes = TOP-K(nodes, by=utility, k=max_active)
quota(n) = proportional_to(utility(n))

长期:队列感知调度

  1. StepContext.queues 中填充各节点的发送队列深度
  2. 策略可根据队列积压动态调整优先级,避免"饥饿"节点
  3. 结合 link_hints(如 SNR 估计)实现跨层优化