面向读者:初级工程师 / AGV 与多机器人方向学习者
文章目标:理解 openTCS 的核心架构思想,而不是停留在“会跑 Demo”层面


文章目录

一、为什么很多 AGV 项目"看起来能跑",但实际上离工业系统很远?

1.1 学生项目的典型做法

很多学生项目里的 AGV 仿真,大概都是这样的:

  • A* 算路
  • 小车沿路径移动
  • 两辆车碰撞就停一下
  • 简单避障

这种系统可以做演示,但离真正的工业 AGV 调度系统还差非常远。

1.2 差距到底在哪?

真正的工业场景里,核心问题其实不是:

“车会不会走”

而是:

  • 多车如何协同
  • 如何避免死锁
  • 如何避免路口拥堵
  • 如何统一调度
  • 如何动态重规划
  • 如何处理车辆掉线
  • 如何做资源占用控制
  • 如何支持不同品牌 AGV

1.3 一个具体的例子

假设你有一个仓库,10 台 AGV 同时运行:

学生项目的做法:
  10 台车各自 A* 算路
  各自沿路径走
  碰到了就停

会发生什么?
  - 两台车在走廊相向而行 → 谁也过不去 → 死锁
  - 三台车同时到达十字路口 → 互相等待 → 死锁
  - 一台车故障停在路中间 → 后面的车全部堵住
  - 所有车都选最短路 → 主干道拥堵,旁边的路空着

这些问题,单靠"路径规划算法"是解决不了的。

1.4 工业系统的思路

工业系统的思路完全不同:

不是"让每台车自己想办法"
而是"有一个中央大脑统一管理所有车"

不是"碰到了再处理"
而是"进入之前就预约资源"

不是"只算最短路"
而是"综合考虑拥堵、电量、任务优先级"

不是"车自己避障"
而是"系统保证不会出现需要避障的情况"

这就是 openTCS 的设计哲学。

1.5 一个类比

把仓库想象成一个城市:

学生项目 工业系统
没有交通规则 完整的交通法规
没有红绿灯 每个路口都有信号控制
碰到了再让 提前规划不会相遇
每辆车自己导航 交通指挥中心统一调度
出了事故再处理 从设计上避免事故

openTCS 就是这个"城市交通管理系统"。


二、什么是 openTCS?

2.1 基本定义

openTCS(open Transportation Control System)是由德国 Fraunhofer IML 推出的开源 AGV 调度系统。

它本质上是:

一个"中央交通管制系统"

2.2 它负责什么?

职责 说明
订单分配 决定哪台车执行哪个任务
路径规划 计算车辆从起点到终点的路线
资源占用 管理路径上每个节点和边的占用状态
多车协调 确保多台车不会互相阻塞
车辆状态管理 跟踪每台车的位置、状态、电量
交通控制 类似红绿灯,控制车辆的通行权
车辆通信 通过统一接口与不同品牌的车通信

2.3 它不是什么?

这一点非常重要,很多人搞混:

它不是 说明
SLAM 系统 它不做地图构建和定位
底盘控制器 它不控制电机和轮子
视觉导航算法 它不处理传感器数据
机器人操作系统 它不是 ROS

它的定位是:

车队调度层(Fleet Management)

在整个系统中的位置:

ERP/MES(企业管理层)
    ↓ "把货从 A 搬到 B"
openTCS(车队调度层)
    ↓ "AGV-3 走这条路去执行"
AGV 控制层(底盘 + 导航)
    ↓ "控制电机,沿路径移动"
物理层(轮子转动)

2.4 为什么调度层是最关键的?

因为:

  • 底盘控制:各厂商都能做好(成熟技术)
  • 导航定位:SLAM 技术已经很成熟
  • 单车路径规划:A* 就够了

但是:

让 10 台、50 台、100 台车在同一个空间里高效协同,不堵车、不死锁、不碰撞——这才是真正的难题。

这就是 openTCS 要解决的问题。

2.5 openTCS 的历史和定位

  • 由德国 Fraunhofer IML(物流研究所)开发
  • 2000 年代开始,已有 15+ 年历史
  • 开源(MIT License)
  • Java 实现
  • 在欧洲工业界广泛使用
  • 是很多商业 AGV 调度系统的参考架构

它代表的是:

欧洲工业界对 AGV 调度问题的标准解法。


三、openTCS 的整体架构

3.1 核心架构图

                +----------------+
                |   ERP / MES    |
                +----------------+
                         |
                         v
                +----------------+
                |     Kernel     |
                |----------------|
                | Dispatcher     |
                | Router         |
                | Scheduler      |
                +----------------+
                   /    |     \
                  /     |      \
                 v      v       v
          +--------+ +--------+ +--------+
          | AGV-1  | | AGV-2  | | AGV-3  |
          +--------+ +--------+ +--------+

3.2 Kernel:中央大脑

Kernel 是整个系统的核心。所有决策都在这里做出。

它内部最重要的几个模块:

模块 作用 类比
Dispatcher 订单分配 出租车调度中心
Router 路径规划 导航软件
Scheduler 资源锁与路径占用 红绿灯系统
Vehicle Driver 车辆通信 对讲机

3.3 为什么 Scheduler 最重要?

很多初学者只关注 Router(路径规划)。

但实际上:

工业系统最复杂的部分通常是 Scheduler。

原因很简单:

"算出路径"是一个数学问题 → 有成熟算法(A*、Dijkstra)
"保证几十台车不互相堵死"是一个工程问题 → 没有银弹

路径规划是"一次性计算",而资源调度是"持续性管理"。

一个类比:

给你一张地图,算出从家到公司的最短路——这很简单。
但是让 1000 辆车同时在这张地图上跑,不堵车——这才是真正的挑战。

3.4 模块之间的协作

一个完整的任务执行流程:

1. ERP 下发订单:"把货从工位 A 搬到工位 B"
2. Dispatcher 选择车辆:AGV-3 最合适(最近、电量够、空闲)
3. Router 计算路径:P1 → P2 → P3 → P4 → P5
4. Scheduler 检查资源:P2 被 AGV-1 占用,等待
5. Scheduler 批准:P2 释放了,AGV-3 可以进入
6. Vehicle Driver 下发指令:告诉 AGV-3 移动到 P2
7. AGV-3 执行:物理移动
8. 重复 4-7 直到到达目的地

注意第 4 步:

路径算出来了,但不能立即执行。必须等资源可用。

这就是 Scheduler 的价值。

3.5 Kernel 的运行模式

openTCS 的 Kernel 有两种运行模式:

模式 用途 说明
Modelling 建模 编辑地图、配置车辆,不执行调度
Operating 运行 实际调度车辆,执行任务

这种分离很重要:

你不会在飞机飞行时修改飞行控制软件。同样,你不应该在系统运行时修改地图。


四、中央调度架构:为什么工业 AGV 更偏向 Centralized?

4.1 两种调度架构

多机器人系统的调度架构,大体分为两种:

中央调度(Centralized):

所有决策由一个中央节点做出
车辆只负责执行

分布式调度(Decentralized):

每台车自己做决策
车辆之间互相协商

openTCS 选择了中央调度。这不是偶然的,而是工业场景的必然选择。

4.2 为什么工业场景偏向中央调度?

原因 1:全局最优 vs 局部最优

中央调度:
  系统知道所有车的位置、任务、路径
  → 可以做出全局最优决策
  → 例如:让 AGV-3 绕远路,避免和 AGV-1 冲突
       虽然 AGV-3 多走了 10 米,但系统整体效率更高

分布式调度:
  每台车只知道自己的信息和邻居的信息
  → 只能做局部最优决策
  → 例如:AGV-3 选了最短路,结果和 AGV-1 堵在一起
       两台车都延迟了 30 秒

工业场景追求的是:

系统整体吞吐量最大化,而不是单台车的路径最短。

原因 2:确定性

工业系统最重要的特性之一是:

确定性(Determinism)

即:

给定相同的输入,系统必须产生相同的输出。

中央调度天然具有确定性:

中央调度:
  所有决策由一个节点做出
  决策逻辑是确定的
  → 行为可预测、可复现、可审计

分布式调度:
  多个节点并发决策
  决策依赖于消息到达顺序
  → 行为可能不确定
  → 同样的场景,跑两次可能结果不同

在工业环境中,“不确定"意味着"不可靠”。

原因 3:死锁处理

死锁是多车系统最棘手的问题。

中央调度处理死锁:
  系统知道所有车的资源占用情况
  → 可以提前检测死锁
  → 可以拒绝可能导致死锁的资源请求
  → 可以全局重新规划

分布式调度处理死锁:
  每台车只知道自己占了什么、想要什么
  → 死锁检测需要全局信息(但分布式系统没有)
  → 通常只能"检测到死锁后恢复"
  → 恢复过程复杂且耗时

原因 4:实现复杂度

中央调度:
  逻辑集中在一个地方
  调试方便(所有日志在一个节点)
  状态一致性容易保证

分布式调度:
  逻辑分散在每台车上
  调试困难(需要收集所有车的日志)
  状态一致性是分布式系统的经典难题

原因 5:工业场景的特点

工业场景有一些特殊性,使得中央调度更合适:

特点 为什么适合中央调度
车辆数量有限(10-100台) 中央节点计算量可控
环境相对封闭 网络稳定,通信可靠
路网固定 不需要实时发现新路径
安全要求高 需要确定性和可审计性
7×24 运行 需要稳定性而非灵活性

4.3 中央调度的缺点

当然,中央调度也有缺点:

缺点 说明 工业系统如何应对
单点故障 中央节点挂了,全部停 主备切换、热备份
扩展性有限 车辆太多时计算量大 区域划分、分层调度
通信依赖 网络断了车就失控 车辆本地有安全停车逻辑
响应延迟 决策需要经过中央节点 毫秒级通信,可接受

4.4 什么时候分布式更好?

分布式调度更适合:

  • 大规模(1000+ 台机器人)
  • 开放环境(城市道路、户外)
  • 网络不可靠
  • 机器人高度自治
  • 不需要严格的全局最优

例如:

  • 自动驾驶汽车(城市道路)→ 分布式
  • 仓库 AGV(封闭环境)→ 中央调度
  • 无人机集群(户外)→ 分布式
  • 医院配送机器人(建筑内)→ 中央调度

4.5 openTCS 的选择

openTCS 明确选择了中央调度,因为它的目标场景是:

封闭环境中的 10-100 台 AGV 协同作业。

在这个场景下,中央调度的优势远大于劣势。

4.6 一个直观的对比

场景:10 台 AGV,一个十字路口

中央调度:
  系统提前知道 10 台车的路径
  发现 3 台车会在 10 秒后同时到达路口
  提前安排:AGV-1 先过,AGV-5 减速,AGV-8 绕路
  结果:没有任何车需要停下等待

分布式调度:
  每台车只知道自己的路径
  3 台车同时到达路口
  开始协商:"你先走还是我先走?"
  协商需要时间(消息来回)
  协商期间 3 台车都停着
  结果:浪费了 5 秒的等待时间

5 秒看起来不多,但如果每天有 1000 次这样的情况:

5 秒 × 1000 次 × 3 台车 = 4.2 小时的车辆空闲时间

这在工业场景中是不可接受的。


五、Dispatcher:车辆是怎么被分配任务的?

5.1 Dispatcher 的核心职责

Dispatcher 的核心职责是:

把"运输订单"分配给合适的车辆。

看起来简单,但实际上这是一个多目标优化问题。

5.2 一个具体的例子

当前状态:
  AGV-1:在充电桩,电量 30%
  AGV-2:在 P5,空闲,电量 80%
  AGV-3:在 P8,正在执行任务,电量 60%
  AGV-4:在 P2,空闲,电量 95%
  AGV-5:故障,不可用

新任务:
  从工位 A(在 P3 附近)取货,送到工位 B(在 P10 附近)

问题:派哪台车?

简单的"最近优先"会选 AGV-4(在 P2,离 P3 最近)。

但这可能不是最优选择:

考虑因素:
  AGV-4 虽然最近,但它去 P3 的路径经过一个拥堵区域
  AGV-2 虽然远一点,但路径畅通
  
  AGV-4 去 P3 需要 30 秒(因为拥堵)
  AGV-2 去 P3 需要 25 秒(路径畅通)

  → 实际上 AGV-2 更快到达

5.3 Dispatcher 需要考虑的因素

因素 说明 权重
距离 车辆到任务起点的距离
路径拥堵 路径上是否有其他车
电量 是否有足够电量完成任务 必须满足
车辆状态 是否空闲、是否可用 必须满足
车辆能力 是否能搬运该类型货物 必须满足
载重限制 货物重量是否超过车辆载重 必须满足
当前任务 如果正在执行任务,什么时候能空闲
充电策略 完成任务后是否需要充电
公平性 避免某台车过度使用

