Appearance
资源动态分配技术分析
概述
水声信道的带宽极为有限(典型值仅数 kbps),且信道质量随时间和空间变化显著。资源动态分配技术旨在根据实时信道状态调整各节点的发送机会与帧配额,在有限带宽约束下最大化网络利用率。本章从调度策略框架设计、两种策略实现、配额控制机制三个维度进行分析。
调度策略框架设计
核心抽象
系统采用策略模式(Strategy Pattern)定义调度框架,核心数据结构定义于 scheduler/base.py:1-33:
StepContext — 调度上下文:
python
# src/unet_dt/scheduler/base.py:9-15
@dataclass
class StepContext:
step_id: int # 当前步序号
t_start_ms: int # 步起始时刻 (ms)
t_end_ms: int # 步结束时刻 (ms)
last_window_metrics: Dict[str, Any] # 上一步的窗口指标
queues: Dict[int, int] # 各节点队列深度 (预留)
link_hints: Dict[str, Any] # 链路提示信息 (预留)ScheduleDecision — 调度决策:
python
# src/unet_dt/scheduler/base.py:19-23
@dataclass
class ScheduleDecision:
step_id: int # 决策对应的步序号
active_nodes: List[int] # 本步允许发送的节点列表
quota_frames: Dict[int, int] # 各节点的帧配额
meta: Dict[str, Any] # 策略元信息 (可扩展)Strategy — 策略抽象基类:
python
# src/unet_dt/scheduler/base.py:26-32
class Strategy(ABC):
id: str
name: str
@abstractmethod
def decide(self, ctx: StepContext) -> ScheduleDecision:
raise NotImplementedError框架伪代码
算法: 调度策略框架
输入: strategy (策略实例), steps (总步数), dt (步长), flows (流量定义)
输出: 每步的调度决策序列
FOR step_id = 0 TO steps-1:
metrics = 获取上一步的窗口指标
ctx = StepContext(step_id, step_id*dt, (step_id+1)*dt, metrics, {}, {})
decision = strategy.decide(ctx)
FOR each flow IN flows:
IF flow.src NOT IN decision.active_nodes THEN
SKIP
IF sent_count[flow.src] >= decision.quota_frames[flow.src] THEN
SKIP -- 配额耗尽
执行帧发送
sent_count[flow.src] += 1此框架的关键设计决策:
- 策略与执行分离:
Strategy.decide()仅产生决策,不直接操作网络;runner.py负责执行 - 反馈闭环:
last_window_metrics将上一步的实际指标反馈给策略,支持自适应行为 - 可扩展接口:
queues和link_hints为未来的队列感知和链路自适应调度预留了接口
RoundRobin 策略
算法描述
RoundRobin(scheduler/round_robin.py:11-30)实现最简单的轮询调度:每步仅激活一个节点,按固定顺序循环。
算法: RoundRobin 调度
输入: node_ids (节点列表), step_id (当前步)
输出: ScheduleDecision
1. index = step_id MOD len(node_ids)
2. active = [node_ids[index]]
3. quota = {active[0]: 1}
4. RETURN ScheduleDecision(step_id, active, quota, {"strategy": "round_robin"})实现代码
python
# src/unet_dt/scheduler/round_robin.py:18-30
def decide(self, ctx: StepContext) -> ScheduleDecision:
if not self._node_ids:
active = []
else:
active = [self._node_ids[ctx.step_id % len(self._node_ids)]]
quota = {node_id: 1 for node_id in active}
return ScheduleDecision(
step_id=ctx.step_id,
active_nodes=active,
quota_frames=quota,
meta={"strategy": self.id},
)特性分析
| 维度 | 评价 |
|---|---|
| 公平性 | 严格公平——每个节点获得完全相同的发送机会 |
| 信道适应性 | 无——不考虑信道质量,即使某节点链路中断也照常调度 |
| 计算开销 | O(1)——仅一次取模运算 |
| 适用场景 | 基线对比、信道质量均匀的稳态场景 |
AdaptiveQoS 策略
算法描述
AdaptiveQoS(scheduler/adaptive_qos.py:11-39)根据上一步的丢包率动态扩展活跃节点数量。
算法: AdaptiveQoS 自适应调度
输入: node_ids, step_id, last_window_metrics
输出: ScheduleDecision
1. loss_rate = last_window_metrics["loss_rate"] (默认 0.0)
2. IF loss_rate > 0.2 THEN
max_active = 2 -- 高丢包时扩展为双节点并发
ELSE
max_active = 1 -- 正常情况单节点
3. start = step_id MOD len(node_ids)
4. active = []
5. FOR i = 0 TO max_active-1:
active.append(node_ids[(start + i) MOD len(node_ids)])
6. quota = {node: 1 FOR node IN active}
7. RETURN ScheduleDecision(step_id, active, quota, {"strategy": "adaptive_qos", "loss_rate": loss_rate})实现代码
python
# src/unet_dt/scheduler/adaptive_qos.py:18-39
def decide(self, ctx: StepContext) -> ScheduleDecision:
if not self._node_ids:
return ScheduleDecision(ctx.step_id, [], {}, {"strategy": self.id})
loss_rate = float(ctx.last_window_metrics.get("loss_rate", 0.0))
max_active = 2 if loss_rate > 0.2 else 1
start = ctx.step_id % len(self._node_ids)
active = []
for i in range(max_active):
active.append(self._node_ids[(start + i) % len(self._node_ids)])
quota = {node_id: 1 for node_id in active}
return ScheduleDecision(
step_id=ctx.step_id,
active_nodes=active,
quota_frames=quota,
meta={"strategy": self.id, "loss_rate": loss_rate},
)自适应机制分析
loss_rate
|
1.0 -|
|
0.2 -|----------+
| | max_active = 2
| |
0.0 -|----------+-----> max_active = 1
+----------------------------> time- 阈值 0.2:当丢包率超过 20% 时,系统判断当前活跃节点可能处于恶劣信道,通过增加第二个节点分散风险
- 单一维度反馈:策略仅使用
loss_rate一个指标,不考虑延迟、吞吐量或能耗 - 滞后性:决策基于上一步(甚至更早,参见
runner.py:766-769中的m[-2]回溯)的指标,存在一步延迟
quota_frames 配额控制
配额机制在 runner.py:689-693 实现,限制每个节点在每步内的最大发送帧数:
python
# src/unet_dt/orchestrator/runner.py:689-693
q = quota.get(src)
if q is not None and sent_in_step[src] >= int(q):
logger.debug("tx skip: quota reached src=%d (sent=%d, quota=%d)",
src, sent_in_step[src], int(q))
continue关键行为:
- 配额在每步开始时由策略决定,当前两种策略均设定
quota = 1 - 超出配额的帧被静默跳过(logged at DEBUG level)
- 配额在发送尝试时即消耗(
runner.py:723),无论成功或失败,防止失败帧的无限重试
python
# src/unet_dt/orchestrator/runner.py:721-723
# Always consume quota regardless of send success/failure to
# prevent unlimited retries within a single step.
sent_in_step[src] += 1指标反馈机制
策略决策依赖 compute_window_metrics()(metrics/compute.py:20-68)产生的窗口指标:
| 指标字段 | 含义 | 计算方式 |
|---|---|---|
throughput_bps | 窗口内吞吐量 | (received_bytes * 8) / (step_ms / 1000) |
loss_rate | 丢包率 | 1 - (received / sent) |
delay_p50_ms | 延迟中位数 | np.percentile(delays, 50) |
delay_p95_ms | 延迟 95 分位 | np.percentile(delays, 95) |
sent_count | 发送帧数 | 窗口内 TX 记录总数 |
recv_count | 接收帧数 | 窗口内 received=True 的记录数 |
sync_error_ms | 同步误差 | 当前硬编码 0.0 |
overhead_ms | 调度开销 | 当前硬编码 0.0 |
gateway 模式中,策略使用的 last_window_metrics 特意取倒数第二步的指标(runner.py:766-769),因为最新步的 RX 尚未完全到达:
python
# src/unet_dt/orchestrator/runner.py:766-769
if len(m) >= 2:
last_window_metrics = m[-2]
else:
last_window_metrics = current_step_metrics局限性分析
L-1: 配额固定为 1 帧
两种策略的 quota_frames 均硬编码为 {node_id: 1},无法根据信道容量或业务需求动态调整。在高带宽窗口期浪费了发送机会。
L-2: queues / link_hints 未使用
StepContext 中的 queues 和 link_hints 字段在 runner.py 中始终传入空字典(runner.py:649-650),策略无法获取队列积压状态或链路质量提示。
L-3: 单一维度自适应
AdaptiveQoS 仅依据 loss_rate 一个指标做决策,未综合考虑延迟、吞吐量、能耗等多维 QoS 指标。阈值 0.2 为硬编码常量,缺乏自动调优机制。
L-4: overhead_ms 未真实计量
调度开销指标 overhead_ms 硬编码为 0(metrics/compute.py:65),无法量化策略决策和帧构造的实际耗时。
改进方向
短期:动态 quota 计算
算法: 动态 Quota 计算
输入: link_capacity_bps, frame_size_bytes, dt_seconds
输出: quota (帧/步)
available_bits = link_capacity_bps * dt_seconds
available_frames = available_bits / (frame_size_bytes * 8)
quota = max(1, floor(available_frames * utilization_target))中期:多维 QoS 策略
引入加权效用函数综合多维指标:
算法: 多维 QoS 调度
输入: metrics (包含 loss_rate, delay_p95, throughput), weights (w_l, w_d, w_t)
输出: 调度决策
FOR each node n:
utility(n) = w_t * normalized_throughput(n)
- w_l * loss_rate(n)
- w_d * normalized_delay(n)
active_nodes = TOP-K(nodes, by=utility, k=max_active)
quota(n) = proportional_to(utility(n))长期:队列感知调度
- 在
StepContext.queues中填充各节点的发送队列深度 - 策略可根据队列积压动态调整优先级,避免"饥饿"节点
- 结合 link_hints(如 SNR 估计)实现跨层优化