5.4 Dispatcher 的工作流程(详细版)

Step 1: 新订单进入系统
  订单内容:从 Location-A 取货,送到 Location-B
  订单优先级:普通

Step 2: Dispatcher 被触发
  触发条件:新订单到达

Step 3: 筛选候选车辆
  过滤条件:
    - 状态 = IDLE 或即将空闲
    - 电量 > 任务所需最低电量
    - 能力匹配(能搬运该类型货物)
    - 不在维护状态
  
  结果:AGV-2, AGV-4 是候选

Step 4: 为每个候选车辆计算路径
  AGV-2: P5 → P4 → P3 → Location-A → P3 → P6 → P9 → P10 → Location-B
  AGV-4: P2 → P3 → Location-A → P3 → P6 → P9 → P10 → Location-B

Step 5: 评估代价
  AGV-2 总代价:
    路径长度:120m
    预计时间:60s
    拥堵因子:1.0(路径畅通)
    综合代价:60

  AGV-4 总代价:
    路径长度:80m
    预计时间:40s
    拥堵因子:1.5(P2→P3 段有其他车)
    综合代价:60

Step 6: 选择最优车辆
  代价相同,选择电量更高的:AGV-4(95%)

Step 7: 生成 DriveOrder
  DriveOrder = {
    vehicle: AGV-4,
    route: [P2, P3, Location-A, P3, P6, P9, P10, Location-B],
    actions: [MOVE, MOVE, LOAD, MOVE, MOVE, MOVE, MOVE, UNLOAD]
  }

Step 8: 下发给 Vehicle Driver
  通过通信接口发送给 AGV-4

5.5 事件驱动的调度

openTCS 的 Dispatcher 是事件驱动的,不是轮询的。

什么意思?

轮询方式:
  每隔 1 秒检查一次:"有没有新任务?有没有空闲车辆?"
  问题:1 秒内可能错过最佳时机

事件驱动方式:
  当以下事件发生时,立即触发调度:
  - 新订单到达
  - 某台车变为空闲
  - 某台车故障
  - 路网拓扑变化(某条路被封)
  - 某个任务被取消

事件驱动的好处:

好处 说明
响应快 事件发生后立即处理
资源省 没有事件时不消耗 CPU
逻辑清晰 每种事件有明确的处理逻辑
可扩展 新增事件类型很容易

5.6 Dispatcher 的高级策略

策略 1:预分配

普通做法:
  车辆空闲 → 等待新任务 → 分配 → 开始移动

预分配:
  车辆即将完成当前任务
  → 提前分配下一个任务
  → 当前任务完成后立即开始下一个
  → 减少空闲等待时间

策略 2:任务合并

任务 1:从 A 取货送到 B
任务 2:从 A 取货送到 C

如果 B 和 C 在同一方向:
  → 合并为一个任务:从 A 取两件货,先送 B 再送 C
  → 减少一次空跑

策略 3:充电调度

AGV-4 电量 25%
当前有一个任务需要 20% 电量

简单做法:
  电量够 → 执行

智能做法:
  执行完后电量只剩 5%
  → 必须立即充电
  → 充电桩可能被占用
  → 可能需要等待
  
  不如现在先充电到 50%
  → 然后执行任务
  → 执行完还有 30%
  → 可以继续接下一个任务

5.7 Dispatcher 的可扩展性

openTCS 的 Dispatcher 是可替换的。

你可以实现自己的 Dispatcher,只要遵循接口:

public interface Dispatcher {
    void dispatch();                    // 触发一次调度
    void withdrawOrder(Vehicle vehicle); // 撤回某车的任务
}

这意味着:

你可以用自己的算法替换默认的调度逻辑。

例如:

  • 默认:最近优先
  • 你的实现:考虑拥堵的加权最短路
  • 高级实现:基于强化学习的调度

六、车辆状态机(Vehicle State Machine)

6.1 为什么需要状态机?

工业 AGV 系统里:

"车的位置"远远不够。

系统必须知道车辆的完整状态,才能做出正确的调度决策。

一个反面例子:

系统只知道 AGV-3 在 P5 位置
系统派 AGV-3 去执行任务

但实际上 AGV-3 正在充电(不能移动)
或者 AGV-3 已经故障(不能响应)
或者 AGV-3 正在被人工操作(不应该干预)

结果:指令发出去了,车没动,系统以为车在执行,实际上什么都没发生
→ 任务超时
→ 系统混乱

6.2 openTCS 的车辆状态

openTCS 维护的车辆状态包含多个维度:

维度 1:集成状态(Integration State)

TO_BE_UTILIZED    → 车辆已注册,准备使用
TO_BE_RESPECTED   → 车辆存在但不接受任务(只需要避让它)
TO_BE_IGNORED     → 车辆完全忽略(维护中)

维度 2:运行状态(Processing State)

IDLE              → 空闲,可以接受任务
PROCESSING_ORDER  → 正在执行任务
AWAITING_ORDER    → 等待新任务(已预分配但未开始)

维度 3:能量状态

电量百分比
是否在充电
是否需要充电(低于阈值)

维度 4:位置状态

当前所在 Point
当前所在 Path(如果在移动中)
朝向角度

维度 5:错误状态

无错误
警告(可继续运行)
错误(需要人工干预)
致命错误(立即停车)

6.3 状态转换图

                    ┌──────────────┐
                    │  UNAVAILABLE │
                    └──────┬───────┘
                           │ 上线
                           v
┌──────────┐         ┌──────────┐
│ CHARGING │ ←────── │   IDLE   │ ←──────────────┐
└──────┬───┘  电量低  └────┬─────┘                │
       │                   │ 分配任务              │ 任务完成
       │ 充满              v                      │
       │             ┌──────────┐                 │
       └──────────→  │EXECUTING │ ────────────────┘
                     └────┬─────┘
                          │ 等待资源
                          v
                     ┌──────────┐
                     │ WAITING  │
                     └────┬─────┘
                          │ 资源可用
                          v
                     ┌──────────┐
                     │EXECUTING │
                     └──────────┘

任何状态都可能转入:
                     ┌──────────┐
                     │  ERROR   │
                     └──────────┘

6.4 状态机对调度的影响

状态 是否允许接单 是否占用资源 是否需要避让
IDLE 是(当前位置)
EXECUTING 是(路径上所有资源)
WAITING 是(当前位置)
CHARGING 通常否 是(充电桩)
ERROR 是(当前位置)
UNAVAILABLE

注意"是否占用资源"这一列:

即使车辆故障了,它仍然占用当前位置的资源。

因为它物理上还在那里,其他车不能通过。

6.5 状态机的工程价值

状态机不只是一个理论概念,它在工程中有非常具体的价值:

价值 1:防止非法操作

没有状态机:
  系统给一台正在充电的车发送移动指令
  → 车辆可能强制中断充电
  → 电池损伤
  → 或者车辆忽略指令,系统以为在执行

有状态机:
  系统检查状态:CHARGING
  → 拒绝分配任务
  → 或者先发送"停止充电"指令,等状态变为 IDLE,再分配

价值 2:故障恢复

AGV-3 突然断联(通信中断)

没有状态机:
  系统不知道 AGV-3 怎么了
  继续给它发指令
  任务一直显示"执行中"
  其他车可能和它碰撞

有状态机:
  心跳超时 → 状态自动转为 UNAVAILABLE
  → 该车的任务标记为失败
  → 重新分配给其他车
  → 该车占用的资源标记为"不确定"
  → 其他车绕开该区域

价值 3:调度决策的基础

Dispatcher 做决策时:
  不是简单地问"哪台车最近"
  而是问"哪台车状态为 IDLE 且电量充足且能力匹配"

这些判断全部依赖状态机提供的信息。

价值 4:系统可观测性

运维人员看监控大屏:
  AGV-1: IDLE (P3, 85%)
  AGV-2: EXECUTING (P5→P8, 60%)
  AGV-3: ERROR (P7, 通信超时)
  AGV-4: CHARGING (充电桩-2, 45%)

一眼就能看出系统状态。
如果没有状态机,运维人员只能看到一堆坐标,无法快速判断问题。

6.6 状态机的实现要点

public class VehicleState {
    private IntegrationState integrationState;
    private ProcessingState processingState;
    private Point currentPosition;
    private double energyLevel;
    private List<LoadHandlingDevice> loadHandlingDevices;
    private ErrorState errorState;
    
    // 状态转换必须是原子的
    public synchronized void transitionTo(ProcessingState newState) {
        // 检查转换是否合法
        if (!isValidTransition(this.processingState, newState)) {
            throw new IllegalStateTransitionException(
                this.processingState + " → " + newState);
        }
        this.processingState = newState;
        notifyListeners();  // 通知所有监听者(Dispatcher、UI 等)
    }
    
    private boolean isValidTransition(ProcessingState from, ProcessingState to) {
        // 定义合法的状态转换
        switch (from) {
            case IDLE:
                return to == PROCESSING_ORDER || to == CHARGING;
            case PROCESSING_ORDER:
                return to == IDLE || to == WAITING || to == ERROR;
            case WAITING:
                return to == PROCESSING_ORDER || to == ERROR;
            case CHARGING:
                return to == IDLE || to == ERROR;
            case ERROR:
                return to == IDLE || to == UNAVAILABLE;
            default:
                return false;
        }
    }
}

关键设计点:

  • 原子性:状态转换必须是原子操作,不能出现中间状态
  • 合法性检查:不允许非法的状态跳转
  • 事件通知:状态变化时通知所有相关模块

七、Router:路径规划模块

7.1 Router 的职责

Router 的职责是:

计算车辆从起点到终点的可行路径。

但它不是简单的"最短路算法"。

7.2 openTCS 的地图模型

openTCS 使用的是图模型(Graph Topology),不是栅格地图。

核心对象:

对象 含义 属性
Point 节点 位置坐标、类型(停车点/路径点)
Path 长度、最大速度、方向、锁定状态
Location 工位 关联的 Point、可执行的操作
Block 区域 一组互斥的资源
Vehicle 车辆 当前位置、状态、能力

一个具体的例子:

仓库地图:

    P1 ----Path12---- P2 ----Path23---- P3
    |                  |                  |
  Path14            Path25            Path36
    |                  |                  |
    P4 ----Path45---- P5 ----Path56---- P6
    |                  |                  |
  Path47            Path58            Path69
    |                  |                  |
    P7 ----Path78---- P8 ----Path89---- P9

工位:
  Location-A 关联 P1(取货点)
  Location-B 关联 P9(放货点)
  Location-C 关联 P7(充电桩)

7.3 为什么用图模型而不是栅格地图?

维度 图模型 栅格地图
资源控制 每条边、每个节点都是明确的资源,可以加锁 锁哪个格子?锁多大范围?不明确
计算效率 图搜索(几十到几百个节点) 栅格搜索(几万到几十万个格子)
语义清晰 每个节点有明确含义(工位、路口、等待点) 格子没有语义
容易维护 修改路网只需要增删节点和边 修改地图需要重新绘制
路径确定性 车辆只能沿边移动,路径完全确定 车辆可能在格子间任意移动
交通规则 容易定义单向路、禁行区 难以表达复杂规则

图模型的核心优势是:

把连续的物理空间离散化为有限的、可管理的资源。

这使得资源锁成为可能。

7.4 Point 的类型

openTCS 中的 Point 有不同类型:

类型 含义 用途
HALT_POSITION 停车点 车辆可以在此停留
PARK_POSITION 停泊点 车辆空闲时停放的位置
REPORT_POSITION 报告点 车辆经过时上报位置

为什么要区分?

如果所有点都能停车:
  一台车可能停在路中间
  → 阻塞后面的车

只有 HALT_POSITION 能停车:
  车辆只能在指定位置停留
  → 不会阻塞通道
  → 系统可以精确管理每个停车位

这是一个非常重要的设计思想:

不是"哪里都能停",而是"只有指定位置能停"。

就像现实中的停车场:

你不能把车停在马路中间,只能停在车位里。

7.5 Path 的属性

每条 Path(边)有丰富的属性:

属性 含义 影响
length 长度 影响路径代价
maxVelocity 最大速度 影响通行时间
maxReverseVelocity 最大倒车速度 0 表示不允许倒车
locked 是否锁定 锁定的路径不可通行
routingCostForward 正向路由代价 影响路径选择
routingCostReverse 反向路由代价 影响路径选择

注意 routingCostForwardroutingCostReverse 可以不同:

场景:一条上坡路

正向(上坡):代价 = 10(慢、费电)
反向(下坡):代价 = 3(快、省电)

这样 Router 会倾向于让车辆下坡走这条路,上坡走另一条路。

7.6 Router 的代价模型

openTCS 的 Router 不是简单的"最短路",而是"最小代价路"。

代价可以包含:

总代价 = 路径长度代价
       + 转弯代价
       + 拥堵代价
       + 单向路绕行代价
       + 车辆类型特定代价

路径长度代价

最基本的代价:路径越长,代价越高。

转弯代价

每次转弯需要减速 → 转弯 → 加速
这比直行慢
因此转弯有额外代价

效果:Router 倾向于选择转弯少的路径

拥堵代价

某条路径上已经有其他车辆
→ 增加该路径的代价
→ Router 倾向于选择空闲路径

这不是"禁止通行",而是"不太想走这条路"
如果绕路代价更大,还是会选择拥堵路径

车辆类型特定代价

大型叉车 AGV:不能走窄通道
小型搬运 AGV:可以走窄通道

对于大型叉车:窄通道代价 = ∞(不可通行)
对于小型 AGV:窄通道代价 = 正常

7.7 Router 的实现

openTCS 默认使用 Dijkstra/A* 的变体:

public class ShortestPathRouter implements Router {
    
    public List<Route.Step> getRoute(Vehicle vehicle, Point source, Point destination) {
        // 1. 构建代价图(考虑车辆类型)
        CostGraph costGraph = buildCostGraph(vehicle);
        
        // 2. 运行最短路算法
        List<Point> path = dijkstra(costGraph, source, destination);
        
        // 3. 转换为 Route.Step 序列
        return convertToSteps(path);
    }
    
    private CostGraph buildCostGraph(Vehicle vehicle) {
        CostGraph graph = new CostGraph();
        
        for (Path path : allPaths) {
            // 检查车辆是否能通过这条路径
            if (!vehicle.canTraverse(path)) {
                continue;  // 跳过不可通行的路径
            }
            
            // 计算代价
            double cost = path.getLength();
            cost += path.getRoutingCostForward();
            cost *= getCongestionFactor(path);  // 拥堵因子
            
            graph.addEdge(path.getSource(), path.getDestination(), cost);
        }
        
        return graph;
    }
}

7.8 动态路由

Router 不是一次性计算的。当以下情况发生时,需要重新计算:

触发条件 说明
路径被锁定 某条路因为维护被封
拥堵变化 某条路变得拥堵
车辆故障 某台车停在路中间
新的资源约束 Scheduler 锁定了某些资源
场景:
  AGV-3 的原始路径:P1 → P2 → P3 → P4
  
  AGV-3 走到 P2 时,发现 P3 被 AGV-7 占用(故障停车)
  
  Router 重新计算:P2 → P5 → P6 → P4(绕路)
  
  AGV-3 按新路径继续执行

这就是"动态重规划"。


八、Scheduler:真正的核心

8.1 为什么 Scheduler 是最重要的?

再强调一次:

路径规划解决的是"怎么走"的问题。
Scheduler 解决的是"能不能走"的问题。

一个类比:

你用导航软件算出了一条路线(Router 的工作)
但是路上有红绿灯(Scheduler 的工作)

导航告诉你"直走 200 米右转"
但红灯亮了,你必须等

Scheduler 就是那个控制红绿灯的系统

8.2 资源的定义

在 openTCS 中,以下都是"资源":

资源类型 说明 容量
Point 地图上的节点 通常 1(一次只能一台车)
Path 地图上的边 通常 1(单车道)
Block 一组互斥的资源 1(整个区域一次一台车)
Location 工位 1(一次只能一台车操作)

8.3 资源锁的工作原理

AGV-3 想从 P2 移动到 P3

Step 1: AGV-3 向 Scheduler 申请资源
  请求:[Path23, P3]

Step 2: Scheduler 检查
  Path23 是否被占用?→ 否
  P3 是否被占用?→ 否

Step 3: Scheduler 批准
  将 Path23 标记为"AGV-3 占用"
  将 P3 标记为"AGV-3 占用"

Step 4: AGV-3 开始移动

Step 5: AGV-3 到达 P3
  释放 Path23
  释放 P2(已经离开)
  保持 P3 的占用

8.4 为什么不能只锁当前位置?

一个常见的错误想法:

“只锁车辆当前所在的位置不就行了?”

不行。原因:

场景:
  AGV-3 在 P2,要去 P3
  AGV-7 在 P4,要去 P2

如果只锁当前位置:
  t=0: AGV-3 在 P2(锁 P2),AGV-7 在 P4(锁 P4)
  t=1: AGV-3 开始移动,释放 P2
  t=1: AGV-7 看到 P2 空了,开始移动向 P2
  t=2: AGV-3 还在 Path23 上(还没到 P3)
  t=2: AGV-7 到达 P2
  
  问题:如果 Path23 和 Path42 有交叉点,两车可能碰撞!

正确做法:

AGV-3 移动前,锁定整个路径段:[P2, Path23, P3]
直到 AGV-3 完全到达 P3,才释放 P2 和 Path23

这样 AGV-7 在 AGV-3 移动期间无法进入 Path23 或 P2

8.5 提前锁定(Look-Ahead)

实际上,openTCS 不只锁当前段,还会提前锁定后续几段:

AGV-3 的完整路径:P1 → P2 → P3 → P4 → P5

提前锁定策略(look-ahead = 2):
  当 AGV-3 在 P1 时:
    已锁定:[P1, Path12, P2, Path23, P3]
    
  当 AGV-3 到达 P2 时:
    释放:P1, Path12
    新锁定:Path34, P4
    当前锁定:[P2, Path23, P3, Path34, P4]

为什么要提前锁定?

如果不提前锁定:
  AGV-3 到达 P3 后,才去申请 P4
  但 P4 可能已经被其他车占用了
  → AGV-3 必须在 P3 等待
  → 而 P3 可能是一个关键路口
  → AGV-3 等在路口,阻塞了其他车

如果提前锁定:
  AGV-3 还在 P1 时就知道 P4 是否可用
  如果 P4 不可用,可以提前重新规划路径
  → 不需要停在路口等待

8.6 Block:区域互斥

有些场景下,不只是单个 Point 或 Path 需要互斥,而是整个区域:

场景:一个窄走廊,有 3 个节点

    P5 ---- P6 ---- P7

这个走廊一次只能有一台车通过(太窄了,无法会车)

如果只锁单个节点:
  AGV-1 从 P5 向 P7 移动,锁了 P5, Path56, P6
  AGV-2 从 P7 向 P5 移动,锁了 P7, Path67, P6... 等等,P6 被锁了!
  
  但如果 AGV-1 锁了 P5→P6,AGV-2 锁了 P7→P6:
  AGV-1 在 P6,AGV-2 在 P7
  AGV-1 想去 P7(被 AGV-2 占)
  AGV-2 想去 P6(被 AGV-1 占)
  → 死锁!

Block 的解决方案:

定义 Block:NarrowCorridor = {P5, Path56, P6, Path67, P7}
Block 容量 = 1

规则:一次只有一台车能进入这个 Block

AGV-1 想进入走廊:
  申请 Block NarrowCorridor
  → 批准(没有其他车在里面)
  → AGV-1 进入,从 P5 走到 P7

AGV-2 想进入走廊:
  申请 Block NarrowCorridor
  → 拒绝(AGV-1 在里面)
  → AGV-2 等待

AGV-1 离开走廊(到达 P7 之后的下一个点):
  释放 Block NarrowCorridor
  → AGV-2 获得批准
  → AGV-2 进入走廊

Block 的本质是:

把多个资源打包成一个"大资源",实现区域级别的互斥。

8.7 Scheduler 的完整工作流程

AGV-3 要执行路径:P1 → P2 → P3 → P4 → P5

Scheduler 的工作:

1. 收到 AGV-3 的路径
2. 计算需要的资源序列:
   [P1, Path12, P2, Path23, P3, Path34, P4, Path45, P5]
3. 检查 Block:路径是否经过任何 Block?
   是 → 需要额外申请 Block
4. 根据 look-ahead 策略,先申请前 N 段资源
5. 如果资源可用 → 批准,AGV-3 开始移动
6. 如果资源不可用 → AGV-3 等待
7. AGV-3 每到达一个新节点:
   - 释放已经离开的资源
   - 申请下一段资源
8. 如果下一段资源不可用:
   - AGV-3 在当前 HALT_POSITION 等待
   - 或者触发重新规划

8.8 资源锁 vs 简单避障

再次强调这个区别,因为它是理解工业系统的关键:

维度 简单避障(学生项目) 资源锁(工业系统)
时机 碰到了再处理 进入之前就预约
信息 只知道眼前的障碍 知道所有车的计划
结果 可能死锁 从设计上避免死锁
效率 频繁停车等待 提前规划,减少等待
确定性 不确定(取决于运气) 确定(有明确的规则)
安全性 依赖传感器(可能失灵) 逻辑保证(不依赖传感器)

一个关键洞察:

资源锁是"逻辑层面"的安全保证。
即使传感器全部失灵,只要资源锁正确工作,车辆就不会碰撞。
因为系统从逻辑上保证了"两台车不会同时出现在同一个位置"。

当然,实际系统中传感器避障仍然存在,作为"最后一道防线"。但它不应该是主要的防碰撞手段。


九、死锁问题:多车系统最棘手的挑战

9.1 什么是死锁?

经典死锁:
  AGV-1 占用 A,等待 B
  AGV-2 占用 B,等待 A
  → 两者互相等待,谁也动不了

更复杂的死锁:

环形死锁:
  AGV-1 占用 A,等待 B
  AGV-2 占用 B,等待 C
  AGV-3 占用 C,等待 A
  → 三者形成环,谁也动不了

实际场景中的死锁:

场景:单车道走廊

  AGV-1 →→→→→→→→→ 方向
  ←←←←←←←←← AGV-2

  两台车在走廊中相向而行
  走廊太窄,无法会车
  → 死锁

9.2 死锁的四个必要条件

操作系统理论中的经典知识,完全适用于 AGV 系统:

条件 在 AGV 系统中的含义
互斥 一个位置一次只能有一台车
持有并等待 车辆占着当前位置,等待下一个位置
不可抢占 不能强制把一台车"搬走"
循环等待 多台车形成等待环

四个条件同时满足 → 死锁发生。

9.3 openTCS 如何预防死锁?

openTCS 主要通过以下策略降低死锁概率:

策略 1:路网设计(破坏循环等待)

最有效的方法:从路网设计上避免死锁

方法 1:单向路网
  所有路径都是单向的
  → 不可能出现两车相向而行
  → 从根本上消除了最常见的死锁场景

  P1 → P2 → P3
  ↑              ↓
  P6 ← P5 ← P4

方法 2:环形路网
  所有车辆沿同一方向行驶
  → 不会出现对向冲突

方法 3:设置等待区
  在关键路口旁边设置等待点
  → 车辆可以"让路"
  → 避免堵在路口

策略 2:资源有序分配(破坏循环等待)

给所有资源编号:P1=1, P2=2, P3=3, ...

规则:车辆只能按编号递增的顺序申请资源

AGV-1 在 P2,想去 P5:
  申请顺序:P3, P4, P5(递增)✓

AGV-2 在 P5,想去 P2:
  按规则应该:P4, P3, P2(递减)✗ 违反规则!
  实际做法:走另一条路(编号递增的路径)

这样就不可能出现循环等待。

但这种方法限制了路径选择,可能导致路径不是最优的。

策略 3:Block 区域互斥(破坏持有并等待)

把可能死锁的区域定义为 Block
一次只允许一台车进入

效果:
  车辆要么完全进入 Block(不会和其他车冲突)
  要么完全不进入(在外面等待)
  → 不会出现"两台车都在 Block 里互相堵"的情况

策略 4:超时检测 + 恢复

如果预防措施失败(设计不完美),还有最后一道防线:

检测:
  如果一台车等待超过 T 秒(例如 30 秒)
  → 可能发生了死锁

恢复:
  方案 1:让其中一台车后退
  方案 2:让其中一台车绕路
  方案 3:人工干预

9.4 死锁 vs 活锁

还有一种更隐蔽的问题:活锁(Livelock)

死锁:两台车都不动
活锁:两台车都在动,但都到不了目的地

例如:
  AGV-1 和 AGV-2 在走廊相遇
  AGV-1 让路,移到旁边
  AGV-2 也让路,移到同一边
  AGV-1 又让路,移到另一边
  AGV-2 也移到另一边
  → 两台车不停地互相让路,但谁也过不去

就像两个人在走廊里面对面,互相让路但总是让到同一边。

解决活锁的方法:

中央调度的优势:
  系统统一决定"谁让谁"
  不会出现两台车自己协商导致的活锁

具体做法:
  Scheduler 明确指定:AGV-1 等待,AGV-2 先过
  不存在"互相让"的情况

9.5 实际工程中的死锁处理

在实际工程中,死锁处理通常是多层次的:

第一层:路网设计(消除 90% 的死锁可能)
  - 单向路
  - 环形路
  - 足够的等待区

第二层:Scheduler 规则(消除 9% 的死锁可能)
  - Block 互斥
  - 有序资源分配
  - Look-ahead 检测

第三层:超时恢复(处理剩余 1%)
  - 检测等待超时
  - 自动恢复策略
  - 人工干预通知

十、Vehicle Driver:插件化通信接口

10.1 为什么需要 Driver 抽象?

现实中 AGV 品牌很多,每家的通信协议都不同:

品牌 通信方式 协议
MiR REST API HTTP/JSON
海康机器人 私有协议 TCP
快仓 私有协议 TCP
极智嘉 REST API HTTP/JSON
仙工 私有协议 TCP/MQTT
自研 AGV 自定义 各种

如果调度系统直接和每种 AGV 通信:

Kernel 代码里:
  if (brand == "MiR") {
      sendHttpRequest(...);
} else if (brand == "海康") {
sendTcpPacket(...);
} else if (brand == "快仓") {
sendCustomProtocol(...);
} else if ...

问题:

* 每增加一种 AGV,就要修改 Kernel 代码
* Kernel 变得越来越臃肿
* 测试困难(需要所有品牌的 AGV 才能测试)
* 耦合严重(一个品牌的协议变了,可能影响其他品牌)

10.2 openTCS 的解决方案:Driver 接口

openTCS 的设计思想是:

Kernel 永远不关心底层协议。它只和一个统一的接口交互。

Kernel
   ↓ (统一接口)
Vehicle Driver Interface
   ↓ (不同实现)
┌─────────────┬─────────────┬─────────────┐
│ MiR Driver  │ 海康 Driver │ 自研 Driver │
└─────────────┴─────────────┴─────────────┘
   ↓               ↓              ↓
┌─────────────┬─────────────┬─────────────┐
│  MiR AGV    │  海康 AGV   │  自研 AGV   │
└─────────────┴─────────────┴─────────────┘

10.3 Driver 接口的定义

openTCS 的 Vehicle Driver(官方称为 Communication Adapter)需要实现以下核心功能:

public interface VehicleCommAdapter {
    
    // === 生命周期 ===
    void enable();    // 启用适配器,建立与 AGV 的连接
    void disable();   // 禁用适配器,断开连接
    
    // === 指令下发 ===
    void sendCommand(MovementCommand command);
    // MovementCommand 包含:
    //   - 目标 Point
    //   - 经过的 Path
    //   - 到达后执行的操作(取货/放货/充电)
    //   - 最大速度
    //   - 方向(前进/后退)
    
    // === 状态上报 ===
    // 适配器需要主动上报以下信息:
    //   - 车辆当前位置(哪个 Point)
    //   - 车辆状态(空闲/执行中/错误)
    //   - 电量
    //   - 载荷状态(有货/无货)
    //   - 错误信息
}

10.4 一个具体的 Driver 实现示例

假设你有一台自研 AGV,通过 TCP 通信:

public class MyAgvCommAdapter implements VehicleCommAdapter {
    
    private TcpClient tcpClient;
    private String agvIp;
    private int agvPort;
    
    @Override
    public void enable() {
        // 建立 TCP 连接
        tcpClient = new TcpClient(agvIp, agvPort);
        tcpClient.connect();
        
        // 启动状态轮询线程
        startStatusPolling();
    }
    
    @Override
    public void sendCommand(MovementCommand command) {
        // 将 openTCS 的指令转换为 AGV 能理解的格式
        byte[] packet = encodeCommand(command);
        tcpClient.send(packet);
    }
    
    private byte[] encodeCommand(MovementCommand command) {
        // 自研 AGV 的协议格式:
        // [Header][CommandType][TargetX][TargetY][Speed][Action]
        ByteBuffer buffer = ByteBuffer.allocate(20);
        buffer.put((byte) 0xAA);           // Header
        buffer.put((byte) 0x01);           // CommandType: MOVE
        buffer.putFloat(command.getTargetPoint().getX());
        buffer.putFloat(command.getTargetPoint().getY());
        buffer.putFloat(command.getMaxSpeed());
        buffer.put(encodeAction(command.getOperation()));
        return buffer.array();
    }
    
    private void startStatusPolling() {
        // 每 200ms 查询一次 AGV 状态
        scheduler.scheduleAtFixedRate(() -> {
            byte[] response = tcpClient.sendAndReceive(STATUS_QUERY);
            AgvStatus status = decodeStatus(response);
            
            // 上报给 Kernel
            if (status.positionChanged()) {
                reportPosition(status.getCurrentPoint());
            }
            if (status.stateChanged()) {
                reportState(status.getState());
            }
            reportEnergyLevel(status.getBattery());
            
        }, 0, 200, TimeUnit.MILLISECONDS);
    }
}

10.5 Driver 的职责边界

Driver 的职责非常明确:

Driver 负责 Driver 不负责
与 AGV 通信 路径规划
协议转换 任务分配
状态上报 资源管理
指令下发 冲突检测
连接管理 调度决策

这种清晰的边界意味着:

开发一个新的 Driver:
  只需要了解 AGV 的通信协议
  不需要了解调度算法
  不需要了解资源管理
  不需要了解其他 AGV 的存在

开发调度算法:
  只需要了解 Driver 接口
  不需要了解具体 AGV 的协议
  不需要了解 TCP/HTTP/MQTT 的细节

10.6 插件化的工程价值

价值 1:独立开发和测试

团队 A:开发调度算法(只需要 Mock Driver)
团队 B:开发 MiR Driver(只需要 Driver 接口文档)
团队 C:开发海康 Driver(只需要 Driver 接口文档)

三个团队可以并行工作,互不依赖。

价值 2:热插拔

系统运行中:
  - 新增一种 AGV → 加载新 Driver,不需要重启系统
  - 某种 AGV 下线 → 卸载 Driver,不影响其他 AGV

价值 3:仿真和实际无缝切换

开发阶段:使用 SimulationDriver(模拟 AGV 行为)
测试阶段:使用 LoopbackDriver(回环测试)
部署阶段:使用 RealAgvDriver(真实 AGV)

切换只需要更换 Driver 配置,Kernel 代码完全不变。

这一点对比赛特别有用:

你可以先用仿真 Driver 开发和调试整个系统,最后再接入真实硬件。

价值 4:异构车队管理

同一个 Kernel 可以同时管理:
  - 3 台 MiR(使用 MiR Driver)
  - 5 台自研 AGV(使用自研 Driver)
  - 2 台叉车(使用叉车 Driver)

Kernel 不关心它们是什么品牌
只关心它们的状态和能力

10.7 Driver 与 Kernel 的交互时序

时间线:

t=0:  Kernel 决定让 AGV-3 从 P2 移动到 P3
t=1:  Kernel 调用 driver.sendCommand(MoveTo P3)
t=2:  Driver 将指令转换为 AGV 协议,发送给 AGV
t=3:  AGV 开始移动
t=4:  AGV 到达 P3
t=5:  AGV 上报位置(通过 AGV 自己的协议)
t=6:  Driver 收到位置上报
t=7:  Driver 调用 reportPosition(P3) 通知 Kernel
t=8:  Kernel 更新 AGV-3 的位置为 P3
t=9:  Kernel 释放 P2 的资源锁
t=10: Kernel 为 AGV-3 申请下一段资源

注意:

Kernel 不是通过"猜测"来知道 AGV 到达了。
而是通过 Driver 的明确上报。
这保证了系统状态和物理状态的一致性。


十一、VDA5050:AGV 行业的统一通信标准

11.1 为什么需要标准?

在 VDA5050 出现之前,AGV 行业的状况是:

每个 AGV 厂商都有自己的:
  - 通信协议
  - 数据格式
  - 状态定义
  - 指令集
  - 错误码

结果:
  - 每接入一种新 AGV,都要从头开发 Driver
  - 不同品牌的 AGV 无法混合部署
  - 更换 AGV 品牌意味着重写整个通信层
  - 企业被厂商锁定

这就像:

如果每个手机品牌都用不同的充电接口,你换手机就要换充电器。
USB-C 统一了充电接口。
VDA5050 统一了 AGV 通信接口。

11.2 VDA5050 是什么?

VDA5050 是由德国汽车工业协会(VDA)和德国机械设备制造业联合会(VDMA)联合推出的 AGV/AMR 通信标准。

核心特点:

特点 说明
基于 MQTT 使用 MQTT 消息队列通信
JSON 格式 数据用 JSON 编码,人类可读
双向通信 调度系统下发指令,AGV 上报状态
标准化状态 统一的状态定义和错误码
路径模型 基于 Node + Edge 的路径描述

11.3 VDA5050 的通信架构

Fleet Manager(调度系统)
        ↓↑
    MQTT Broker
        ↓↑
    AGV(车辆)

Topic 结构:
  下发指令:uagv/v2/{manufacturer}/{serialNumber}/order
  上报状态:uagv/v2/{manufacturer}/{serialNumber}/state
  可视化:  uagv/v2/{manufacturer}/{serialNumber}/visualization
  连接状态:uagv/v2/{manufacturer}/{serialNumber}/connection

11.4 VDA5050 的核心消息

Order(订单/指令)

调度系统下发给 AGV 的路径和动作:

{
  "headerId": 1,
  "timestamp": "2024-01-15T10:30:00Z",
  "version": "2.0.0",
  "manufacturer": "MyCompany",
  "serialNumber": "AGV-001",
  "orderId": "order_001",
  "orderUpdateId": 0,
  "nodes": [
    {
      "nodeId": "P1",
      "sequenceId": 0,
      "released": true,
      "nodePosition": {"x": 10.0, "y": 5.0, "mapId": "floor1"},
      "actions": []
    },
    {
      "nodeId": "P2",
      "sequenceId": 2,
      "released": true,
      "nodePosition": {"x": 15.0, "y": 5.0, "mapId": "floor1"},
      "actions": [
        {
          "actionType": "pick",
          "actionId": "action_001",
          "blockingType": "HARD"
        }
      ]
    },
    {
      "nodeId": "P3",
      "sequenceId": 4,
      "released": false,
      "nodePosition": {"x": 20.0, "y": 5.0, "mapId": "floor1"},
      "actions": []
    }
  ],
  "edges": [
    {
      "edgeId": "E1-2",
      "sequenceId": 1,
      "released": true,
      "startNodeId": "P1",
      "endNodeId": "P2",
      "maxSpeed": 1.5
    },
    {
      "edgeId": "E2-3",
      "sequenceId": 3,
      "released": false,
      "startNodeId": "P2",
      "endNodeId": "P3",
      "maxSpeed": 1.0
    }
  ]
}

注意 released 字段:

released = true:AGV 可以立即执行这段路径
released = false:AGV 不能执行,需要等待后续指令释放

这就是 VDA5050 版本的"资源锁"!

调度系统通过控制 released 字段来实现交通管制:
  - 先释放前几段路径
  - AGV 执行
  - 确认安全后,再释放后续路径

State(状态上报)

AGV 定期上报给调度系统的状态:

{
  "headerId": 42,
  "timestamp": "2024-01-15T10:30:05Z",
  "orderId": "order_001",
  "lastNodeId": "P1",
  "lastNodeSequenceId": 0,
  "driving": true,
  "paused": false,
  "nodeStates": [...],
  "edgeStates": [...],
  "agvPosition": {
    "x": 12.5,
    "y": 5.0,
    "theta": 0.0,
    "mapId": "floor1",
    "positionInitialized": true
  },
  "velocity": {
    "vx": 1.2,
    "vy": 0.0,
    "omega": 0.0
  },
  "batteryState": {
    "batteryCharge": 75.5,
    "charging": false
  },
  "operatingMode": "AUTOMATIC",
  "errors": [],
  "loads": [
    {
      "loadId": "pallet_001",
      "loadType": "EURO_PALLET",
      "weight": 500
    }
  ],
  "safetyState": {
    "eStop": "NONE",
    "fieldViolation": false
  }
}

11.5 VDA5050 的关键设计思想

思想 1:Base + Horizon

Order 中的路径分为两部分:

Base(基础段):released = true
  AGV 必须执行的路径
  调度系统保证这段路径是安全的(资源已锁定)

Horizon(预览段):released = false
  AGV 知道后续路径,但不能执行
  用于提前减速、规划等
  调度系统确认安全后,会发送 orderUpdate 释放

好处:
  - AGV 可以提前知道后续路径(平滑运动)
  - 但不会提前进入未授权的区域(安全)
  - 调度系统可以随时修改 Horizon(灵活)

这和 openTCS 的 look-ahead 思想完全一致。

思想 2:增量更新

不需要每次都发送完整的 Order
可以通过 orderUpdateId 增量更新:
  - 释放更多节点(把 released 从 false 改为 true)
  - 追加新的节点和边
  - 修改 Horizon

这减少了通信量,也提高了灵活性。

思想 3:Action 与 Movement 分离

Node 上可以定义 Action(动作):
  - pick(取货)
  - drop(放货)
  - charge(充电)
  - scan(扫描)
  - wait(等待)

Action 和 Movement 是分离的:
  Movement:从 A 移动到 B(由 Edge 定义)
  Action:到达 B 后做什么(由 Node 的 actions 定义)

这使得路径规划和任务执行解耦。

11.6 VDA5050 与 openTCS 的关系

openTCS 的概念          VDA5050 的概念
─────────────────────────────────────
Point                   Node
Path                    Edge
DriveOrder              Order
MovementCommand         Node + Edge(released=true)
Vehicle State           State 消息
Operation               Action
Resource Lock           released 字段
Look-ahead              Horizon(released=false)

可以看到,两者的概念高度对应。

实际集成方式:

openTCS Kernel
    ↓
VDA5050 Driver(将 openTCS 指令转换为 VDA5050 Order)
    ↓
MQTT Broker
    ↓
支持 VDA5050 的 AGV

11.7 VDA5050 的行业影响

之前 之后
每种 AGV 需要专门的 Driver 一个 VDA5050 Driver 适配所有
更换 AGV 品牌需要重写通信层 只要支持 VDA5050 就能即插即用
不同品牌 AGV 无法混合部署 统一协议,混合部署成为可能
企业被厂商锁定 自由选择最适合的 AGV

目前支持 VDA5050 的厂商越来越多:

  • KUKA
  • SEW-EURODRIVE
  • SICK
  • 部分国内厂商也在跟进

11.8 VDA5050 的局限

局限 说明
不涉及调度算法 只定义通信协议,不定义如何调度
不涉及路径规划 只定义如何下发路径,不定义如何计算路径
版本兼容性 v1 和 v2 差异较大
实现差异 不同厂商对标准的理解可能不同
功能覆盖 某些特殊功能可能标准未覆盖

十二、openTCS 的地图建模详解

12.1 地图的核心要素

openTCS 的地图由以下要素组成:

Point(点)
  ├── 坐标 (x, y)
  ├── 类型(HALT / PARK / REPORT)
  └── 属性(自定义 key-value)

Path(路径)
  ├── 起点 Point
  ├── 终点 Point
  ├── 长度
  ├── 最大正向速度
  ├── 最大反向速度(0 = 不允许反向)
  ├── 路由代价(正向/反向)
  ├── 是否锁定
  └── 属性

Location(工位)
  ├── 关联的 Point
  ├── 类型(取货点/放货点/充电桩)
  └── 允许的操作列表

LocationType(工位类型)
  ├── 名称
  └── 允许的操作列表

Block(互斥区域)
  ├── 包含的资源集合(Points + Paths)
  └── 互斥类型

12.2 地图设计的原则

原则 1:节点密度适中

太密:
  P1 -- P2 -- P3 -- P4 -- P5 -- P6(每 0.5 米一个节点)
  问题:资源锁粒度太细,频繁申请/释放,性能差

太疏:
  P1 ─────────────────────── P2(10 米一个节点)
  问题:一台车占用 10 米的路径,其他车无法通过中间区域

合适:
  P1 ──── P2 ──── P3 ──── P4(2-3 米一个节点)
  兼顾性能和灵活性

原则 2:关键位置必须有节点

以下位置必须设置节点:
  - 路口(多条路径交汇)
  - 工位(取货/放货点)
  - 等待区(车辆可以停留等待)
  - 充电桩
  - 电梯口
  - 门口
  - 路径分叉点
  - 路径合并点

原则 3:单向路优先

双向路的问题:
  两台车可能在同一条路上相向而行
  → 需要复杂的会车逻辑
  → 或者死锁

单向路的好处:
  所有车同向行驶
  → 不可能相向冲突
  → 大幅简化调度逻辑

代价:
  路径可能不是最短的(需要绕行)
  但系统整体效率更高(没有冲突等待)

原则 4:设置足够的等待区

等待区的作用:
  当车辆无法继续前进时(前方资源被占用)
  需要一个地方停下来等待
  这个地方不能阻塞其他车辆

好的等待区设计:
  - 在每个路口旁边设置等待点
  - 等待点不在主干道上
  - 等待点有足够的容量

坏的设计:
  - 车辆只能停在路口等待
  - 一台车等待就阻塞了整个路口
  - 导致连锁拥堵

12.3 Block 的详细设计

Block 是 openTCS 中非常重要但经常被忽视的概念。

Block 类型

openTCS 支持不同类型的 Block:

类型 含义 场景
SINGLE_VEHICLE_ONLY 一次只允许一台车 窄走廊、单车道
SAME_DIRECTION_ONLY 允许多台车,但必须同向 宽走廊、单向主干道

SINGLE_VEHICLE_ONLY 示例

窄走廊:

  P3 ──── P4 ──── P5 ──── P6 ──── P7
  
  Block: NarrowCorridor = {P3, P4, P5, P6, P7, 
                           Path34, Path45, Path56, Path67}
  Type: SINGLE_VEHICLE_ONLY

效果:
  整个走廊一次只能有一台车
  其他车必须在走廊外等待

SAME_DIRECTION_ONLY 示例

主干道(宽,可以跟车但不能会车):

  P10 ──── P11 ──── P12 ──── P13
  
  Block: MainRoad = {P10, P11, P12, P13,
                     Path1011, Path1112, Path1213}
  Type: SAME_DIRECTION_ONLY

效果:
  多台车可以同时在主干道上
  但必须同向行驶
  不允许有车反向进入

Block 的嵌套

复杂场景可能需要嵌套 Block:

  大区域 Block A = {P1, P2, P3, P4, P5, ...}
  小区域 Block B = {P3, P4}(Block A 的子集)

  Block B 是一个特别关键的区域(比如电梯口)
  Block A 是整个走廊

  规则:
    进入 Block A 需要申请 Block A
    进入 Block B 需要同时申请 Block A 和 Block B

12.4 地图建模的实际案例

一个小型仓库的地图:

充电区        主干道              工位区
                                    
C1─P1 ═══ P2 ═══ P3 ═══ P4 ═══ P5─W1
   │              │              │
C2─P6      P7 ═══ P8 ═══ P9 ═══ P10─W2
   │       │              │      │
   P11═══ P12═══ P13═══ P14═══ P15─W3
                  │
                  P16─W4(特殊工位)

═══ 表示双向路径
─── 表示单向路径

Block 定义:
  Block1: {P7, P8, P9, Path78, Path89} - 中间走廊,SINGLE_VEHICLE_ONLY
  Block2: {P16} - 特殊工位入口,SINGLE_VEHICLE_ONLY

Location 定义:
  C1, C2: 充电桩(类型:ChargeStation)
  W1, W2, W3, W4: 工位(类型:WorkStation)

十三、Transport Order:任务的完整生命周期

13.1 什么是 Transport Order?

Transport Order 是 openTCS 中的核心概念,代表一个完整的运输任务。

一个 Transport Order 包含:
  - 起点(从哪里取货)
  - 终点(送到哪里)
  - 操作(取货、放货)
  - 优先级
  - 截止时间(可选)
  - 依赖关系(可选:必须在某个任务之后执行)

13.2 Transport Order 的状态机

RAW
  │ 新创建,等待调度
  v
ACTIVE
  │ 已激活,等待分配车辆
  v
DISPATCHABLE
  │ 可调度,等待 Dispatcher 分配
  v
BEING_PROCESSED
  │ 正在执行
  v
FINISHED / FAILED / WITHDRAWN
  最终状态

每个状态的含义:

状态 含义 触发条件
RAW 刚创建 外部系统提交订单
ACTIVE 已激活 依赖的前置任务完成
DISPATCHABLE 可调度 系统确认任务合法
BEING_PROCESSED 执行中 Dispatcher 分配了车辆
FINISHED 完成 车辆到达终点并完成操作
FAILED 失败 执行过程中出错
WITHDRAWN 撤回 人工取消

13.3 DriveOrder:Transport Order 的分解

一个 Transport Order 可能包含多个步骤:

Transport Order: "从工位 A 取货,送到工位 B"

分解为 DriveOrder 序列:
  DriveOrder 1: 从当前位置移动到工位 A
  DriveOrder 2: 在工位 A 执行取货操作
  DriveOrder 3: 从工位 A 移动到工位 B
  DriveOrder 4: 在工位 B 执行放货操作

每个 DriveOrder 包含:

DriveOrder:
  - Route: 具体的路径(Point 和 Path 序列)
  - Destination: 目标 Location
  - Operation: 到达后执行的操作

13.4 Route:路径的详细描述

Route 是 DriveOrder 中最核心的部分:

Route:
  Step 1: P1 → Path12 → P2(前进,速度 1.5m/s)
  Step 2: P2 → Path23 → P3(前进,速度 1.0m/s)
  Step 3: P3 → Path34 → P4(前进,速度 1.5m/s)
  
  总代价: 15
  总距离: 12m
  预计时间: 10s

13.5 任务执行的完整时序

t=0:   ERP 提交 Transport Order:"从 W1 取货送到 W3"
t=1:   Order 状态:RAW → ACTIVE → DISPATCHABLE
t=2:   Dispatcher 触发,评估候选车辆
t=3:   选择 AGV-2(在 P6,空闲,电量 80%)
t=4:   Router 计算路径:P6 → P2 → P3 → P4 → P5 → W1
t=5:   生成 DriveOrder 1:移动到 W1
t=6:   Scheduler 申请资源:[P6, Path62,
P2, Path23, P3](look-ahead=2)
t=7:   资源可用,批准
t=8:   Vehicle Driver 下发指令给 AGV-2
t=9:   AGV-2 开始移动
t=10:  AGV-2 到达 P2,释放 P6 和 Path62
t=11:  Scheduler 申请下一段:[Path34, P4]
t=12:  AGV-2 到达 P3,释放 Path23, P2
t=13:  继续移动...
t=20:  AGV-2 到达 W1
t=21:  执行取货操作(Action: PICK)
t=22:  取货完成,DriveOrder 1 完成
t=23:  生成 DriveOrder 2:从 W1 移动到 W3
t=24:  Router 计算路径:P5 → P4 → P3 → P13 → P14 → P15 → W3
t=25:  Scheduler 申请资源...
t=26:  AGV-2 开始移动
...
t=40:  AGV-2 到达 W3
t=41:  执行放货操作(Action: DROP)
t=42:  放货完成
t=43:  Transport Order 状态:BEING_PROCESSED → FINISHED
t=44:  AGV-2 状态:EXECUTING → IDLE
t=45:  Dispatcher 被触发(车辆变为空闲)
t=46:  检查是否有新任务可分配...

13.6 任务失败的处理

任务执行过程中可能出现各种异常:

场景 1:AGV 故障
  AGV-2 在 P3 突然报错(电机故障)
  → Driver 上报 ERROR 状态
  → Kernel 标记 AGV-2 为 ERROR
  → 当前 DriveOrder 标记为 FAILED
  → Transport Order 标记为 FAILED
  → 释放 AGV-2 占用的后续资源(但保留当前位置 P3)
  → Dispatcher 可以选择:
      a) 将任务重新分配给其他车辆
      b) 等待 AGV-2 恢复
      c) 标记任务失败,通知上层系统

场景 2:路径被阻塞
  AGV-2 在 P3,下一段 P4 被另一台故障车占用
  → Scheduler 拒绝资源申请
  → AGV-2 在 P3 等待
  → 等待超时(例如 60 秒)
  → 触发重新规划
  → Router 计算绕行路径:P3 → P8 → P9 → P14 → P15 → W3
  → 继续执行

场景 3:任务被取消
  人工在系统中取消了这个 Transport Order
  → Kernel 发送"停止"指令给 AGV-2
  → AGV-2 在最近的 HALT_POSITION 停下
  → 释放所有已申请的资源
  → Transport Order 状态:WITHDRAWN
  → AGV-2 状态:IDLE

13.7 任务优先级

工业系统中,不同任务有不同的优先级:

优先级示例:
  紧急补料:优先级 = 10(最高)
  正常搬运:优先级 = 5
  空箱回收:优先级 = 1(最低)

Dispatcher 的行为:
  当有多个任务等待分配时,优先分配高优先级任务
  
  甚至可能:
    AGV-3 正在执行低优先级任务
    来了一个紧急任务
    → 中断 AGV-3 的当前任务
    → 让 AGV-3 先执行紧急任务
    → 紧急任务完成后,再继续原来的任务

这叫做"任务抢占"(Preemption),是高级调度系统的功能。


十四、openTCS 的可扩展性设计

14.1 插件化架构总览

openTCS 的几乎所有核心模块都是可替换的:

┌─────────────────────────────────────────┐
│              openTCS Kernel              │
├─────────────────────────────────────────┤
│                                         │
│  ┌───────────┐  可替换                  │
│  │ Dispatcher│ ←── 你可以实现自己的调度算法│
│  └───────────┘                          │
│                                         │
│  ┌───────────┐  可替换                  │
│  │  Router   │ ←── 你可以实现自己的路径算法│
│  └───────────┘                          │
│                                         │
│  ┌───────────┐  可替换                  │
│  │ Scheduler │ ←── 你可以实现自己的资源策略│
│  └───────────┘                          │
│                                         │
│  ┌───────────┐  可替换                  │
│  │  Driver   │ ←── 你可以接入任何品牌 AGV │
│  └───────────┘                          │
│                                         │
└─────────────────────────────────────────┘

14.2 为什么要可扩展?

因为不同的应用场景需要不同的策略:

场景 Dispatcher 策略 Router 策略 Scheduler 策略
电商仓库 订单批量优化 最短时间 激进锁定
汽车工厂 产线节拍匹配 固定路线 区域互斥
医院 紧急优先 避开人流 保守锁定
半导体工厂 洁净度优先 避开污染区 严格互斥

一个通用的默认实现不可能满足所有场景。

14.3 扩展点的接口设计

openTCS 使用 Java SPI(Service Provider Interface)机制实现插件化:

// Dispatcher 接口
public interface Dispatcher {
    void dispatch();
    void withdrawOrder(TransportOrder order, boolean immediateAbort);
    void reroute(Vehicle vehicle);
}

// Router 接口
public interface Router {
    Optional<List<DriveOrder>> getRoute(Vehicle vehicle, 
                                         Point sourcePoint, 
                                         TransportOrder transportOrder);
    long getCosts(Vehicle vehicle, Point sourcePoint, Point destinationPoint);
    void updateRoutingTopology();
}

// Scheduler 接口(简化)
public interface Scheduler {
    void claim(Client client, List<Set<TCSResource<?>>> resources);
    void allocate(Client client, Set<TCSResource<?>> resources);
    void free(Client client, Set<TCSResource<?>> resources);
}

14.4 自定义 Dispatcher 示例

假设你想实现一个考虑电量的 Dispatcher:

public class BatteryAwareDispatcher implements Dispatcher {
    
    private final Router router;
    private final TransportOrderService orderService;
    private final VehicleService vehicleService;
    
    @Override
    public void dispatch() {
        // 获取所有待分配的订单
        List<TransportOrder> pendingOrders = orderService.getPendingOrders();
        
        // 获取所有空闲车辆
        List<Vehicle> idleVehicles = vehicleService.getIdleVehicles();
        
        for (TransportOrder order : pendingOrders) {
            Vehicle bestVehicle = null;
            double bestScore = Double.MAX_VALUE;
            
            for (Vehicle vehicle : idleVehicles) {
                // 计算路径代价
                double routeCost = router.getCosts(vehicle, 
                    vehicle.getCurrentPosition(), 
                    order.getDestination());
                
                // 计算电量因子
                double energyNeeded = estimateEnergyConsumption(routeCost);
                double currentEnergy = vehicle.getEnergyLevel();
                
                // 如果电量不够,跳过
                if (currentEnergy < energyNeeded * 1.2) { // 20% 余量
                    continue;
                }
                
                // 综合评分:路径代价 + 电量惩罚
                double score = routeCost;
                if (currentEnergy < 50) {
                    score *= 1.5;  // 低电量车辆,增加代价(不太想用它)
                }
                
                if (score < bestScore) {
                    bestScore = score;
                    bestVehicle = vehicle;
                }
            }
            
            if (bestVehicle != null) {
                assignOrderToVehicle(order, bestVehicle);
                idleVehicles.remove(bestVehicle);
            }
        }
    }
    
    private double estimateEnergyConsumption(double routeCost) {
        // 简单估算:每米消耗 0.1% 电量
        return routeCost * 0.1;
    }
}

14.5 自定义 Router 示例

假设你想实现一个考虑实时拥堵的 Router:

public class CongestionAwareRouter implements Router {
    
    private final PlantModelService modelService;
    private final VehicleService vehicleService;
    
    @Override
    public Optional<List<DriveOrder>> getRoute(Vehicle vehicle, 
                                                Point source, 
                                                TransportOrder order) {
        // 构建带拥堵权重的图
        WeightedGraph graph = buildCongestionGraph(vehicle);
        
        // Dijkstra 求最短路
        List<Point> path = dijkstra(graph, source, order.getDestination());
        
        if (path.isEmpty()) {
            return Optional.empty();
        }
        
        return Optional.of(convertToDriveOrders(path, order));
    }
    
    private WeightedGraph buildCongestionGraph(Vehicle requestingVehicle) {
        WeightedGraph graph = new WeightedGraph();
        
        for (Path path : modelService.getAllPaths()) {
            double baseCost = path.getLength();
            
            // 检查这条路径上是否有其他车辆
            int vehiclesOnPath = countVehiclesOnPath(path);
            
            // 拥堵因子:每多一台车,代价增加 50%
            double congestionFactor = 1.0 + vehiclesOnPath * 0.5;
            
            // 检查这条路径是否被锁定
            if (path.isLocked()) {
                continue;  // 锁定的路径不可通行
            }
            
            // 检查车辆类型是否允许通过
            if (!isVehicleAllowed(requestingVehicle, path)) {
                continue;
            }
            
            double finalCost = baseCost * congestionFactor;
            graph.addEdge(path.getSourcePoint(), 
                         path.getDestinationPoint(), 
                         finalCost);
        }
        
        return graph;
    }
    
    private int countVehiclesOnPath(Path path) {
        int count = 0;
        for (Vehicle v : vehicleService.getAllVehicles()) {
            if (v.getCurrentPosition().equals(path.getSourcePoint()) ||
                v.getCurrentPosition().equals(path.getDestinationPoint())) {
                count++;
            }
        }
        return count;
    }
}

十五、openTCS 与 MAPF 的关系

15.1 什么是 MAPF?

MAPF(Multi-Agent Path Finding)是学术界研究的多智能体路径规划问题:

给定:
  - 一张地图
  - N 个智能体
  - 每个智能体有起点和终点

求:
  - 每个智能体的路径
  - 所有路径不冲突(不会碰撞)
  - 某种目标最优(总路径最短 / 最大完成时间最短)

15.2 MAPF 的经典算法

算法 思想 特点
CBS(Conflict-Based Search) 先独立规划,发现冲突后添加约束重新规划 最优解,但慢
ECBS CBS 的有界次优版本 快,解质量有保证
WHCA*(Windowed Hierarchical A*) 时间窗口内的协同 A* 快,但不保证最优
Priority-Based 给智能体排优先级,高优先级先规划 快,但可能不公平
ORCA 基于速度障碍的分布式避碰 实时,但适合开放空间

15.3 openTCS 不是 MAPF 系统

这是很多人的误解。

openTCS 和 MAPF 的核心区别:

维度 MAPF openTCS
规划时机 所有智能体同时规划 一台车一台车地规划
冲突处理 规划阶段就消除冲突 执行阶段通过资源锁避免冲突
最优性 追求全局最优 追求稳定可行
动态性 通常假设静态场景 天然支持动态(新任务、故障)
规模 学术实验通常 < 100 智能体 工业部署 10-100 台
重规划 全部重新计算 只重新计算受影响的车辆

15.4 为什么 openTCS 不用 MAPF?

原因 1:动态性

MAPF 假设:
  所有任务在开始时就已知
  一次性规划所有路径

工业现实:
  任务随时到达
  车辆随时故障
  路径随时被封
  → 需要持续的、增量的调度
  → 不适合"一次性全局规划"

原因 2:计算时间

CBS 求解 50 个智能体的最优解:
  可能需要几秒到几分钟

工业系统的要求:
  新任务到达后,必须在 100ms 内响应
  → 不能等待全局最优解

原因 3:稳定性 vs 最优性

MAPF 追求:
  "所有车的总路径最短"

工业系统追求:
  "系统 7×24 稳定运行,不出事故"

这两个目标有时是矛盾的:
  最优路径可能让多台车走同一条路(因为那条路最短)
  → 虽然理论最优,但实际上增加了冲突风险
  
  稳定的做法是让车辆分散在不同路径上
  → 虽然不是理论最优,但冲突少、系统稳定

原因 4:工程复杂度

MAPF 算法实现复杂度高
调试困难(全局规划,一个 bug 影响所有车)
边界情况多(新任务到达时如何融入已有规划?)

openTCS 的方法:
  每台车独立规划路径
  通过资源锁避免冲突
  逻辑简单、可预测、容易调试

15.5 两种方法的适用场景

场景 更适合 MAPF 更适合 openTCS 方法
高密度仓库(100+ AGV)
任务批量到达
追求吞吐量极限
任务动态到达
异构车队
需要高稳定性
需要快速响应
车辆数量适中(10-50)

15.6 融合方案

实际上,现代系统越来越倾向于融合两种方法:

混合架构:

层 1:全局规划(MAPF 思想)
  - 每隔 N 秒,对所有活跃车辆做一次全局路径规划
  - 消除明显的冲突
  - 优化整体效率

层 2:实时调度(openTCS 思想)
  - 资源锁保证安全
  - 处理动态变化(新任务、故障)
  - 局部重规划

效果:
  - 全局规划提供"大方向"
  - 资源锁保证"安全底线"
  - 两者互补

十六、openTCS 的实际部署架构

16.1 单体部署

最简单的部署方式:

┌─────────────────────────────────┐
│          单台服务器              │
│                                 │
│  ┌─────────────────────────┐   │
│  │     openTCS Kernel      │   │
│  │  (Dispatcher + Router   │   │
│  │   + Scheduler + Drivers)│   │
│  └─────────────────────────┘   │
│                                 │
│  ┌─────────────────────────┐   │
│  │    Plant Overview       │   │
│  │    (Web UI / Swing UI)  │   │
│  └─────────────────────────┘   │
│                                 │
└─────────────────────────────────┘
         │
         │ TCP/MQTT/HTTP
         │
    ┌────┴────┐
    │  AGVs   │
    └─────────┘

适用于:

  • 小规模部署(< 20 台 AGV)
  • 开发和测试环境
  • 比赛演示

16.2 高可用部署

工业环境中的部署:

┌──────────────────────────────────────────────┐
│                 主服务器                       │
│  ┌──────────────────────────────────────┐    │
│  │         openTCS Kernel (Active)      │    │
│  └──────────────────────────────────────┘    │
└──────────────────────────────────────────────┘
         │                          │
         │ 状态同步                  │ 心跳
         │                          │
┌──────────────────────────────────────────────┐
│                 备服务器                       │
│  ┌──────────────────────────────────────┐    │
│  │        openTCS Kernel (Standby)      │    │
│  └──────────────────────────────────────┘    │
└──────────────────────────────────────────────┘

主服务器故障时:
  备服务器自动接管
  AGV 重新连接到备服务器
  任务继续执行

16.3 与外部系统的集成

┌─────────┐     ┌─────────┐     ┌─────────┐
│   ERP   │     │   MES   │     │   WMS   │
└────┬────┘     └────┬────┘     └────┬────┘
     │               │               │
     └───────────────┼───────────────┘
                     │
              ┌──────┴──────┐
              │  REST API   │
              │  / Web Hook │
              └──────┬──────┘
                     │
              ┌──────┴──────┐
              │   openTCS   │
              │   Kernel    │
              └──────┬──────┘
                     │
              ┌──────┴──────┐
              │ MQTT Broker │
              └──────┬──────┘
                     │
         ┌───────────┼───────────┐
         │           │           │
    ┌────┴───┐  ┌────┴───┐  ┌───┴────┐
    │ AGV-1  │  │ AGV-2  │  │ AGV-3  │
    └────────┘  └────────┘  └────────┘

集成接口:

接口 方向 用途
REST API 外部 → openTCS 提交订单、查询状态
WebSocket openTCS → 外部 实时状态推送
MQTT openTCS ↔ AGV 车辆通信
Database openTCS → DB 日志、历史记录

16.4 监控和运维

工业系统必须有完善的监控:

监控维度:
  
  系统级:
    - Kernel CPU/内存使用率
    - 消息队列积压
    - API 响应时间
    
  业务级:
    - 当前活跃任务数
    - 平均任务完成时间
    - 车辆利用率
    - 死锁发生次数
    - 任务失败率
    
  车辆级:
    - 每台车的位置
    - 每台车的状态
    - 每台车的电量
    - 每台车的错误历史
    - 每台车的行驶里程

十七、如果你想做自己的 AGV 调度系统

17.1 核心设计原则

从 openTCS 中可以提炼出以下设计原则:

原则 1:中央调度

不要:
  每辆车自己决定去哪
  车辆之间互相协商

而是:
  统一的中央大脑做所有决策
  车辆只负责执行

原因:
  全局最优、确定性、容易调试、容易保证安全

原则 2:资源预约

不要:
  碰到障碍再停下
  检测到冲突再处理

而是:
  进入之前先申请资源
  申请不到就等待或绕路

原因:
  从逻辑上保证不会碰撞
  不依赖传感器的可靠性
  可以提前检测死锁

原则 3:状态机驱动

不要:
  只维护车辆位置
  用 if-else 判断车辆能不能接任务

而是:
  维护完整的状态机
  状态转换有明确的规则
  每个状态有明确的允许操作

原因:
  防止非法操作
  便于故障恢复
  系统行为可预测

原则 4:分层解耦

不要:
  调度逻辑和通信逻辑混在一起
  路径规划和资源管理混在一起

而是:
  明确分层:
    调度层(Dispatcher)
    规划层(Router)
    资源层(Scheduler)
    通信层(Driver)

原因:
  每层可以独立开发、测试、替换
  逻辑清晰,容易维护

原则 5:事件驱动

不要:
  每隔 1 秒轮询一次所有状态
  定时触发调度

而是:
  状态变化时立即触发相应逻辑
  新任务到达 → 触发调度
  车辆空闲 → 触发调度
  资源释放 → 通知等待的车辆

原因:
  响应快
  资源消耗低
  逻辑清晰

17.2 一个适合比赛的简化版架构

如果你们做软件设计大赛或课程项目,可以这样设计:

┌─────────────────────────────────────────────────┐
│                   Frontend (Vue)                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────────┐  │
│  │ 地图可视化│  │ 任务面板 │  │ 车辆状态面板 │  │
│  └──────────┘  └──────────┘  └──────────────┘  │
└─────────────────────┬───────────────────────────┘
                      │ WebSocket / REST
┌─────────────────────┴───────────────────────────┐
│                   Backend (Python)                │
│                                                  │
│  ┌────────────────┐                             │
│  │  Task Manager  │ ← 接收和管理运输订单         │
│  └───────┬────────┘                             │
│          │                                       │
│  ┌───────┴────────┐                             │
│  │   Dispatcher   │ ← 选择最优车辆              │
│  └───────┬────────┘                             │
│          │                                       │
│  ┌───────┴────────┐                             │
│  │  Path Planner  │ ← A*/Dijkstra/WHCA*        │
│  └───────┬────────┘                             │
│          │                                       │
│  ┌───────┴────────┐                             │
│  │   Scheduler    │ ← 资源锁管理               │
│  └───────┬────────┘                             │
│          │                                       │
│  ┌───────┴────────┐                             │
│  │  AGV Adapter   │ ← 统一通信接口              │
│  └───────┬────────┘                             │
│          │                                       │
└──────────┼───────────────────────────────────────┘
           │
┌──────────┴───────────────────────────────────────┐
│            Simulation Engine (Pygame/PyQt)         │
│  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐        │
│  │AGV-1 │  │AGV-2 │  │AGV-3 │  │AGV-4 │        │
│  └──────┘  └──────┘  └──────┘  └──────┘        │
└──────────────────────────────────────────────────┘

17.3 技术栈推荐

模块 推荐技术 原因
后端 Python + FastAPI 开发快,异步支持好
前端 Vue 3 + Canvas/SVG 轻量,可视化方便
通信 WebSocket 实时双向通信
仿真 Pygame 或 PyQt Python 生态,容易集成
路径算法 NetworkX Python 图算法库,开箱即用
消息队列 Redis Pub/Sub 轻量,适合事件驱动

17.4 核心数据结构

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

# === 地图模型 ===

@dataclass
class Point:
    id: str
    x: float
    y: float
    type: str  # "halt" | "park" | "charge"

@dataclass
class Path:
    id: str
    source: str      # Point ID
    destination: str # Point ID
    length: float
    max_speed: float
    bidirectional: bool = False
    locked: bool = False

@dataclass
class Location:
    id: str
    point_id: str
    type: str  # "pickup" | "dropoff" | "charge"
    operations: list[str] = field(default_factory=list)

# === 车辆模型 ===

class VehicleState(Enum):
    IDLE = "idle"
    EXECUTING = "executing"
    WAITING = "waiting"
    CHARGING = "charging"
    ERROR = "error"
    UNAVAILABLE = "unavailable"

@dataclass
class Vehicle:
    id: str
    current_position: str  # Point ID
    state: VehicleState = VehicleState.IDLE
    energy_level: float = 100.0
    current_order: Optional[str] = None
    
    def can_accept_order(self) -> bool:
        return (self.state == VehicleState.IDLE and 
                self.energy_level > 20.0)

# === 任务模型 ===

class OrderState(Enum):
    PENDING = "pending"
    DISPATCHED = "dispatched"
    EXECUTING = "executing"
    FINISHED = "finished"
    FAILED = "failed"

@dataclass
class TransportOrder:
    id: str
    source_location: str      # Location ID(取货点)
    destination_location: str  # Location ID(放货点)
    priority: int = 5
    state: OrderState = OrderState.PENDING
    assigned_vehicle: Optional[str] = None
    created_at: float = 0.0
    deadline: Optional[float] = None

# === 资源锁模型 ===

@dataclass
class ResourceLock:
    resource_id: str       # Point ID 或 Path ID
    vehicle_id: str        # 占用该资源的车辆
    lock_type: str = "exclusive"  # "exclusive" | "shared"
    timestamp: float = 0.0

17.5 Scheduler 的简化实现

class Scheduler:
    """资源锁管理器"""
    
    def __init__(self):
        self.locks: dict[str, ResourceLock] = {}  # resource_id → lock
        self.waiting_queue: dict[str, list[str]] = {}  # resource_id → [vehicle_ids]
    
    def try_allocate(self, vehicle_id: str, resources: list[str]) -> bool:
        """
        尝试为车辆分配一组资源。
        要么全部成功,要么全部失败(原子性)。
        """
        # 检查所有资源是否可用
        for resource_id in resources:
            if resource_id in self.locks:
                existing_lock = self.locks[resource_id]
                if existing_lock.vehicle_id != vehicle_id:
                    # 资源被其他车辆占用
                    return False
        
        # 全部可用,执行分配
        for resource_id in resources:
            self.locks[resource_id] = ResourceLock(
                resource_id=resource_id,
                vehicle_id=vehicle_id,
                timestamp=time.time()
            )
        
        return True
    
    def release(self, vehicle_id: str, resources: list[str]):
        """释放车辆占用的资源"""
        for resource_id in resources:
            if resource_id in self.locks:
                lock = self.locks[resource_id]
                if lock.vehicle_id == vehicle_id:
                    del self.locks[resource_id]
                    # 通知等待该资源的车辆
                    self._notify_waiting(resource_id)
    
    def release_all(self, vehicle_id: str):
        """释放某车辆的所有资源"""
        to_release = [
            rid for rid, lock in self.locks.items() 
            if lock.vehicle_id == vehicle_id
        ]
        self.release(vehicle_id, to_release)
    
    def get_locked_resources(self, vehicle_id: str) -> list[str]:
        """获取某车辆占用的所有资源"""
        return [
            rid for rid, lock in self.locks.items() 
            if lock.vehicle_id == vehicle_id
        ]
    
    def is_available(self, resource_id: str, requesting_vehicle: str) -> bool:
        """检查资源是否对某车辆可用"""
        if resource_id not in self.locks:
            return True
        return self.locks[resource_id].vehicle_id == requesting_vehicle
    
    def _notify_waiting(self, resource_id: str):
        """资源释放后,通知等待队列中的车辆"""
        if resource_id in self.waiting_queue:
            waiting = self.waiting_queue[resource_id]
            if waiting:
                next_vehicle = waiting.pop(0)
                # 触发该车辆重新尝试分配
                self._trigger_retry(next_vehicle)
    
    def check_deadlock(self) -> list[list[str]]:
        """
        检测死锁:寻找等待环。
        返回所有死锁环中的车辆列表。
        """
        # 构建等待图:vehicle_A → vehicle_B 表示 A 在等待 B 占用的资源
        wait_graph: dict[str, set[str]] = {}
        
        for resource_id, waiters in self.waiting_queue.items():
            if resource_id in self.locks:
                holder = self.locks[resource_id].vehicle_id
                for waiter in waiters:
                    if waiter not in wait_graph:
                        wait_graph[waiter] = set()
                    wait_graph[waiter].add(holder)
        
        # DFS 找环
        cycles = self._find_cycles(wait_graph)
        return cycles
    
    def _find_cycles(self, graph: dict[str, set[str]]) -> list[list[str]]:
        """在有向图中找所有环"""
        visited = set()
        rec_stack = set()
        cycles = []
        
        def dfs(node, path):
            visited.add(node)
            rec_stack.add(node)
            path.append(node)
            
            for neighbor in graph.get(node, set()):
                if neighbor not in visited:
                    dfs(neighbor, path)
                elif neighbor in rec_stack:
                    # 找到环
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:])
            
            path.pop()
            rec_stack.remove(node)
        
        for node in graph:
            if node not in visited:
                dfs(node, [])
        
        return cycles

17.6 Dispatcher 的简化实现

class Dispatcher:
    """任务分配器"""
    
    def __init__(self, router: 'Router', scheduler: Scheduler, 
                 vehicles: dict[str, Vehicle], orders: dict[str, TransportOrder]):
        self.router = router
        self.scheduler = scheduler
        self.vehicles = vehicles
        self.orders = orders
    
    def dispatch(self):
        """
        核心调度逻辑:将待分配的订单分配给合适的车辆。
        触发时机:新订单到达、车辆变为空闲、车辆故障。
        """
        pending_orders = self._get_pending_orders()
        available_vehicles = self._get_available_vehicles()
        
        if not pending_orders or not available_vehicles:
            return
        
        # 按优先级排序订单(高优先级先分配)
        pending_orders.sort(key=lambda o: -o.priority)
        
        for order in pending_orders:
            best_vehicle = self._find_best_vehicle(order, available_vehicles)
            
            if best_vehicle:
                self._assign_order(order, best_vehicle)
                available_vehicles.remove(best_vehicle)
    
    def _get_pending_orders(self) -> list[TransportOrder]:
        return [o for o in self.orders.values() if o.state == OrderState.PENDING]
    
    def _get_available_vehicles(self) -> list[Vehicle]:
        return [v for v in self.vehicles.values() if v.can_accept_order()]
    
    def _find_best_vehicle(self, order: TransportOrder, 
                           candidates: list[Vehicle]) -> Optional[Vehicle]:
        """为订单找到最优车辆"""
        best_vehicle = None
        best_cost = float('inf')
        
        for vehicle in candidates:
            # 计算从车辆当前位置到取货点的代价
            cost = self.router.get_cost(
                vehicle.current_position, 
                order.source_location
            )
            
            if cost is None:
                continue  # 无法到达
            
            # 检查电量是否足够
            total_distance = cost + self.router.get_cost(
                order.source_location, 
                order.destination_location
            )
            if total_distance is None:
                continue
                
            energy_needed = self._estimate_energy(total_distance)
            if vehicle.energy_level < energy_needed * 1.3:  # 30% 余量
                continue
            
            # 拥堵惩罚
            congestion_factor = self._get_congestion_factor(
                vehicle.current_position, order.source_location
            )
            adjusted_cost = cost * congestion_factor
            
            if adjusted_cost < best_cost:
                best_cost = adjusted_cost
                best_vehicle = vehicle
        
        return best_vehicle
    
    def _assign_order(self, order: TransportOrder, vehicle: Vehicle):
        """将订单分配给车辆"""
        order.state = OrderState.DISPATCHED
        order.assigned_vehicle = vehicle.id
        vehicle.state = VehicleState.EXECUTING
        vehicle.current_order = order.id
        
        # 计算完整路径
        route = self.router.compute_route(
            vehicle.current_position,
            order.source_location,
            order.destination_location
        )
        
        # 开始执行
        self._execute_route(vehicle, route, order)
    
    def _estimate_energy(self, distance: float) -> float:
        """估算能耗:每米 0.1% 电量"""
        return distance * 0.1
    
    def _get_congestion_factor(self, source: str, destination: str) -> float:
        """计算路径拥堵因子"""
        path = self.router.get_path(source, destination)
        if not path:
            return 1.0
        
        vehicles_on_path = sum(
            1 for v in self.vehicles.values()
            if v.current_position in path and v.state == VehicleState.EXECUTING
        )
        
        return 1.0 + vehicles_on_path * 0.3

17.7 Router 的简化实现

import networkx as nx

class Router:
    """路径规划器"""
    
    def __init__(self, points: dict[str, Point], paths: list[Path]):
        self.graph = nx.DiGraph()
        self._build_graph(points, paths)
    
    def _build_graph(self, points: dict[str, Point], paths: list[Path]):
        """构建路网图"""
        for point in points.values():
            self.graph.add_node(point.id, x=point.x, y=point.y, type=point.type)
        
        for path in paths:
            if not path.locked:
                self.graph.add_edge(
                    path.source, 
                    path.destination,
                    weight=path.length,
                    max_speed=path.max_speed,
                    path_id=path.id
                )
                if path.bidirectional:
                    self.graph.add_edge(
                        path.destination, 
                        path.source,
                        weight=path.length,
                        max_speed=path.max_speed,
                        path_id=path.id + "_rev"
                    )
    
    def get_cost(self, source: str, destination: str) -> Optional[float]:
        """计算两点之间的最短路径代价"""
        try:
            return nx.shortest_path_length(
                self.graph, source, destination, weight='weight'
            )
        except nx.NetworkXNoPath:
            return None
    
    def get_path(self, source: str, destination: str) -> Optional[list[str]]:
        """计算两点之间的最短路径(节点序列)"""
        try:
            return nx.shortest_path(
                self.graph, source, destination, weight='weight'
            )
        except nx.NetworkXNoPath:
            return None
    
    def compute_route(self, current_pos: str, pickup: str, 
                      dropoff: str) -> Optional[list[str]]:
        """计算完整路线:当前位置 → 取货点 → 放货点"""
        path_to_pickup = self.get_path(current_pos, pickup)
        path_to_dropoff = self.get_path(pickup, dropoff)
        
        if path_to_pickup is None or path_to_dropoff is None:
            return None
        
        # 合并路径(去掉重复的中间点)
        full_path = path_to_pickup + path_to_dropoff[1:]
        return full_path
    
    def compute_route_avoiding(self, source: str, destination: str, 
                                avoid_points: set[str]) -> Optional[list[str]]:
        """计算避开某些点的路径(用于绕行)"""
        # 创建临时图,移除需要避开的节点
        temp_graph = self.graph.copy()
        for point in avoid_points:
            if point in temp_graph:
                temp_graph.remove_node(point)
        
        try:
            return nx.shortest_path(temp_graph, source, destination, weight='weight')
        except nx.NetworkXNoPath:
            return None
    
    def update_edge_weight(self, source: str, destination: str, new_weight: float):
        """动态更新边的权重(用于拥堵感知)"""
        if self.graph.has_edge(source, destination):
            self.graph[source][destination]['weight'] = new_weight
    
    def lock_path(self, source: str, destination: str):
        """锁定路径(不可通行)"""
        if self.graph.has_edge(source, destination):
            self.graph.remove_edge(source, destination)
    
    def unlock_path(self, source: str, destination: str, weight: float):
        """解锁路径"""
        self.graph.add_edge(source, destination, weight=weight)

17.8 执行引擎的简化实现

import asyncio

class ExecutionEngine:
    """
    执行引擎:协调 Scheduler 和 Vehicle Driver,
    逐步执行车辆的路径。
    """
    
    def __init__(self, scheduler: Scheduler, vehicles: dict[str, Vehicle]):
        self.scheduler = scheduler
        self.vehicles = vehicles
        self.look_ahead = 2  # 提前锁定 2 步
    
    async def execute_route(self, vehicle_id: str, route: list[str], 
                            order: TransportOrder):
        """执行一条完整路径"""
        vehicle = self.vehicles[vehicle_id]
        
        for i in range(len(route) - 1):
            current = route[i]
            next_point = route[i + 1]
            
            # 计算需要锁定的资源(当前段 + look-ahead)
            resources_needed = self._get_resources_for_step(route, i)
            
            # 尝试获取资源
            success = await self._acquire_resources_with_retry(
                vehicle_id, resources_needed
            )
            
            if not success:
                # 获取资源失败(超时),尝试重新规划
                await self._handle_blocked(vehicle, route, i, order)
                return
            
            # 资源获取成功,执行移动
            await self._move_vehicle(vehicle, next_point)
            
            # 释放已经离开的资源
            if i > 0:
                previous = route[i - 1]
                path_id = f"{previous}_{current}"
                self.scheduler.release(vehicle_id, [previous, path_id])
        
        # 路径执行完成
        self._complete_order(vehicle, order)
    
    def _get_resources_for_step(self, route: list[str], 
                                 current_index: int) -> list[str]:
        """计算当前步骤需要锁定的资源"""
        resources = []
        end_index = min(current_index + self.look_ahead + 1, len(route))
        
        for i in range(current_index, end_index):
            resources.append(route[i])  # Point
            if i < end_index - 1:
                path_id = f"{route[i]}_{route[i+1]}"
                resources.append(path_id)  # Path
        
        return resources
    
    async def _acquire_resources_with_retry(self, vehicle_id: str, 
                                             resources: list[str],
                                             timeout: float = 30.0) -> bool:
        """带重试的资源获取"""
        start_time = asyncio.get_event_loop().time()
        
        while True:
            if self.scheduler.try_allocate(vehicle_id, resources):
                return True
            
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > timeout:
                return False
            
            # 等待一小段时间后重试
            await asyncio.sleep(0.5)
    
    async def _move_vehicle(self, vehicle: Vehicle, target_point: str):
        """
        模拟车辆移动。
        在实际系统中,这里会通过 Driver 发送指令给真实 AGV,
        然后等待 AGV 上报到达。
        """
        # 模拟移动时间(根据距离计算)
        move_time = 2.0  # 简化:固定 2 秒
        await asyncio.sleep(move_time)
        
        # 更新车辆位置
        vehicle.current_position = target_point
    
    async def _handle_blocked(self, vehicle: Vehicle, route: list[str], 
                               blocked_at: int, order: TransportOrder):
        """处理路径被阻塞的情况"""
        # 策略 1:等待(已经在 acquire 中等了 timeout)
        # 策略 2:重新规划绕行路径
        blocked_point = route[blocked_at + 1]
        
        # 尝试绕行
        new_route = self.router.compute_route_avoiding(
            vehicle.current_position,
            route[-1],  # 最终目的地
            avoid_points={blocked_point}
        )
        
        if new_route:
            # 用新路径继续执行
            await self.execute_route(vehicle.id, new_route, order)
        else:
            # 无法绕行,标记任务失败
            order.state = OrderState.FAILED
            vehicle.state = VehicleState.IDLE
            vehicle.current_order = None
    
    def _complete_order(self, vehicle: Vehicle, order: TransportOrder):
        """完成订单"""
        order.state = OrderState.FINISHED
        vehicle.state = VehicleState.IDLE
        vehicle.current_order = None
        
        # 释放所有剩余资源(只保留当前位置)
        all_resources = self.scheduler.get_locked_resources(vehicle.id)
        resources_to_release = [
            r for r in all_resources if r != vehicle.current_position
        ]
        self.scheduler.release(vehicle.id, resources_to_release)

17.9 把它们组装在一起

class AGVControlSystem:
    """AGV 调度系统主类"""
    
    def __init__(self):
        # 加载地图
        self.points, self.paths, self.locations = self._load_map()
        
        # 初始化核心模块
        self.router = Router(self.points, self.paths)
        self.scheduler = Scheduler()
        self.vehicles: dict[str, Vehicle] = {}
        self.orders: dict[str, TransportOrder] = {}
        
        self.dispatcher = Dispatcher(
            self.router, self.scheduler, self.vehicles, self.orders
        )
        self.execution_engine = ExecutionEngine(self.scheduler, self.vehicles)
        
        # 事件处理
        self.event_handlers = {
            'new_order': self._on_new_order,
            'vehicle_idle': self._on_vehicle_idle,
            'vehicle_error': self._on_vehicle_error,
        }
    
    def add_vehicle(self, vehicle_id: str, position: str):
        """注册新车辆"""
        vehicle = Vehicle(id=vehicle_id, current_position=position)
        self.vehicles[vehicle_id] = vehicle
        # 锁定车辆当前位置
        self.scheduler.try_allocate(vehicle_id, [position])
    
    def submit_order(self, order: TransportOrder):
        """提交新订单"""
        self.orders[order.id] = order
        self._trigger_event('new_order', order)
    
    def _on_new_order(self, order: TransportOrder):
        """新订单到达时触发"""
        self.dispatcher.dispatch()
    
    def _on_vehicle_idle(self, vehicle: Vehicle):
        """车辆变为空闲时触发"""
        self.dispatcher.dispatch()
    
    def _on_vehicle_error(self, vehicle: Vehicle):
        """车辆故障时触发"""
        # 重新分配该车辆的任务
        if vehicle.current_order:
            order = self.orders[vehicle.current_order]
            order.state = OrderState.PENDING
            order.assigned_vehicle = None
            vehicle.current_order = None
            # 触发重新调度
            self.dispatcher.dispatch()
    
    def _trigger_event(self, event_type: str, data):
        """触发事件"""
        handler = self.event_handlers.get(event_type)
        if handler:
            handler(data)
    
    async def run(self):
        """主循环"""
        print("AGV Control System started.")
        
        # 定期检查死锁
        while True:
            cycles = self.scheduler.check_deadlock()
            if cycles:
                print(f"Deadlock detected! Vehicles involved: {cycles}")
                self._resolve_deadlock(cycles)
            
            await asyncio.sleep(5)  # 每 5 秒检查一次
    
    def _resolve_deadlock(self, cycles: list[list[str]]):
        """解决死锁"""
        for cycle in cycles:
            # 简单策略:让优先级最低的车辆让路
            vehicles_in_cycle = [self.vehicles[vid] for vid in cycle]
            victim = min(vehicles_in_cycle, 
                        key=lambda v: self.orders.get(v.current_order, 
                                                      TransportOrder(id='', source_location='', 
                                                                     destination_location='')).priority)
            
            # 让 victim 释放资源并重新规划
            self.scheduler.release_all(victim.id)
            # 保留当前位置
            self.scheduler.try_allocate(victim.id, [victim.current_position])
            # 重新规划路径
            if victim.current_order:
                order = self.orders[victim.current_order]
                order.state = OrderState.PENDING
                victim.state = VehicleState.IDLE
                victim.current_order = None
                self.dispatcher.dispatch()

17.10 使用示例

async def main():
    # 创建系统
    system = AGVControlSystem()
    
    # 注册车辆
    system.add_vehicle("AGV-1", "P1")
    system.add_vehicle("AGV-2", "P6")
    system.add_vehicle("AGV-3", "P9")
    
    # 提交订单
    system.submit_order(TransportOrder(
        id="order_001",
        source_location="W1",      # 从工位 1 取货
        destination_location="W3", # 送到工位 3
        priority=5
    ))
    
    system.submit_order(TransportOrder(
        id="order_002",
        source_location="W2",
        destination_location="W4",
        priority=8  # 高优先级
    ))
    
    # 启动系统
    await system.run()

if __name__ == "__main__":
    asyncio.run(main())

十八、总结:从 openTCS 中学到的核心思想

18.1 架构层面

思想 说明 为什么重要
中央调度 一个大脑管所有车 全局最优、确定性、安全
分层解耦 调度/规划/资源/通信分离 可维护、可替换、可测试
插件化 核心模块可替换 适应不同场景
事件驱动 状态变化触发逻辑 响应快、资源省

18.2 算法层面

思想 说明 为什么重要
资源预约 进入前先锁定 逻辑安全保证
图模型 离散化的路网 资源可管理
代价模型 不只是最短路 综合考虑多因素
动态重规划 路径可以随时调整 应对变化

18.3 工程层面

思想 说明 为什么重要
状态机 完整的状态管理 防止非法操作
原子操作 资源分配全部成功或全部失败 一致性
超时机制 等待有上限 避免无限阻塞
故障恢复 任何组件故障都有应对 7×24 运行
可观测性 所有状态可查询 运维友好

18.4 一句话总结

openTCS 教给我们的最重要的一课是:

多车协同的核心不是"路径规划算法",而是"资源管理系统"。

算出路径只是第一步。
保证几十台车在同一个空间里安全、高效、不死锁地运行——
这才是真正的工程挑战。
而解决这个挑战的关键,是一套严谨的资源预约和释放机制。


这就是 openTCS 的完整技术解析。从架构设计到核心算法,从理论原理到代码实现,希望这篇文章能帮你建立对工业 AGV 调度系统的完整认知。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