从 openTCS 看 AGV 调度系统设计:中央调度、资源锁、Router Dispatcher 与 VDA5050 实战解析
面向读者:初级工程师 / AGV 与多机器人方向学习者
文章目标:理解 openTCS 的核心架构思想,而不是停留在“会跑 Demo”层面
文章目录
- 一、为什么很多 AGV 项目"看起来能跑",但实际上离工业系统很远?
- 二、什么是 openTCS?
- 三、openTCS 的整体架构
- 四、中央调度架构:为什么工业 AGV 更偏向 Centralized?
- 五、Dispatcher:车辆是怎么被分配任务的?
- 六、车辆状态机(Vehicle State Machine)
- 七、Router:路径规划模块
- 八、Scheduler:真正的核心
- 九、死锁问题:多车系统最棘手的挑战
- 十、Vehicle Driver:插件化通信接口
- 十一、VDA5050:AGV 行业的统一通信标准
- 十二、openTCS 的地图建模详解
- 十三、Transport Order:任务的完整生命周期
- 十四、openTCS 的可扩展性设计
- 十五、openTCS 与 MAPF 的关系
- 十六、openTCS 的实际部署架构
- 十七、如果你想做自己的 AGV 调度系统
- 十八、总结:从 openTCS 中学到的核心思想
一、为什么很多 AGV 项目"看起来能跑",但实际上离工业系统很远?
1.1 学生项目的典型做法
很多学生项目里的 AGV 仿真,大概都是这样的:
- A* 算路
- 小车沿路径移动
- 两辆车碰撞就停一下
- 简单避障
这种系统可以做演示,但离真正的工业 AGV 调度系统还差非常远。
1.2 差距到底在哪?
真正的工业场景里,核心问题其实不是:
“车会不会走”
而是:
- 多车如何协同
- 如何避免死锁
- 如何避免路口拥堵
- 如何统一调度
- 如何动态重规划
- 如何处理车辆掉线
- 如何做资源占用控制
- 如何支持不同品牌 AGV
1.3 一个具体的例子
假设你有一个仓库,10 台 AGV 同时运行:
学生项目的做法:
10 台车各自 A* 算路
各自沿路径走
碰到了就停
会发生什么?
- 两台车在走廊相向而行 → 谁也过不去 → 死锁
- 三台车同时到达十字路口 → 互相等待 → 死锁
- 一台车故障停在路中间 → 后面的车全部堵住
- 所有车都选最短路 → 主干道拥堵,旁边的路空着
这些问题,单靠"路径规划算法"是解决不了的。
1.4 工业系统的思路
工业系统的思路完全不同:
不是"让每台车自己想办法"
而是"有一个中央大脑统一管理所有车"
不是"碰到了再处理"
而是"进入之前就预约资源"
不是"只算最短路"
而是"综合考虑拥堵、电量、任务优先级"
不是"车自己避障"
而是"系统保证不会出现需要避障的情况"
这就是 openTCS 的设计哲学。
1.5 一个类比
把仓库想象成一个城市:
| 学生项目 | 工业系统 |
|---|---|
| 没有交通规则 | 完整的交通法规 |
| 没有红绿灯 | 每个路口都有信号控制 |
| 碰到了再让 | 提前规划不会相遇 |
| 每辆车自己导航 | 交通指挥中心统一调度 |
| 出了事故再处理 | 从设计上避免事故 |
openTCS 就是这个"城市交通管理系统"。
二、什么是 openTCS?
2.1 基本定义
openTCS(open Transportation Control System)是由德国 Fraunhofer IML 推出的开源 AGV 调度系统。
它本质上是:
一个"中央交通管制系统"
2.2 它负责什么?
| 职责 | 说明 |
|---|---|
| 订单分配 | 决定哪台车执行哪个任务 |
| 路径规划 | 计算车辆从起点到终点的路线 |
| 资源占用 | 管理路径上每个节点和边的占用状态 |
| 多车协调 | 确保多台车不会互相阻塞 |
| 车辆状态管理 | 跟踪每台车的位置、状态、电量 |
| 交通控制 | 类似红绿灯,控制车辆的通行权 |
| 车辆通信 | 通过统一接口与不同品牌的车通信 |
2.3 它不是什么?
这一点非常重要,很多人搞混:
| 它不是 | 说明 |
|---|---|
| SLAM 系统 | 它不做地图构建和定位 |
| 底盘控制器 | 它不控制电机和轮子 |
| 视觉导航算法 | 它不处理传感器数据 |
| 机器人操作系统 | 它不是 ROS |
它的定位是:
车队调度层(Fleet Management)
在整个系统中的位置:
ERP/MES(企业管理层)
↓ "把货从 A 搬到 B"
openTCS(车队调度层)
↓ "AGV-3 走这条路去执行"
AGV 控制层(底盘 + 导航)
↓ "控制电机,沿路径移动"
物理层(轮子转动)
2.4 为什么调度层是最关键的?
因为:
- 底盘控制:各厂商都能做好(成熟技术)
- 导航定位:SLAM 技术已经很成熟
- 单车路径规划:A* 就够了
但是:
让 10 台、50 台、100 台车在同一个空间里高效协同,不堵车、不死锁、不碰撞——这才是真正的难题。
这就是 openTCS 要解决的问题。
2.5 openTCS 的历史和定位
- 由德国 Fraunhofer IML(物流研究所)开发
- 2000 年代开始,已有 15+ 年历史
- 开源(MIT License)
- Java 实现
- 在欧洲工业界广泛使用
- 是很多商业 AGV 调度系统的参考架构
它代表的是:
欧洲工业界对 AGV 调度问题的标准解法。
三、openTCS 的整体架构
3.1 核心架构图
+----------------+
| ERP / MES |
+----------------+
|
v
+----------------+
| Kernel |
|----------------|
| Dispatcher |
| Router |
| Scheduler |
+----------------+
/ | \
/ | \
v v v
+--------+ +--------+ +--------+
| AGV-1 | | AGV-2 | | AGV-3 |
+--------+ +--------+ +--------+
3.2 Kernel:中央大脑
Kernel 是整个系统的核心。所有决策都在这里做出。
它内部最重要的几个模块:
| 模块 | 作用 | 类比 |
|---|---|---|
| Dispatcher | 订单分配 | 出租车调度中心 |
| Router | 路径规划 | 导航软件 |
| Scheduler | 资源锁与路径占用 | 红绿灯系统 |
| Vehicle Driver | 车辆通信 | 对讲机 |
3.3 为什么 Scheduler 最重要?
很多初学者只关注 Router(路径规划)。
但实际上:
工业系统最复杂的部分通常是 Scheduler。
原因很简单:
"算出路径"是一个数学问题 → 有成熟算法(A*、Dijkstra)
"保证几十台车不互相堵死"是一个工程问题 → 没有银弹
路径规划是"一次性计算",而资源调度是"持续性管理"。
一个类比:
给你一张地图,算出从家到公司的最短路——这很简单。
但是让 1000 辆车同时在这张地图上跑,不堵车——这才是真正的挑战。
3.4 模块之间的协作
一个完整的任务执行流程:
1. ERP 下发订单:"把货从工位 A 搬到工位 B"
2. Dispatcher 选择车辆:AGV-3 最合适(最近、电量够、空闲)
3. Router 计算路径:P1 → P2 → P3 → P4 → P5
4. Scheduler 检查资源:P2 被 AGV-1 占用,等待
5. Scheduler 批准:P2 释放了,AGV-3 可以进入
6. Vehicle Driver 下发指令:告诉 AGV-3 移动到 P2
7. AGV-3 执行:物理移动
8. 重复 4-7 直到到达目的地
注意第 4 步:
路径算出来了,但不能立即执行。必须等资源可用。
这就是 Scheduler 的价值。
3.5 Kernel 的运行模式
openTCS 的 Kernel 有两种运行模式:
| 模式 | 用途 | 说明 |
|---|---|---|
| Modelling | 建模 | 编辑地图、配置车辆,不执行调度 |
| Operating | 运行 | 实际调度车辆,执行任务 |
这种分离很重要:
你不会在飞机飞行时修改飞行控制软件。同样,你不应该在系统运行时修改地图。
四、中央调度架构:为什么工业 AGV 更偏向 Centralized?
4.1 两种调度架构
多机器人系统的调度架构,大体分为两种:
中央调度(Centralized):
所有决策由一个中央节点做出
车辆只负责执行
分布式调度(Decentralized):
每台车自己做决策
车辆之间互相协商
openTCS 选择了中央调度。这不是偶然的,而是工业场景的必然选择。
4.2 为什么工业场景偏向中央调度?
原因 1:全局最优 vs 局部最优
中央调度:
系统知道所有车的位置、任务、路径
→ 可以做出全局最优决策
→ 例如:让 AGV-3 绕远路,避免和 AGV-1 冲突
虽然 AGV-3 多走了 10 米,但系统整体效率更高
分布式调度:
每台车只知道自己的信息和邻居的信息
→ 只能做局部最优决策
→ 例如:AGV-3 选了最短路,结果和 AGV-1 堵在一起
两台车都延迟了 30 秒
工业场景追求的是:
系统整体吞吐量最大化,而不是单台车的路径最短。
原因 2:确定性
工业系统最重要的特性之一是:
确定性(Determinism)
即:
给定相同的输入,系统必须产生相同的输出。
中央调度天然具有确定性:
中央调度:
所有决策由一个节点做出
决策逻辑是确定的
→ 行为可预测、可复现、可审计
分布式调度:
多个节点并发决策
决策依赖于消息到达顺序
→ 行为可能不确定
→ 同样的场景,跑两次可能结果不同
在工业环境中,“不确定"意味着"不可靠”。
原因 3:死锁处理
死锁是多车系统最棘手的问题。
中央调度处理死锁:
系统知道所有车的资源占用情况
→ 可以提前检测死锁
→ 可以拒绝可能导致死锁的资源请求
→ 可以全局重新规划
分布式调度处理死锁:
每台车只知道自己占了什么、想要什么
→ 死锁检测需要全局信息(但分布式系统没有)
→ 通常只能"检测到死锁后恢复"
→ 恢复过程复杂且耗时
原因 4:实现复杂度
中央调度:
逻辑集中在一个地方
调试方便(所有日志在一个节点)
状态一致性容易保证
分布式调度:
逻辑分散在每台车上
调试困难(需要收集所有车的日志)
状态一致性是分布式系统的经典难题
原因 5:工业场景的特点
工业场景有一些特殊性,使得中央调度更合适:
| 特点 | 为什么适合中央调度 |
|---|---|
| 车辆数量有限(10-100台) | 中央节点计算量可控 |
| 环境相对封闭 | 网络稳定,通信可靠 |
| 路网固定 | 不需要实时发现新路径 |
| 安全要求高 | 需要确定性和可审计性 |
| 7×24 运行 | 需要稳定性而非灵活性 |
4.3 中央调度的缺点
当然,中央调度也有缺点:
| 缺点 | 说明 | 工业系统如何应对 |
|---|---|---|
| 单点故障 | 中央节点挂了,全部停 | 主备切换、热备份 |
| 扩展性有限 | 车辆太多时计算量大 | 区域划分、分层调度 |
| 通信依赖 | 网络断了车就失控 | 车辆本地有安全停车逻辑 |
| 响应延迟 | 决策需要经过中央节点 | 毫秒级通信,可接受 |
4.4 什么时候分布式更好?
分布式调度更适合:
- 大规模(1000+ 台机器人)
- 开放环境(城市道路、户外)
- 网络不可靠
- 机器人高度自治
- 不需要严格的全局最优
例如:
- 自动驾驶汽车(城市道路)→ 分布式
- 仓库 AGV(封闭环境)→ 中央调度
- 无人机集群(户外)→ 分布式
- 医院配送机器人(建筑内)→ 中央调度
4.5 openTCS 的选择
openTCS 明确选择了中央调度,因为它的目标场景是:
封闭环境中的 10-100 台 AGV 协同作业。
在这个场景下,中央调度的优势远大于劣势。
4.6 一个直观的对比
场景:10 台 AGV,一个十字路口
中央调度:
系统提前知道 10 台车的路径
发现 3 台车会在 10 秒后同时到达路口
提前安排:AGV-1 先过,AGV-5 减速,AGV-8 绕路
结果:没有任何车需要停下等待
分布式调度:
每台车只知道自己的路径
3 台车同时到达路口
开始协商:"你先走还是我先走?"
协商需要时间(消息来回)
协商期间 3 台车都停着
结果:浪费了 5 秒的等待时间
5 秒看起来不多,但如果每天有 1000 次这样的情况:
5 秒 × 1000 次 × 3 台车 = 4.2 小时的车辆空闲时间
这在工业场景中是不可接受的。
五、Dispatcher:车辆是怎么被分配任务的?
5.1 Dispatcher 的核心职责
Dispatcher 的核心职责是:
把"运输订单"分配给合适的车辆。
看起来简单,但实际上这是一个多目标优化问题。
5.2 一个具体的例子
当前状态:
AGV-1:在充电桩,电量 30%
AGV-2:在 P5,空闲,电量 80%
AGV-3:在 P8,正在执行任务,电量 60%
AGV-4:在 P2,空闲,电量 95%
AGV-5:故障,不可用
新任务:
从工位 A(在 P3 附近)取货,送到工位 B(在 P10 附近)
问题:派哪台车?
简单的"最近优先"会选 AGV-4(在 P2,离 P3 最近)。
但这可能不是最优选择:
考虑因素:
AGV-4 虽然最近,但它去 P3 的路径经过一个拥堵区域
AGV-2 虽然远一点,但路径畅通
AGV-4 去 P3 需要 30 秒(因为拥堵)
AGV-2 去 P3 需要 25 秒(路径畅通)
→ 实际上 AGV-2 更快到达
5.3 Dispatcher 需要考虑的因素
| 因素 | 说明 | 权重 |
|---|---|---|
| 距离 | 车辆到任务起点的距离 | 高 |
| 路径拥堵 | 路径上是否有其他车 | 高 |
| 电量 | 是否有足够电量完成任务 | 必须满足 |
| 车辆状态 | 是否空闲、是否可用 | 必须满足 |
| 车辆能力 | 是否能搬运该类型货物 | 必须满足 |
| 载重限制 | 货物重量是否超过车辆载重 | 必须满足 |
| 当前任务 | 如果正在执行任务,什么时候能空闲 | 中 |
| 充电策略 | 完成任务后是否需要充电 | 低 |
| 公平性 | 避免某台车过度使用 | 低 |
5.4 Dispatcher 的工作流程(详细版)
Step 1: 新订单进入系统
订单内容:从 Location-A 取货,送到 Location-B
订单优先级:普通
Step 2: Dispatcher 被触发
触发条件:新订单到达
Step 3: 筛选候选车辆
过滤条件:
- 状态 = IDLE 或即将空闲
- 电量 > 任务所需最低电量
- 能力匹配(能搬运该类型货物)
- 不在维护状态
结果:AGV-2, AGV-4 是候选
Step 4: 为每个候选车辆计算路径
AGV-2: P5 → P4 → P3 → Location-A → P3 → P6 → P9 → P10 → Location-B
AGV-4: P2 → P3 → Location-A → P3 → P6 → P9 → P10 → Location-B
Step 5: 评估代价
AGV-2 总代价:
路径长度:120m
预计时间:60s
拥堵因子:1.0(路径畅通)
综合代价:60
AGV-4 总代价:
路径长度:80m
预计时间:40s
拥堵因子:1.5(P2→P3 段有其他车)
综合代价:60
Step 6: 选择最优车辆
代价相同,选择电量更高的:AGV-4(95%)
Step 7: 生成 DriveOrder
DriveOrder = {
vehicle: AGV-4,
route: [P2, P3, Location-A, P3, P6, P9, P10, Location-B],
actions: [MOVE, MOVE, LOAD, MOVE, MOVE, MOVE, MOVE, UNLOAD]
}
Step 8: 下发给 Vehicle Driver
通过通信接口发送给 AGV-4
5.5 事件驱动的调度
openTCS 的 Dispatcher 是事件驱动的,不是轮询的。
什么意思?
轮询方式:
每隔 1 秒检查一次:"有没有新任务?有没有空闲车辆?"
问题:1 秒内可能错过最佳时机
事件驱动方式:
当以下事件发生时,立即触发调度:
- 新订单到达
- 某台车变为空闲
- 某台车故障
- 路网拓扑变化(某条路被封)
- 某个任务被取消
事件驱动的好处:
| 好处 | 说明 |
|---|---|
| 响应快 | 事件发生后立即处理 |
| 资源省 | 没有事件时不消耗 CPU |
| 逻辑清晰 | 每种事件有明确的处理逻辑 |
| 可扩展 | 新增事件类型很容易 |
5.6 Dispatcher 的高级策略
策略 1:预分配
普通做法:
车辆空闲 → 等待新任务 → 分配 → 开始移动
预分配:
车辆即将完成当前任务
→ 提前分配下一个任务
→ 当前任务完成后立即开始下一个
→ 减少空闲等待时间
策略 2:任务合并
任务 1:从 A 取货送到 B
任务 2:从 A 取货送到 C
如果 B 和 C 在同一方向:
→ 合并为一个任务:从 A 取两件货,先送 B 再送 C
→ 减少一次空跑
策略 3:充电调度
AGV-4 电量 25%
当前有一个任务需要 20% 电量
简单做法:
电量够 → 执行
智能做法:
执行完后电量只剩 5%
→ 必须立即充电
→ 充电桩可能被占用
→ 可能需要等待
不如现在先充电到 50%
→ 然后执行任务
→ 执行完还有 30%
→ 可以继续接下一个任务
5.7 Dispatcher 的可扩展性
openTCS 的 Dispatcher 是可替换的。
你可以实现自己的 Dispatcher,只要遵循接口:
public interface Dispatcher {
void dispatch(); // 触发一次调度
void withdrawOrder(Vehicle vehicle); // 撤回某车的任务
}
这意味着:
你可以用自己的算法替换默认的调度逻辑。
例如:
- 默认:最近优先
- 你的实现:考虑拥堵的加权最短路
- 高级实现:基于强化学习的调度
六、车辆状态机(Vehicle State Machine)
6.1 为什么需要状态机?
工业 AGV 系统里:
"车的位置"远远不够。
系统必须知道车辆的完整状态,才能做出正确的调度决策。
一个反面例子:
系统只知道 AGV-3 在 P5 位置
系统派 AGV-3 去执行任务
但实际上 AGV-3 正在充电(不能移动)
或者 AGV-3 已经故障(不能响应)
或者 AGV-3 正在被人工操作(不应该干预)
结果:指令发出去了,车没动,系统以为车在执行,实际上什么都没发生
→ 任务超时
→ 系统混乱
6.2 openTCS 的车辆状态
openTCS 维护的车辆状态包含多个维度:
维度 1:集成状态(Integration State)
TO_BE_UTILIZED → 车辆已注册,准备使用
TO_BE_RESPECTED → 车辆存在但不接受任务(只需要避让它)
TO_BE_IGNORED → 车辆完全忽略(维护中)
维度 2:运行状态(Processing State)
IDLE → 空闲,可以接受任务
PROCESSING_ORDER → 正在执行任务
AWAITING_ORDER → 等待新任务(已预分配但未开始)
维度 3:能量状态
电量百分比
是否在充电
是否需要充电(低于阈值)
维度 4:位置状态
当前所在 Point
当前所在 Path(如果在移动中)
朝向角度
维度 5:错误状态
无错误
警告(可继续运行)
错误(需要人工干预)
致命错误(立即停车)
6.3 状态转换图
┌──────────────┐
│ UNAVAILABLE │
└──────┬───────┘
│ 上线
v
┌──────────┐ ┌──────────┐
│ CHARGING │ ←────── │ IDLE │ ←──────────────┐
└──────┬───┘ 电量低 └────┬─────┘ │
│ │ 分配任务 │ 任务完成
│ 充满 v │
│ ┌──────────┐ │
└──────────→ │EXECUTING │ ────────────────┘
└────┬─────┘
│ 等待资源
v
┌──────────┐
│ WAITING │
└────┬─────┘
│ 资源可用
v
┌──────────┐
│EXECUTING │
└──────────┘
任何状态都可能转入:
┌──────────┐
│ ERROR │
└──────────┘
6.4 状态机对调度的影响
| 状态 | 是否允许接单 | 是否占用资源 | 是否需要避让 |
|---|---|---|---|
| IDLE | 是 | 是(当前位置) | 是 |
| EXECUTING | 否 | 是(路径上所有资源) | 是 |
| WAITING | 否 | 是(当前位置) | 是 |
| CHARGING | 通常否 | 是(充电桩) | 是 |
| ERROR | 否 | 是(当前位置) | 是 |
| UNAVAILABLE | 否 | 否 | 否 |
注意"是否占用资源"这一列:
即使车辆故障了,它仍然占用当前位置的资源。
因为它物理上还在那里,其他车不能通过。
6.5 状态机的工程价值
状态机不只是一个理论概念,它在工程中有非常具体的价值:
价值 1:防止非法操作
没有状态机:
系统给一台正在充电的车发送移动指令
→ 车辆可能强制中断充电
→ 电池损伤
→ 或者车辆忽略指令,系统以为在执行
有状态机:
系统检查状态:CHARGING
→ 拒绝分配任务
→ 或者先发送"停止充电"指令,等状态变为 IDLE,再分配
价值 2:故障恢复
AGV-3 突然断联(通信中断)
没有状态机:
系统不知道 AGV-3 怎么了
继续给它发指令
任务一直显示"执行中"
其他车可能和它碰撞
有状态机:
心跳超时 → 状态自动转为 UNAVAILABLE
→ 该车的任务标记为失败
→ 重新分配给其他车
→ 该车占用的资源标记为"不确定"
→ 其他车绕开该区域
价值 3:调度决策的基础
Dispatcher 做决策时:
不是简单地问"哪台车最近"
而是问"哪台车状态为 IDLE 且电量充足且能力匹配"
这些判断全部依赖状态机提供的信息。
价值 4:系统可观测性
运维人员看监控大屏:
AGV-1: IDLE (P3, 85%)
AGV-2: EXECUTING (P5→P8, 60%)
AGV-3: ERROR (P7, 通信超时)
AGV-4: CHARGING (充电桩-2, 45%)
一眼就能看出系统状态。
如果没有状态机,运维人员只能看到一堆坐标,无法快速判断问题。
6.6 状态机的实现要点
public class VehicleState {
private IntegrationState integrationState;
private ProcessingState processingState;
private Point currentPosition;
private double energyLevel;
private List<LoadHandlingDevice> loadHandlingDevices;
private ErrorState errorState;
// 状态转换必须是原子的
public synchronized void transitionTo(ProcessingState newState) {
// 检查转换是否合法
if (!isValidTransition(this.processingState, newState)) {
throw new IllegalStateTransitionException(
this.processingState + " → " + newState);
}
this.processingState = newState;
notifyListeners(); // 通知所有监听者(Dispatcher、UI 等)
}
private boolean isValidTransition(ProcessingState from, ProcessingState to) {
// 定义合法的状态转换
switch (from) {
case IDLE:
return to == PROCESSING_ORDER || to == CHARGING;
case PROCESSING_ORDER:
return to == IDLE || to == WAITING || to == ERROR;
case WAITING:
return to == PROCESSING_ORDER || to == ERROR;
case CHARGING:
return to == IDLE || to == ERROR;
case ERROR:
return to == IDLE || to == UNAVAILABLE;
default:
return false;
}
}
}
关键设计点:
- 原子性:状态转换必须是原子操作,不能出现中间状态
- 合法性检查:不允许非法的状态跳转
- 事件通知:状态变化时通知所有相关模块
七、Router:路径规划模块
7.1 Router 的职责
Router 的职责是:
计算车辆从起点到终点的可行路径。
但它不是简单的"最短路算法"。
7.2 openTCS 的地图模型
openTCS 使用的是图模型(Graph Topology),不是栅格地图。
核心对象:
| 对象 | 含义 | 属性 |
|---|---|---|
| Point | 节点 | 位置坐标、类型(停车点/路径点) |
| Path | 边 | 长度、最大速度、方向、锁定状态 |
| Location | 工位 | 关联的 Point、可执行的操作 |
| Block | 区域 | 一组互斥的资源 |
| Vehicle | 车辆 | 当前位置、状态、能力 |
一个具体的例子:
仓库地图:
P1 ----Path12---- P2 ----Path23---- P3
| | |
Path14 Path25 Path36
| | |
P4 ----Path45---- P5 ----Path56---- P6
| | |
Path47 Path58 Path69
| | |
P7 ----Path78---- P8 ----Path89---- P9
工位:
Location-A 关联 P1(取货点)
Location-B 关联 P9(放货点)
Location-C 关联 P7(充电桩)
7.3 为什么用图模型而不是栅格地图?
| 维度 | 图模型 | 栅格地图 |
|---|---|---|
| 资源控制 | 每条边、每个节点都是明确的资源,可以加锁 | 锁哪个格子?锁多大范围?不明确 |
| 计算效率 | 图搜索(几十到几百个节点) | 栅格搜索(几万到几十万个格子) |
| 语义清晰 | 每个节点有明确含义(工位、路口、等待点) | 格子没有语义 |
| 容易维护 | 修改路网只需要增删节点和边 | 修改地图需要重新绘制 |
| 路径确定性 | 车辆只能沿边移动,路径完全确定 | 车辆可能在格子间任意移动 |
| 交通规则 | 容易定义单向路、禁行区 | 难以表达复杂规则 |
图模型的核心优势是:
把连续的物理空间离散化为有限的、可管理的资源。
这使得资源锁成为可能。
7.4 Point 的类型
openTCS 中的 Point 有不同类型:
| 类型 | 含义 | 用途 |
|---|---|---|
| HALT_POSITION | 停车点 | 车辆可以在此停留 |
| PARK_POSITION | 停泊点 | 车辆空闲时停放的位置 |
| REPORT_POSITION | 报告点 | 车辆经过时上报位置 |
为什么要区分?
如果所有点都能停车:
一台车可能停在路中间
→ 阻塞后面的车
只有 HALT_POSITION 能停车:
车辆只能在指定位置停留
→ 不会阻塞通道
→ 系统可以精确管理每个停车位
这是一个非常重要的设计思想:
不是"哪里都能停",而是"只有指定位置能停"。
就像现实中的停车场:
你不能把车停在马路中间,只能停在车位里。
7.5 Path 的属性
每条 Path(边)有丰富的属性:
| 属性 | 含义 | 影响 |
|---|---|---|
| length | 长度 | 影响路径代价 |
| maxVelocity | 最大速度 | 影响通行时间 |
| maxReverseVelocity | 最大倒车速度 | 0 表示不允许倒车 |
| locked | 是否锁定 | 锁定的路径不可通行 |
| routingCostForward | 正向路由代价 | 影响路径选择 |
| routingCostReverse | 反向路由代价 | 影响路径选择 |
注意 routingCostForward 和 routingCostReverse 可以不同:
场景:一条上坡路
正向(上坡):代价 = 10(慢、费电)
反向(下坡):代价 = 3(快、省电)
这样 Router 会倾向于让车辆下坡走这条路,上坡走另一条路。
7.6 Router 的代价模型
openTCS 的 Router 不是简单的"最短路",而是"最小代价路"。
代价可以包含:
总代价 = 路径长度代价
+ 转弯代价
+ 拥堵代价
+ 单向路绕行代价
+ 车辆类型特定代价
路径长度代价
最基本的代价:路径越长,代价越高。
转弯代价
每次转弯需要减速 → 转弯 → 加速
这比直行慢
因此转弯有额外代价
效果:Router 倾向于选择转弯少的路径
拥堵代价
某条路径上已经有其他车辆
→ 增加该路径的代价
→ Router 倾向于选择空闲路径
这不是"禁止通行",而是"不太想走这条路"
如果绕路代价更大,还是会选择拥堵路径
车辆类型特定代价
大型叉车 AGV:不能走窄通道
小型搬运 AGV:可以走窄通道
对于大型叉车:窄通道代价 = ∞(不可通行)
对于小型 AGV:窄通道代价 = 正常
7.7 Router 的实现
openTCS 默认使用 Dijkstra/A* 的变体:
public class ShortestPathRouter implements Router {
public List<Route.Step> getRoute(Vehicle vehicle, Point source, Point destination) {
// 1. 构建代价图(考虑车辆类型)
CostGraph costGraph = buildCostGraph(vehicle);
// 2. 运行最短路算法
List<Point> path = dijkstra(costGraph, source, destination);
// 3. 转换为 Route.Step 序列
return convertToSteps(path);
}
private CostGraph buildCostGraph(Vehicle vehicle) {
CostGraph graph = new CostGraph();
for (Path path : allPaths) {
// 检查车辆是否能通过这条路径
if (!vehicle.canTraverse(path)) {
continue; // 跳过不可通行的路径
}
// 计算代价
double cost = path.getLength();
cost += path.getRoutingCostForward();
cost *= getCongestionFactor(path); // 拥堵因子
graph.addEdge(path.getSource(), path.getDestination(), cost);
}
return graph;
}
}
7.8 动态路由
Router 不是一次性计算的。当以下情况发生时,需要重新计算:
| 触发条件 | 说明 |
|---|---|
| 路径被锁定 | 某条路因为维护被封 |
| 拥堵变化 | 某条路变得拥堵 |
| 车辆故障 | 某台车停在路中间 |
| 新的资源约束 | Scheduler 锁定了某些资源 |
场景:
AGV-3 的原始路径:P1 → P2 → P3 → P4
AGV-3 走到 P2 时,发现 P3 被 AGV-7 占用(故障停车)
Router 重新计算:P2 → P5 → P6 → P4(绕路)
AGV-3 按新路径继续执行
这就是"动态重规划"。
八、Scheduler:真正的核心
8.1 为什么 Scheduler 是最重要的?
再强调一次:
路径规划解决的是"怎么走"的问题。
Scheduler 解决的是"能不能走"的问题。
一个类比:
你用导航软件算出了一条路线(Router 的工作)
但是路上有红绿灯(Scheduler 的工作)
导航告诉你"直走 200 米右转"
但红灯亮了,你必须等
Scheduler 就是那个控制红绿灯的系统
8.2 资源的定义
在 openTCS 中,以下都是"资源":
| 资源类型 | 说明 | 容量 |
|---|---|---|
| Point | 地图上的节点 | 通常 1(一次只能一台车) |
| Path | 地图上的边 | 通常 1(单车道) |
| Block | 一组互斥的资源 | 1(整个区域一次一台车) |
| Location | 工位 | 1(一次只能一台车操作) |
8.3 资源锁的工作原理
AGV-3 想从 P2 移动到 P3
Step 1: AGV-3 向 Scheduler 申请资源
请求:[Path23, P3]
Step 2: Scheduler 检查
Path23 是否被占用?→ 否
P3 是否被占用?→ 否
Step 3: Scheduler 批准
将 Path23 标记为"AGV-3 占用"
将 P3 标记为"AGV-3 占用"
Step 4: AGV-3 开始移动
Step 5: AGV-3 到达 P3
释放 Path23
释放 P2(已经离开)
保持 P3 的占用
8.4 为什么不能只锁当前位置?
一个常见的错误想法:
“只锁车辆当前所在的位置不就行了?”
不行。原因:
场景:
AGV-3 在 P2,要去 P3
AGV-7 在 P4,要去 P2
如果只锁当前位置:
t=0: AGV-3 在 P2(锁 P2),AGV-7 在 P4(锁 P4)
t=1: AGV-3 开始移动,释放 P2
t=1: AGV-7 看到 P2 空了,开始移动向 P2
t=2: AGV-3 还在 Path23 上(还没到 P3)
t=2: AGV-7 到达 P2
问题:如果 Path23 和 Path42 有交叉点,两车可能碰撞!
正确做法:
AGV-3 移动前,锁定整个路径段:[P2, Path23, P3]
直到 AGV-3 完全到达 P3,才释放 P2 和 Path23
这样 AGV-7 在 AGV-3 移动期间无法进入 Path23 或 P2
8.5 提前锁定(Look-Ahead)
实际上,openTCS 不只锁当前段,还会提前锁定后续几段:
AGV-3 的完整路径:P1 → P2 → P3 → P4 → P5
提前锁定策略(look-ahead = 2):
当 AGV-3 在 P1 时:
已锁定:[P1, Path12, P2, Path23, P3]
当 AGV-3 到达 P2 时:
释放:P1, Path12
新锁定:Path34, P4
当前锁定:[P2, Path23, P3, Path34, P4]
为什么要提前锁定?
如果不提前锁定:
AGV-3 到达 P3 后,才去申请 P4
但 P4 可能已经被其他车占用了
→ AGV-3 必须在 P3 等待
→ 而 P3 可能是一个关键路口
→ AGV-3 等在路口,阻塞了其他车
如果提前锁定:
AGV-3 还在 P1 时就知道 P4 是否可用
如果 P4 不可用,可以提前重新规划路径
→ 不需要停在路口等待
8.6 Block:区域互斥
有些场景下,不只是单个 Point 或 Path 需要互斥,而是整个区域:
场景:一个窄走廊,有 3 个节点
P5 ---- P6 ---- P7
这个走廊一次只能有一台车通过(太窄了,无法会车)
如果只锁单个节点:
AGV-1 从 P5 向 P7 移动,锁了 P5, Path56, P6
AGV-2 从 P7 向 P5 移动,锁了 P7, Path67, P6... 等等,P6 被锁了!
但如果 AGV-1 锁了 P5→P6,AGV-2 锁了 P7→P6:
AGV-1 在 P6,AGV-2 在 P7
AGV-1 想去 P7(被 AGV-2 占)
AGV-2 想去 P6(被 AGV-1 占)
→ 死锁!
Block 的解决方案:
定义 Block:NarrowCorridor = {P5, Path56, P6, Path67, P7}
Block 容量 = 1
规则:一次只有一台车能进入这个 Block
AGV-1 想进入走廊:
申请 Block NarrowCorridor
→ 批准(没有其他车在里面)
→ AGV-1 进入,从 P5 走到 P7
AGV-2 想进入走廊:
申请 Block NarrowCorridor
→ 拒绝(AGV-1 在里面)
→ AGV-2 等待
AGV-1 离开走廊(到达 P7 之后的下一个点):
释放 Block NarrowCorridor
→ AGV-2 获得批准
→ AGV-2 进入走廊
Block 的本质是:
把多个资源打包成一个"大资源",实现区域级别的互斥。
8.7 Scheduler 的完整工作流程
AGV-3 要执行路径:P1 → P2 → P3 → P4 → P5
Scheduler 的工作:
1. 收到 AGV-3 的路径
2. 计算需要的资源序列:
[P1, Path12, P2, Path23, P3, Path34, P4, Path45, P5]
3. 检查 Block:路径是否经过任何 Block?
是 → 需要额外申请 Block
4. 根据 look-ahead 策略,先申请前 N 段资源
5. 如果资源可用 → 批准,AGV-3 开始移动
6. 如果资源不可用 → AGV-3 等待
7. AGV-3 每到达一个新节点:
- 释放已经离开的资源
- 申请下一段资源
8. 如果下一段资源不可用:
- AGV-3 在当前 HALT_POSITION 等待
- 或者触发重新规划
8.8 资源锁 vs 简单避障
再次强调这个区别,因为它是理解工业系统的关键:
| 维度 | 简单避障(学生项目) | 资源锁(工业系统) |
|---|---|---|
| 时机 | 碰到了再处理 | 进入之前就预约 |
| 信息 | 只知道眼前的障碍 | 知道所有车的计划 |
| 结果 | 可能死锁 | 从设计上避免死锁 |
| 效率 | 频繁停车等待 | 提前规划,减少等待 |
| 确定性 | 不确定(取决于运气) | 确定(有明确的规则) |
| 安全性 | 依赖传感器(可能失灵) | 逻辑保证(不依赖传感器) |
一个关键洞察:
资源锁是"逻辑层面"的安全保证。
即使传感器全部失灵,只要资源锁正确工作,车辆就不会碰撞。
因为系统从逻辑上保证了"两台车不会同时出现在同一个位置"。
当然,实际系统中传感器避障仍然存在,作为"最后一道防线"。但它不应该是主要的防碰撞手段。
九、死锁问题:多车系统最棘手的挑战
9.1 什么是死锁?
经典死锁:
AGV-1 占用 A,等待 B
AGV-2 占用 B,等待 A
→ 两者互相等待,谁也动不了
更复杂的死锁:
环形死锁:
AGV-1 占用 A,等待 B
AGV-2 占用 B,等待 C
AGV-3 占用 C,等待 A
→ 三者形成环,谁也动不了
实际场景中的死锁:
场景:单车道走廊
AGV-1 →→→→→→→→→ 方向
←←←←←←←←← AGV-2
两台车在走廊中相向而行
走廊太窄,无法会车
→ 死锁
9.2 死锁的四个必要条件
操作系统理论中的经典知识,完全适用于 AGV 系统:
| 条件 | 在 AGV 系统中的含义 |
|---|---|
| 互斥 | 一个位置一次只能有一台车 |
| 持有并等待 | 车辆占着当前位置,等待下一个位置 |
| 不可抢占 | 不能强制把一台车"搬走" |
| 循环等待 | 多台车形成等待环 |
四个条件同时满足 → 死锁发生。
9.3 openTCS 如何预防死锁?
openTCS 主要通过以下策略降低死锁概率:
策略 1:路网设计(破坏循环等待)
最有效的方法:从路网设计上避免死锁
方法 1:单向路网
所有路径都是单向的
→ 不可能出现两车相向而行
→ 从根本上消除了最常见的死锁场景
P1 → P2 → P3
↑ ↓
P6 ← P5 ← P4
方法 2:环形路网
所有车辆沿同一方向行驶
→ 不会出现对向冲突
方法 3:设置等待区
在关键路口旁边设置等待点
→ 车辆可以"让路"
→ 避免堵在路口
策略 2:资源有序分配(破坏循环等待)
给所有资源编号:P1=1, P2=2, P3=3, ...
规则:车辆只能按编号递增的顺序申请资源
AGV-1 在 P2,想去 P5:
申请顺序:P3, P4, P5(递增)✓
AGV-2 在 P5,想去 P2:
按规则应该:P4, P3, P2(递减)✗ 违反规则!
实际做法:走另一条路(编号递增的路径)
这样就不可能出现循环等待。
但这种方法限制了路径选择,可能导致路径不是最优的。
策略 3:Block 区域互斥(破坏持有并等待)
把可能死锁的区域定义为 Block
一次只允许一台车进入
效果:
车辆要么完全进入 Block(不会和其他车冲突)
要么完全不进入(在外面等待)
→ 不会出现"两台车都在 Block 里互相堵"的情况
策略 4:超时检测 + 恢复
如果预防措施失败(设计不完美),还有最后一道防线:
检测:
如果一台车等待超过 T 秒(例如 30 秒)
→ 可能发生了死锁
恢复:
方案 1:让其中一台车后退
方案 2:让其中一台车绕路
方案 3:人工干预
9.4 死锁 vs 活锁
还有一种更隐蔽的问题:活锁(Livelock)
死锁:两台车都不动
活锁:两台车都在动,但都到不了目的地
例如:
AGV-1 和 AGV-2 在走廊相遇
AGV-1 让路,移到旁边
AGV-2 也让路,移到同一边
AGV-1 又让路,移到另一边
AGV-2 也移到另一边
→ 两台车不停地互相让路,但谁也过不去
就像两个人在走廊里面对面,互相让路但总是让到同一边。
解决活锁的方法:
中央调度的优势:
系统统一决定"谁让谁"
不会出现两台车自己协商导致的活锁
具体做法:
Scheduler 明确指定:AGV-1 等待,AGV-2 先过
不存在"互相让"的情况
9.5 实际工程中的死锁处理
在实际工程中,死锁处理通常是多层次的:
第一层:路网设计(消除 90% 的死锁可能)
- 单向路
- 环形路
- 足够的等待区
第二层:Scheduler 规则(消除 9% 的死锁可能)
- Block 互斥
- 有序资源分配
- Look-ahead 检测
第三层:超时恢复(处理剩余 1%)
- 检测等待超时
- 自动恢复策略
- 人工干预通知
十、Vehicle Driver:插件化通信接口
10.1 为什么需要 Driver 抽象?
现实中 AGV 品牌很多,每家的通信协议都不同:
| 品牌 | 通信方式 | 协议 |
|---|---|---|
| MiR | REST API | HTTP/JSON |
| 海康机器人 | 私有协议 | TCP |
| 快仓 | 私有协议 | TCP |
| 极智嘉 | REST API | HTTP/JSON |
| 仙工 | 私有协议 | TCP/MQTT |
| 自研 AGV | 自定义 | 各种 |
如果调度系统直接和每种 AGV 通信:
Kernel 代码里:
if (brand == "MiR") {
sendHttpRequest(...);
} else if (brand == "海康") {
sendTcpPacket(...);
} else if (brand == "快仓") {
sendCustomProtocol(...);
} else if ...
问题:
* 每增加一种 AGV,就要修改 Kernel 代码
* Kernel 变得越来越臃肿
* 测试困难(需要所有品牌的 AGV 才能测试)
* 耦合严重(一个品牌的协议变了,可能影响其他品牌)
10.2 openTCS 的解决方案:Driver 接口
openTCS 的设计思想是:
Kernel 永远不关心底层协议。它只和一个统一的接口交互。
Kernel
↓ (统一接口)
Vehicle Driver Interface
↓ (不同实现)
┌─────────────┬─────────────┬─────────────┐
│ MiR Driver │ 海康 Driver │ 自研 Driver │
└─────────────┴─────────────┴─────────────┘
↓ ↓ ↓
┌─────────────┬─────────────┬─────────────┐
│ MiR AGV │ 海康 AGV │ 自研 AGV │
└─────────────┴─────────────┴─────────────┘
10.3 Driver 接口的定义
openTCS 的 Vehicle Driver(官方称为 Communication Adapter)需要实现以下核心功能:
public interface VehicleCommAdapter {
// === 生命周期 ===
void enable(); // 启用适配器,建立与 AGV 的连接
void disable(); // 禁用适配器,断开连接
// === 指令下发 ===
void sendCommand(MovementCommand command);
// MovementCommand 包含:
// - 目标 Point
// - 经过的 Path
// - 到达后执行的操作(取货/放货/充电)
// - 最大速度
// - 方向(前进/后退)
// === 状态上报 ===
// 适配器需要主动上报以下信息:
// - 车辆当前位置(哪个 Point)
// - 车辆状态(空闲/执行中/错误)
// - 电量
// - 载荷状态(有货/无货)
// - 错误信息
}
10.4 一个具体的 Driver 实现示例
假设你有一台自研 AGV,通过 TCP 通信:
public class MyAgvCommAdapter implements VehicleCommAdapter {
private TcpClient tcpClient;
private String agvIp;
private int agvPort;
@Override
public void enable() {
// 建立 TCP 连接
tcpClient = new TcpClient(agvIp, agvPort);
tcpClient.connect();
// 启动状态轮询线程
startStatusPolling();
}
@Override
public void sendCommand(MovementCommand command) {
// 将 openTCS 的指令转换为 AGV 能理解的格式
byte[] packet = encodeCommand(command);
tcpClient.send(packet);
}
private byte[] encodeCommand(MovementCommand command) {
// 自研 AGV 的协议格式:
// [Header][CommandType][TargetX][TargetY][Speed][Action]
ByteBuffer buffer = ByteBuffer.allocate(20);
buffer.put((byte) 0xAA); // Header
buffer.put((byte) 0x01); // CommandType: MOVE
buffer.putFloat(command.getTargetPoint().getX());
buffer.putFloat(command.getTargetPoint().getY());
buffer.putFloat(command.getMaxSpeed());
buffer.put(encodeAction(command.getOperation()));
return buffer.array();
}
private void startStatusPolling() {
// 每 200ms 查询一次 AGV 状态
scheduler.scheduleAtFixedRate(() -> {
byte[] response = tcpClient.sendAndReceive(STATUS_QUERY);
AgvStatus status = decodeStatus(response);
// 上报给 Kernel
if (status.positionChanged()) {
reportPosition(status.getCurrentPoint());
}
if (status.stateChanged()) {
reportState(status.getState());
}
reportEnergyLevel(status.getBattery());
}, 0, 200, TimeUnit.MILLISECONDS);
}
}
10.5 Driver 的职责边界
Driver 的职责非常明确:
| Driver 负责 | Driver 不负责 |
|---|---|
| 与 AGV 通信 | 路径规划 |
| 协议转换 | 任务分配 |
| 状态上报 | 资源管理 |
| 指令下发 | 冲突检测 |
| 连接管理 | 调度决策 |
这种清晰的边界意味着:
开发一个新的 Driver:
只需要了解 AGV 的通信协议
不需要了解调度算法
不需要了解资源管理
不需要了解其他 AGV 的存在
开发调度算法:
只需要了解 Driver 接口
不需要了解具体 AGV 的协议
不需要了解 TCP/HTTP/MQTT 的细节
10.6 插件化的工程价值
价值 1:独立开发和测试
团队 A:开发调度算法(只需要 Mock Driver)
团队 B:开发 MiR Driver(只需要 Driver 接口文档)
团队 C:开发海康 Driver(只需要 Driver 接口文档)
三个团队可以并行工作,互不依赖。
价值 2:热插拔
系统运行中:
- 新增一种 AGV → 加载新 Driver,不需要重启系统
- 某种 AGV 下线 → 卸载 Driver,不影响其他 AGV
价值 3:仿真和实际无缝切换
开发阶段:使用 SimulationDriver(模拟 AGV 行为)
测试阶段:使用 LoopbackDriver(回环测试)
部署阶段:使用 RealAgvDriver(真实 AGV)
切换只需要更换 Driver 配置,Kernel 代码完全不变。
这一点对比赛特别有用:
你可以先用仿真 Driver 开发和调试整个系统,最后再接入真实硬件。
价值 4:异构车队管理
同一个 Kernel 可以同时管理:
- 3 台 MiR(使用 MiR Driver)
- 5 台自研 AGV(使用自研 Driver)
- 2 台叉车(使用叉车 Driver)
Kernel 不关心它们是什么品牌
只关心它们的状态和能力
10.7 Driver 与 Kernel 的交互时序
时间线:
t=0: Kernel 决定让 AGV-3 从 P2 移动到 P3
t=1: Kernel 调用 driver.sendCommand(MoveTo P3)
t=2: Driver 将指令转换为 AGV 协议,发送给 AGV
t=3: AGV 开始移动
t=4: AGV 到达 P3
t=5: AGV 上报位置(通过 AGV 自己的协议)
t=6: Driver 收到位置上报
t=7: Driver 调用 reportPosition(P3) 通知 Kernel
t=8: Kernel 更新 AGV-3 的位置为 P3
t=9: Kernel 释放 P2 的资源锁
t=10: Kernel 为 AGV-3 申请下一段资源
注意:
Kernel 不是通过"猜测"来知道 AGV 到达了。
而是通过 Driver 的明确上报。
这保证了系统状态和物理状态的一致性。
十一、VDA5050:AGV 行业的统一通信标准
11.1 为什么需要标准?
在 VDA5050 出现之前,AGV 行业的状况是:
每个 AGV 厂商都有自己的:
- 通信协议
- 数据格式
- 状态定义
- 指令集
- 错误码
结果:
- 每接入一种新 AGV,都要从头开发 Driver
- 不同品牌的 AGV 无法混合部署
- 更换 AGV 品牌意味着重写整个通信层
- 企业被厂商锁定
这就像:
如果每个手机品牌都用不同的充电接口,你换手机就要换充电器。
USB-C 统一了充电接口。
VDA5050 统一了 AGV 通信接口。
11.2 VDA5050 是什么?
VDA5050 是由德国汽车工业协会(VDA)和德国机械设备制造业联合会(VDMA)联合推出的 AGV/AMR 通信标准。
核心特点:
| 特点 | 说明 |
|---|---|
| 基于 MQTT | 使用 MQTT 消息队列通信 |
| JSON 格式 | 数据用 JSON 编码,人类可读 |
| 双向通信 | 调度系统下发指令,AGV 上报状态 |
| 标准化状态 | 统一的状态定义和错误码 |
| 路径模型 | 基于 Node + Edge 的路径描述 |
11.3 VDA5050 的通信架构
Fleet Manager(调度系统)
↓↑
MQTT Broker
↓↑
AGV(车辆)
Topic 结构:
下发指令:uagv/v2/{manufacturer}/{serialNumber}/order
上报状态:uagv/v2/{manufacturer}/{serialNumber}/state
可视化: uagv/v2/{manufacturer}/{serialNumber}/visualization
连接状态:uagv/v2/{manufacturer}/{serialNumber}/connection
11.4 VDA5050 的核心消息
Order(订单/指令)
调度系统下发给 AGV 的路径和动作:
{
"headerId": 1,
"timestamp": "2024-01-15T10:30:00Z",
"version": "2.0.0",
"manufacturer": "MyCompany",
"serialNumber": "AGV-001",
"orderId": "order_001",
"orderUpdateId": 0,
"nodes": [
{
"nodeId": "P1",
"sequenceId": 0,
"released": true,
"nodePosition": {"x": 10.0, "y": 5.0, "mapId": "floor1"},
"actions": []
},
{
"nodeId": "P2",
"sequenceId": 2,
"released": true,
"nodePosition": {"x": 15.0, "y": 5.0, "mapId": "floor1"},
"actions": [
{
"actionType": "pick",
"actionId": "action_001",
"blockingType": "HARD"
}
]
},
{
"nodeId": "P3",
"sequenceId": 4,
"released": false,
"nodePosition": {"x": 20.0, "y": 5.0, "mapId": "floor1"},
"actions": []
}
],
"edges": [
{
"edgeId": "E1-2",
"sequenceId": 1,
"released": true,
"startNodeId": "P1",
"endNodeId": "P2",
"maxSpeed": 1.5
},
{
"edgeId": "E2-3",
"sequenceId": 3,
"released": false,
"startNodeId": "P2",
"endNodeId": "P3",
"maxSpeed": 1.0
}
]
}
注意 released 字段:
released = true:AGV 可以立即执行这段路径
released = false:AGV 不能执行,需要等待后续指令释放
这就是 VDA5050 版本的"资源锁"!
调度系统通过控制 released 字段来实现交通管制:
- 先释放前几段路径
- AGV 执行
- 确认安全后,再释放后续路径
State(状态上报)
AGV 定期上报给调度系统的状态:
{
"headerId": 42,
"timestamp": "2024-01-15T10:30:05Z",
"orderId": "order_001",
"lastNodeId": "P1",
"lastNodeSequenceId": 0,
"driving": true,
"paused": false,
"nodeStates": [...],
"edgeStates": [...],
"agvPosition": {
"x": 12.5,
"y": 5.0,
"theta": 0.0,
"mapId": "floor1",
"positionInitialized": true
},
"velocity": {
"vx": 1.2,
"vy": 0.0,
"omega": 0.0
},
"batteryState": {
"batteryCharge": 75.5,
"charging": false
},
"operatingMode": "AUTOMATIC",
"errors": [],
"loads": [
{
"loadId": "pallet_001",
"loadType": "EURO_PALLET",
"weight": 500
}
],
"safetyState": {
"eStop": "NONE",
"fieldViolation": false
}
}
11.5 VDA5050 的关键设计思想
思想 1:Base + Horizon
Order 中的路径分为两部分:
Base(基础段):released = true
AGV 必须执行的路径
调度系统保证这段路径是安全的(资源已锁定)
Horizon(预览段):released = false
AGV 知道后续路径,但不能执行
用于提前减速、规划等
调度系统确认安全后,会发送 orderUpdate 释放
好处:
- AGV 可以提前知道后续路径(平滑运动)
- 但不会提前进入未授权的区域(安全)
- 调度系统可以随时修改 Horizon(灵活)
这和 openTCS 的 look-ahead 思想完全一致。
思想 2:增量更新
不需要每次都发送完整的 Order
可以通过 orderUpdateId 增量更新:
- 释放更多节点(把 released 从 false 改为 true)
- 追加新的节点和边
- 修改 Horizon
这减少了通信量,也提高了灵活性。
思想 3:Action 与 Movement 分离
Node 上可以定义 Action(动作):
- pick(取货)
- drop(放货)
- charge(充电)
- scan(扫描)
- wait(等待)
Action 和 Movement 是分离的:
Movement:从 A 移动到 B(由 Edge 定义)
Action:到达 B 后做什么(由 Node 的 actions 定义)
这使得路径规划和任务执行解耦。
11.6 VDA5050 与 openTCS 的关系
openTCS 的概念 VDA5050 的概念
─────────────────────────────────────
Point Node
Path Edge
DriveOrder Order
MovementCommand Node + Edge(released=true)
Vehicle State State 消息
Operation Action
Resource Lock released 字段
Look-ahead Horizon(released=false)
可以看到,两者的概念高度对应。
实际集成方式:
openTCS Kernel
↓
VDA5050 Driver(将 openTCS 指令转换为 VDA5050 Order)
↓
MQTT Broker
↓
支持 VDA5050 的 AGV
11.7 VDA5050 的行业影响
| 之前 | 之后 |
|---|---|
| 每种 AGV 需要专门的 Driver | 一个 VDA5050 Driver 适配所有 |
| 更换 AGV 品牌需要重写通信层 | 只要支持 VDA5050 就能即插即用 |
| 不同品牌 AGV 无法混合部署 | 统一协议,混合部署成为可能 |
| 企业被厂商锁定 | 自由选择最适合的 AGV |
目前支持 VDA5050 的厂商越来越多:
- KUKA
- SEW-EURODRIVE
- SICK
- 部分国内厂商也在跟进
11.8 VDA5050 的局限
| 局限 | 说明 |
|---|---|
| 不涉及调度算法 | 只定义通信协议,不定义如何调度 |
| 不涉及路径规划 | 只定义如何下发路径,不定义如何计算路径 |
| 版本兼容性 | v1 和 v2 差异较大 |
| 实现差异 | 不同厂商对标准的理解可能不同 |
| 功能覆盖 | 某些特殊功能可能标准未覆盖 |
十二、openTCS 的地图建模详解
12.1 地图的核心要素
openTCS 的地图由以下要素组成:
Point(点)
├── 坐标 (x, y)
├── 类型(HALT / PARK / REPORT)
└── 属性(自定义 key-value)
Path(路径)
├── 起点 Point
├── 终点 Point
├── 长度
├── 最大正向速度
├── 最大反向速度(0 = 不允许反向)
├── 路由代价(正向/反向)
├── 是否锁定
└── 属性
Location(工位)
├── 关联的 Point
├── 类型(取货点/放货点/充电桩)
└── 允许的操作列表
LocationType(工位类型)
├── 名称
└── 允许的操作列表
Block(互斥区域)
├── 包含的资源集合(Points + Paths)
└── 互斥类型
12.2 地图设计的原则
原则 1:节点密度适中
太密:
P1 -- P2 -- P3 -- P4 -- P5 -- P6(每 0.5 米一个节点)
问题:资源锁粒度太细,频繁申请/释放,性能差
太疏:
P1 ─────────────────────── P2(10 米一个节点)
问题:一台车占用 10 米的路径,其他车无法通过中间区域
合适:
P1 ──── P2 ──── P3 ──── P4(2-3 米一个节点)
兼顾性能和灵活性
原则 2:关键位置必须有节点
以下位置必须设置节点:
- 路口(多条路径交汇)
- 工位(取货/放货点)
- 等待区(车辆可以停留等待)
- 充电桩
- 电梯口
- 门口
- 路径分叉点
- 路径合并点
原则 3:单向路优先
双向路的问题:
两台车可能在同一条路上相向而行
→ 需要复杂的会车逻辑
→ 或者死锁
单向路的好处:
所有车同向行驶
→ 不可能相向冲突
→ 大幅简化调度逻辑
代价:
路径可能不是最短的(需要绕行)
但系统整体效率更高(没有冲突等待)
原则 4:设置足够的等待区
等待区的作用:
当车辆无法继续前进时(前方资源被占用)
需要一个地方停下来等待
这个地方不能阻塞其他车辆
好的等待区设计:
- 在每个路口旁边设置等待点
- 等待点不在主干道上
- 等待点有足够的容量
坏的设计:
- 车辆只能停在路口等待
- 一台车等待就阻塞了整个路口
- 导致连锁拥堵
12.3 Block 的详细设计
Block 是 openTCS 中非常重要但经常被忽视的概念。
Block 类型
openTCS 支持不同类型的 Block:
| 类型 | 含义 | 场景 |
|---|---|---|
| SINGLE_VEHICLE_ONLY | 一次只允许一台车 | 窄走廊、单车道 |
| SAME_DIRECTION_ONLY | 允许多台车,但必须同向 | 宽走廊、单向主干道 |
SINGLE_VEHICLE_ONLY 示例
窄走廊:
P3 ──── P4 ──── P5 ──── P6 ──── P7
Block: NarrowCorridor = {P3, P4, P5, P6, P7,
Path34, Path45, Path56, Path67}
Type: SINGLE_VEHICLE_ONLY
效果:
整个走廊一次只能有一台车
其他车必须在走廊外等待
SAME_DIRECTION_ONLY 示例
主干道(宽,可以跟车但不能会车):
P10 ──── P11 ──── P12 ──── P13
Block: MainRoad = {P10, P11, P12, P13,
Path1011, Path1112, Path1213}
Type: SAME_DIRECTION_ONLY
效果:
多台车可以同时在主干道上
但必须同向行驶
不允许有车反向进入
Block 的嵌套
复杂场景可能需要嵌套 Block:
大区域 Block A = {P1, P2, P3, P4, P5, ...}
小区域 Block B = {P3, P4}(Block A 的子集)
Block B 是一个特别关键的区域(比如电梯口)
Block A 是整个走廊
规则:
进入 Block A 需要申请 Block A
进入 Block B 需要同时申请 Block A 和 Block B
12.4 地图建模的实际案例
一个小型仓库的地图:
充电区 主干道 工位区
C1─P1 ═══ P2 ═══ P3 ═══ P4 ═══ P5─W1
│ │ │
C2─P6 P7 ═══ P8 ═══ P9 ═══ P10─W2
│ │ │ │
P11═══ P12═══ P13═══ P14═══ P15─W3
│
P16─W4(特殊工位)
═══ 表示双向路径
─── 表示单向路径
Block 定义:
Block1: {P7, P8, P9, Path78, Path89} - 中间走廊,SINGLE_VEHICLE_ONLY
Block2: {P16} - 特殊工位入口,SINGLE_VEHICLE_ONLY
Location 定义:
C1, C2: 充电桩(类型:ChargeStation)
W1, W2, W3, W4: 工位(类型:WorkStation)
十三、Transport Order:任务的完整生命周期
13.1 什么是 Transport Order?
Transport Order 是 openTCS 中的核心概念,代表一个完整的运输任务。
一个 Transport Order 包含:
- 起点(从哪里取货)
- 终点(送到哪里)
- 操作(取货、放货)
- 优先级
- 截止时间(可选)
- 依赖关系(可选:必须在某个任务之后执行)
13.2 Transport Order 的状态机
RAW
│ 新创建,等待调度
v
ACTIVE
│ 已激活,等待分配车辆
v
DISPATCHABLE
│ 可调度,等待 Dispatcher 分配
v
BEING_PROCESSED
│ 正在执行
v
FINISHED / FAILED / WITHDRAWN
最终状态
每个状态的含义:
| 状态 | 含义 | 触发条件 |
|---|---|---|
| RAW | 刚创建 | 外部系统提交订单 |
| ACTIVE | 已激活 | 依赖的前置任务完成 |
| DISPATCHABLE | 可调度 | 系统确认任务合法 |
| BEING_PROCESSED | 执行中 | Dispatcher 分配了车辆 |
| FINISHED | 完成 | 车辆到达终点并完成操作 |
| FAILED | 失败 | 执行过程中出错 |
| WITHDRAWN | 撤回 | 人工取消 |
13.3 DriveOrder:Transport Order 的分解
一个 Transport Order 可能包含多个步骤:
Transport Order: "从工位 A 取货,送到工位 B"
分解为 DriveOrder 序列:
DriveOrder 1: 从当前位置移动到工位 A
DriveOrder 2: 在工位 A 执行取货操作
DriveOrder 3: 从工位 A 移动到工位 B
DriveOrder 4: 在工位 B 执行放货操作
每个 DriveOrder 包含:
DriveOrder:
- Route: 具体的路径(Point 和 Path 序列)
- Destination: 目标 Location
- Operation: 到达后执行的操作
13.4 Route:路径的详细描述
Route 是 DriveOrder 中最核心的部分:
Route:
Step 1: P1 → Path12 → P2(前进,速度 1.5m/s)
Step 2: P2 → Path23 → P3(前进,速度 1.0m/s)
Step 3: P3 → Path34 → P4(前进,速度 1.5m/s)
总代价: 15
总距离: 12m
预计时间: 10s
13.5 任务执行的完整时序
t=0: ERP 提交 Transport Order:"从 W1 取货送到 W3"
t=1: Order 状态:RAW → ACTIVE → DISPATCHABLE
t=2: Dispatcher 触发,评估候选车辆
t=3: 选择 AGV-2(在 P6,空闲,电量 80%)
t=4: Router 计算路径:P6 → P2 → P3 → P4 → P5 → W1
t=5: 生成 DriveOrder 1:移动到 W1
t=6: Scheduler 申请资源:[P6, Path62,
P2, Path23, P3](look-ahead=2)
t=7: 资源可用,批准
t=8: Vehicle Driver 下发指令给 AGV-2
t=9: AGV-2 开始移动
t=10: AGV-2 到达 P2,释放 P6 和 Path62
t=11: Scheduler 申请下一段:[Path34, P4]
t=12: AGV-2 到达 P3,释放 Path23, P2
t=13: 继续移动...
t=20: AGV-2 到达 W1
t=21: 执行取货操作(Action: PICK)
t=22: 取货完成,DriveOrder 1 完成
t=23: 生成 DriveOrder 2:从 W1 移动到 W3
t=24: Router 计算路径:P5 → P4 → P3 → P13 → P14 → P15 → W3
t=25: Scheduler 申请资源...
t=26: AGV-2 开始移动
...
t=40: AGV-2 到达 W3
t=41: 执行放货操作(Action: DROP)
t=42: 放货完成
t=43: Transport Order 状态:BEING_PROCESSED → FINISHED
t=44: AGV-2 状态:EXECUTING → IDLE
t=45: Dispatcher 被触发(车辆变为空闲)
t=46: 检查是否有新任务可分配...
13.6 任务失败的处理
任务执行过程中可能出现各种异常:
场景 1:AGV 故障
AGV-2 在 P3 突然报错(电机故障)
→ Driver 上报 ERROR 状态
→ Kernel 标记 AGV-2 为 ERROR
→ 当前 DriveOrder 标记为 FAILED
→ Transport Order 标记为 FAILED
→ 释放 AGV-2 占用的后续资源(但保留当前位置 P3)
→ Dispatcher 可以选择:
a) 将任务重新分配给其他车辆
b) 等待 AGV-2 恢复
c) 标记任务失败,通知上层系统
场景 2:路径被阻塞
AGV-2 在 P3,下一段 P4 被另一台故障车占用
→ Scheduler 拒绝资源申请
→ AGV-2 在 P3 等待
→ 等待超时(例如 60 秒)
→ 触发重新规划
→ Router 计算绕行路径:P3 → P8 → P9 → P14 → P15 → W3
→ 继续执行
场景 3:任务被取消
人工在系统中取消了这个 Transport Order
→ Kernel 发送"停止"指令给 AGV-2
→ AGV-2 在最近的 HALT_POSITION 停下
→ 释放所有已申请的资源
→ Transport Order 状态:WITHDRAWN
→ AGV-2 状态:IDLE
13.7 任务优先级
工业系统中,不同任务有不同的优先级:
优先级示例:
紧急补料:优先级 = 10(最高)
正常搬运:优先级 = 5
空箱回收:优先级 = 1(最低)
Dispatcher 的行为:
当有多个任务等待分配时,优先分配高优先级任务
甚至可能:
AGV-3 正在执行低优先级任务
来了一个紧急任务
→ 中断 AGV-3 的当前任务
→ 让 AGV-3 先执行紧急任务
→ 紧急任务完成后,再继续原来的任务
这叫做"任务抢占"(Preemption),是高级调度系统的功能。
十四、openTCS 的可扩展性设计
14.1 插件化架构总览
openTCS 的几乎所有核心模块都是可替换的:
┌─────────────────────────────────────────┐
│ openTCS Kernel │
├─────────────────────────────────────────┤
│ │
│ ┌───────────┐ 可替换 │
│ │ Dispatcher│ ←── 你可以实现自己的调度算法│
│ └───────────┘ │
│ │
│ ┌───────────┐ 可替换 │
│ │ Router │ ←── 你可以实现自己的路径算法│
│ └───────────┘ │
│ │
│ ┌───────────┐ 可替换 │
│ │ Scheduler │ ←── 你可以实现自己的资源策略│
│ └───────────┘ │
│ │
│ ┌───────────┐ 可替换 │
│ │ Driver │ ←── 你可以接入任何品牌 AGV │
│ └───────────┘ │
│ │
└─────────────────────────────────────────┘
14.2 为什么要可扩展?
因为不同的应用场景需要不同的策略:
| 场景 | Dispatcher 策略 | Router 策略 | Scheduler 策略 |
|---|---|---|---|
| 电商仓库 | 订单批量优化 | 最短时间 | 激进锁定 |
| 汽车工厂 | 产线节拍匹配 | 固定路线 | 区域互斥 |
| 医院 | 紧急优先 | 避开人流 | 保守锁定 |
| 半导体工厂 | 洁净度优先 | 避开污染区 | 严格互斥 |
一个通用的默认实现不可能满足所有场景。
14.3 扩展点的接口设计
openTCS 使用 Java SPI(Service Provider Interface)机制实现插件化:
// Dispatcher 接口
public interface Dispatcher {
void dispatch();
void withdrawOrder(TransportOrder order, boolean immediateAbort);
void reroute(Vehicle vehicle);
}
// Router 接口
public interface Router {
Optional<List<DriveOrder>> getRoute(Vehicle vehicle,
Point sourcePoint,
TransportOrder transportOrder);
long getCosts(Vehicle vehicle, Point sourcePoint, Point destinationPoint);
void updateRoutingTopology();
}
// Scheduler 接口(简化)
public interface Scheduler {
void claim(Client client, List<Set<TCSResource<?>>> resources);
void allocate(Client client, Set<TCSResource<?>> resources);
void free(Client client, Set<TCSResource<?>> resources);
}
14.4 自定义 Dispatcher 示例
假设你想实现一个考虑电量的 Dispatcher:
public class BatteryAwareDispatcher implements Dispatcher {
private final Router router;
private final TransportOrderService orderService;
private final VehicleService vehicleService;
@Override
public void dispatch() {
// 获取所有待分配的订单
List<TransportOrder> pendingOrders = orderService.getPendingOrders();
// 获取所有空闲车辆
List<Vehicle> idleVehicles = vehicleService.getIdleVehicles();
for (TransportOrder order : pendingOrders) {
Vehicle bestVehicle = null;
double bestScore = Double.MAX_VALUE;
for (Vehicle vehicle : idleVehicles) {
// 计算路径代价
double routeCost = router.getCosts(vehicle,
vehicle.getCurrentPosition(),
order.getDestination());
// 计算电量因子
double energyNeeded = estimateEnergyConsumption(routeCost);
double currentEnergy = vehicle.getEnergyLevel();
// 如果电量不够,跳过
if (currentEnergy < energyNeeded * 1.2) { // 20% 余量
continue;
}
// 综合评分:路径代价 + 电量惩罚
double score = routeCost;
if (currentEnergy < 50) {
score *= 1.5; // 低电量车辆,增加代价(不太想用它)
}
if (score < bestScore) {
bestScore = score;
bestVehicle = vehicle;
}
}
if (bestVehicle != null) {
assignOrderToVehicle(order, bestVehicle);
idleVehicles.remove(bestVehicle);
}
}
}
private double estimateEnergyConsumption(double routeCost) {
// 简单估算:每米消耗 0.1% 电量
return routeCost * 0.1;
}
}
14.5 自定义 Router 示例
假设你想实现一个考虑实时拥堵的 Router:
public class CongestionAwareRouter implements Router {
private final PlantModelService modelService;
private final VehicleService vehicleService;
@Override
public Optional<List<DriveOrder>> getRoute(Vehicle vehicle,
Point source,
TransportOrder order) {
// 构建带拥堵权重的图
WeightedGraph graph = buildCongestionGraph(vehicle);
// Dijkstra 求最短路
List<Point> path = dijkstra(graph, source, order.getDestination());
if (path.isEmpty()) {
return Optional.empty();
}
return Optional.of(convertToDriveOrders(path, order));
}
private WeightedGraph buildCongestionGraph(Vehicle requestingVehicle) {
WeightedGraph graph = new WeightedGraph();
for (Path path : modelService.getAllPaths()) {
double baseCost = path.getLength();
// 检查这条路径上是否有其他车辆
int vehiclesOnPath = countVehiclesOnPath(path);
// 拥堵因子:每多一台车,代价增加 50%
double congestionFactor = 1.0 + vehiclesOnPath * 0.5;
// 检查这条路径是否被锁定
if (path.isLocked()) {
continue; // 锁定的路径不可通行
}
// 检查车辆类型是否允许通过
if (!isVehicleAllowed(requestingVehicle, path)) {
continue;
}
double finalCost = baseCost * congestionFactor;
graph.addEdge(path.getSourcePoint(),
path.getDestinationPoint(),
finalCost);
}
return graph;
}
private int countVehiclesOnPath(Path path) {
int count = 0;
for (Vehicle v : vehicleService.getAllVehicles()) {
if (v.getCurrentPosition().equals(path.getSourcePoint()) ||
v.getCurrentPosition().equals(path.getDestinationPoint())) {
count++;
}
}
return count;
}
}
十五、openTCS 与 MAPF 的关系
15.1 什么是 MAPF?
MAPF(Multi-Agent Path Finding)是学术界研究的多智能体路径规划问题:
给定:
- 一张地图
- N 个智能体
- 每个智能体有起点和终点
求:
- 每个智能体的路径
- 所有路径不冲突(不会碰撞)
- 某种目标最优(总路径最短 / 最大完成时间最短)
15.2 MAPF 的经典算法
| 算法 | 思想 | 特点 |
|---|---|---|
| CBS(Conflict-Based Search) | 先独立规划,发现冲突后添加约束重新规划 | 最优解,但慢 |
| ECBS | CBS 的有界次优版本 | 快,解质量有保证 |
| WHCA*(Windowed Hierarchical A*) | 时间窗口内的协同 A* | 快,但不保证最优 |
| Priority-Based | 给智能体排优先级,高优先级先规划 | 快,但可能不公平 |
| ORCA | 基于速度障碍的分布式避碰 | 实时,但适合开放空间 |
15.3 openTCS 不是 MAPF 系统
这是很多人的误解。
openTCS 和 MAPF 的核心区别:
| 维度 | MAPF | openTCS |
|---|---|---|
| 规划时机 | 所有智能体同时规划 | 一台车一台车地规划 |
| 冲突处理 | 规划阶段就消除冲突 | 执行阶段通过资源锁避免冲突 |
| 最优性 | 追求全局最优 | 追求稳定可行 |
| 动态性 | 通常假设静态场景 | 天然支持动态(新任务、故障) |
| 规模 | 学术实验通常 < 100 智能体 | 工业部署 10-100 台 |
| 重规划 | 全部重新计算 | 只重新计算受影响的车辆 |
15.4 为什么 openTCS 不用 MAPF?
原因 1:动态性
MAPF 假设:
所有任务在开始时就已知
一次性规划所有路径
工业现实:
任务随时到达
车辆随时故障
路径随时被封
→ 需要持续的、增量的调度
→ 不适合"一次性全局规划"
原因 2:计算时间
CBS 求解 50 个智能体的最优解:
可能需要几秒到几分钟
工业系统的要求:
新任务到达后,必须在 100ms 内响应
→ 不能等待全局最优解
原因 3:稳定性 vs 最优性
MAPF 追求:
"所有车的总路径最短"
工业系统追求:
"系统 7×24 稳定运行,不出事故"
这两个目标有时是矛盾的:
最优路径可能让多台车走同一条路(因为那条路最短)
→ 虽然理论最优,但实际上增加了冲突风险
稳定的做法是让车辆分散在不同路径上
→ 虽然不是理论最优,但冲突少、系统稳定
原因 4:工程复杂度
MAPF 算法实现复杂度高
调试困难(全局规划,一个 bug 影响所有车)
边界情况多(新任务到达时如何融入已有规划?)
openTCS 的方法:
每台车独立规划路径
通过资源锁避免冲突
逻辑简单、可预测、容易调试
15.5 两种方法的适用场景
| 场景 | 更适合 MAPF | 更适合 openTCS 方法 |
|---|---|---|
| 高密度仓库(100+ AGV) | ✓ | |
| 任务批量到达 | ✓ | |
| 追求吞吐量极限 | ✓ | |
| 任务动态到达 | ✓ | |
| 异构车队 | ✓ | |
| 需要高稳定性 | ✓ | |
| 需要快速响应 | ✓ | |
| 车辆数量适中(10-50) | ✓ |
15.6 融合方案
实际上,现代系统越来越倾向于融合两种方法:
混合架构:
层 1:全局规划(MAPF 思想)
- 每隔 N 秒,对所有活跃车辆做一次全局路径规划
- 消除明显的冲突
- 优化整体效率
层 2:实时调度(openTCS 思想)
- 资源锁保证安全
- 处理动态变化(新任务、故障)
- 局部重规划
效果:
- 全局规划提供"大方向"
- 资源锁保证"安全底线"
- 两者互补
十六、openTCS 的实际部署架构
16.1 单体部署
最简单的部署方式:
┌─────────────────────────────────┐
│ 单台服务器 │
│ │
│ ┌─────────────────────────┐ │
│ │ openTCS Kernel │ │
│ │ (Dispatcher + Router │ │
│ │ + Scheduler + Drivers)│ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ │
│ │ Plant Overview │ │
│ │ (Web UI / Swing UI) │ │
│ └─────────────────────────┘ │
│ │
└─────────────────────────────────┘
│
│ TCP/MQTT/HTTP
│
┌────┴────┐
│ AGVs │
└─────────┘
适用于:
- 小规模部署(< 20 台 AGV)
- 开发和测试环境
- 比赛演示
16.2 高可用部署
工业环境中的部署:
┌──────────────────────────────────────────────┐
│ 主服务器 │
│ ┌──────────────────────────────────────┐ │
│ │ openTCS Kernel (Active) │ │
│ └──────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
│ │
│ 状态同步 │ 心跳
│ │
┌──────────────────────────────────────────────┐
│ 备服务器 │
│ ┌──────────────────────────────────────┐ │
│ │ openTCS Kernel (Standby) │ │
│ └──────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
主服务器故障时:
备服务器自动接管
AGV 重新连接到备服务器
任务继续执行
16.3 与外部系统的集成
┌─────────┐ ┌─────────┐ ┌─────────┐
│ ERP │ │ MES │ │ WMS │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────┼───────────────┘
│
┌──────┴──────┐
│ REST API │
│ / Web Hook │
└──────┬──────┘
│
┌──────┴──────┐
│ openTCS │
│ Kernel │
└──────┬──────┘
│
┌──────┴──────┐
│ MQTT Broker │
└──────┬──────┘
│
┌───────────┼───────────┐
│ │ │
┌────┴───┐ ┌────┴───┐ ┌───┴────┐
│ AGV-1 │ │ AGV-2 │ │ AGV-3 │
└────────┘ └────────┘ └────────┘
集成接口:
| 接口 | 方向 | 用途 |
|---|---|---|
| REST API | 外部 → openTCS | 提交订单、查询状态 |
| WebSocket | openTCS → 外部 | 实时状态推送 |
| MQTT | openTCS ↔ AGV | 车辆通信 |
| Database | openTCS → DB | 日志、历史记录 |
16.4 监控和运维
工业系统必须有完善的监控:
监控维度:
系统级:
- Kernel CPU/内存使用率
- 消息队列积压
- API 响应时间
业务级:
- 当前活跃任务数
- 平均任务完成时间
- 车辆利用率
- 死锁发生次数
- 任务失败率
车辆级:
- 每台车的位置
- 每台车的状态
- 每台车的电量
- 每台车的错误历史
- 每台车的行驶里程
十七、如果你想做自己的 AGV 调度系统
17.1 核心设计原则
从 openTCS 中可以提炼出以下设计原则:
原则 1:中央调度
不要:
每辆车自己决定去哪
车辆之间互相协商
而是:
统一的中央大脑做所有决策
车辆只负责执行
原因:
全局最优、确定性、容易调试、容易保证安全
原则 2:资源预约
不要:
碰到障碍再停下
检测到冲突再处理
而是:
进入之前先申请资源
申请不到就等待或绕路
原因:
从逻辑上保证不会碰撞
不依赖传感器的可靠性
可以提前检测死锁
原则 3:状态机驱动
不要:
只维护车辆位置
用 if-else 判断车辆能不能接任务
而是:
维护完整的状态机
状态转换有明确的规则
每个状态有明确的允许操作
原因:
防止非法操作
便于故障恢复
系统行为可预测
原则 4:分层解耦
不要:
调度逻辑和通信逻辑混在一起
路径规划和资源管理混在一起
而是:
明确分层:
调度层(Dispatcher)
规划层(Router)
资源层(Scheduler)
通信层(Driver)
原因:
每层可以独立开发、测试、替换
逻辑清晰,容易维护
原则 5:事件驱动
不要:
每隔 1 秒轮询一次所有状态
定时触发调度
而是:
状态变化时立即触发相应逻辑
新任务到达 → 触发调度
车辆空闲 → 触发调度
资源释放 → 通知等待的车辆
原因:
响应快
资源消耗低
逻辑清晰
17.2 一个适合比赛的简化版架构
如果你们做软件设计大赛或课程项目,可以这样设计:
┌─────────────────────────────────────────────────┐
│ Frontend (Vue) │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ 地图可视化│ │ 任务面板 │ │ 车辆状态面板 │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────┬───────────────────────────┘
│ WebSocket / REST
┌─────────────────────┴───────────────────────────┐
│ Backend (Python) │
│ │
│ ┌────────────────┐ │
│ │ Task Manager │ ← 接收和管理运输订单 │
│ └───────┬────────┘ │
│ │ │
│ ┌───────┴────────┐ │
│ │ Dispatcher │ ← 选择最优车辆 │
│ └───────┬────────┘ │
│ │ │
│ ┌───────┴────────┐ │
│ │ Path Planner │ ← A*/Dijkstra/WHCA* │
│ └───────┬────────┘ │
│ │ │
│ ┌───────┴────────┐ │
│ │ Scheduler │ ← 资源锁管理 │
│ └───────┬────────┘ │
│ │ │
│ ┌───────┴────────┐ │
│ │ AGV Adapter │ ← 统一通信接口 │
│ └───────┬────────┘ │
│ │ │
└──────────┼───────────────────────────────────────┘
│
┌──────────┴───────────────────────────────────────┐
│ Simulation Engine (Pygame/PyQt) │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │AGV-1 │ │AGV-2 │ │AGV-3 │ │AGV-4 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
└──────────────────────────────────────────────────┘
17.3 技术栈推荐
| 模块 | 推荐技术 | 原因 |
|---|---|---|
| 后端 | Python + FastAPI | 开发快,异步支持好 |
| 前端 | Vue 3 + Canvas/SVG | 轻量,可视化方便 |
| 通信 | WebSocket | 实时双向通信 |
| 仿真 | Pygame 或 PyQt | Python 生态,容易集成 |
| 路径算法 | NetworkX | Python 图算法库,开箱即用 |
| 消息队列 | Redis Pub/Sub | 轻量,适合事件驱动 |
17.4 核心数据结构
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
# === 地图模型 ===
@dataclass
class Point:
id: str
x: float
y: float
type: str # "halt" | "park" | "charge"
@dataclass
class Path:
id: str
source: str # Point ID
destination: str # Point ID
length: float
max_speed: float
bidirectional: bool = False
locked: bool = False
@dataclass
class Location:
id: str
point_id: str
type: str # "pickup" | "dropoff" | "charge"
operations: list[str] = field(default_factory=list)
# === 车辆模型 ===
class VehicleState(Enum):
IDLE = "idle"
EXECUTING = "executing"
WAITING = "waiting"
CHARGING = "charging"
ERROR = "error"
UNAVAILABLE = "unavailable"
@dataclass
class Vehicle:
id: str
current_position: str # Point ID
state: VehicleState = VehicleState.IDLE
energy_level: float = 100.0
current_order: Optional[str] = None
def can_accept_order(self) -> bool:
return (self.state == VehicleState.IDLE and
self.energy_level > 20.0)
# === 任务模型 ===
class OrderState(Enum):
PENDING = "pending"
DISPATCHED = "dispatched"
EXECUTING = "executing"
FINISHED = "finished"
FAILED = "failed"
@dataclass
class TransportOrder:
id: str
source_location: str # Location ID(取货点)
destination_location: str # Location ID(放货点)
priority: int = 5
state: OrderState = OrderState.PENDING
assigned_vehicle: Optional[str] = None
created_at: float = 0.0
deadline: Optional[float] = None
# === 资源锁模型 ===
@dataclass
class ResourceLock:
resource_id: str # Point ID 或 Path ID
vehicle_id: str # 占用该资源的车辆
lock_type: str = "exclusive" # "exclusive" | "shared"
timestamp: float = 0.0
17.5 Scheduler 的简化实现
class Scheduler:
"""资源锁管理器"""
def __init__(self):
self.locks: dict[str, ResourceLock] = {} # resource_id → lock
self.waiting_queue: dict[str, list[str]] = {} # resource_id → [vehicle_ids]
def try_allocate(self, vehicle_id: str, resources: list[str]) -> bool:
"""
尝试为车辆分配一组资源。
要么全部成功,要么全部失败(原子性)。
"""
# 检查所有资源是否可用
for resource_id in resources:
if resource_id in self.locks:
existing_lock = self.locks[resource_id]
if existing_lock.vehicle_id != vehicle_id:
# 资源被其他车辆占用
return False
# 全部可用,执行分配
for resource_id in resources:
self.locks[resource_id] = ResourceLock(
resource_id=resource_id,
vehicle_id=vehicle_id,
timestamp=time.time()
)
return True
def release(self, vehicle_id: str, resources: list[str]):
"""释放车辆占用的资源"""
for resource_id in resources:
if resource_id in self.locks:
lock = self.locks[resource_id]
if lock.vehicle_id == vehicle_id:
del self.locks[resource_id]
# 通知等待该资源的车辆
self._notify_waiting(resource_id)
def release_all(self, vehicle_id: str):
"""释放某车辆的所有资源"""
to_release = [
rid for rid, lock in self.locks.items()
if lock.vehicle_id == vehicle_id
]
self.release(vehicle_id, to_release)
def get_locked_resources(self, vehicle_id: str) -> list[str]:
"""获取某车辆占用的所有资源"""
return [
rid for rid, lock in self.locks.items()
if lock.vehicle_id == vehicle_id
]
def is_available(self, resource_id: str, requesting_vehicle: str) -> bool:
"""检查资源是否对某车辆可用"""
if resource_id not in self.locks:
return True
return self.locks[resource_id].vehicle_id == requesting_vehicle
def _notify_waiting(self, resource_id: str):
"""资源释放后,通知等待队列中的车辆"""
if resource_id in self.waiting_queue:
waiting = self.waiting_queue[resource_id]
if waiting:
next_vehicle = waiting.pop(0)
# 触发该车辆重新尝试分配
self._trigger_retry(next_vehicle)
def check_deadlock(self) -> list[list[str]]:
"""
检测死锁:寻找等待环。
返回所有死锁环中的车辆列表。
"""
# 构建等待图:vehicle_A → vehicle_B 表示 A 在等待 B 占用的资源
wait_graph: dict[str, set[str]] = {}
for resource_id, waiters in self.waiting_queue.items():
if resource_id in self.locks:
holder = self.locks[resource_id].vehicle_id
for waiter in waiters:
if waiter not in wait_graph:
wait_graph[waiter] = set()
wait_graph[waiter].add(holder)
# DFS 找环
cycles = self._find_cycles(wait_graph)
return cycles
def _find_cycles(self, graph: dict[str, set[str]]) -> list[list[str]]:
"""在有向图中找所有环"""
visited = set()
rec_stack = set()
cycles = []
def dfs(node, path):
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in graph.get(node, set()):
if neighbor not in visited:
dfs(neighbor, path)
elif neighbor in rec_stack:
# 找到环
cycle_start = path.index(neighbor)
cycles.append(path[cycle_start:])
path.pop()
rec_stack.remove(node)
for node in graph:
if node not in visited:
dfs(node, [])
return cycles
17.6 Dispatcher 的简化实现
class Dispatcher:
"""任务分配器"""
def __init__(self, router: 'Router', scheduler: Scheduler,
vehicles: dict[str, Vehicle], orders: dict[str, TransportOrder]):
self.router = router
self.scheduler = scheduler
self.vehicles = vehicles
self.orders = orders
def dispatch(self):
"""
核心调度逻辑:将待分配的订单分配给合适的车辆。
触发时机:新订单到达、车辆变为空闲、车辆故障。
"""
pending_orders = self._get_pending_orders()
available_vehicles = self._get_available_vehicles()
if not pending_orders or not available_vehicles:
return
# 按优先级排序订单(高优先级先分配)
pending_orders.sort(key=lambda o: -o.priority)
for order in pending_orders:
best_vehicle = self._find_best_vehicle(order, available_vehicles)
if best_vehicle:
self._assign_order(order, best_vehicle)
available_vehicles.remove(best_vehicle)
def _get_pending_orders(self) -> list[TransportOrder]:
return [o for o in self.orders.values() if o.state == OrderState.PENDING]
def _get_available_vehicles(self) -> list[Vehicle]:
return [v for v in self.vehicles.values() if v.can_accept_order()]
def _find_best_vehicle(self, order: TransportOrder,
candidates: list[Vehicle]) -> Optional[Vehicle]:
"""为订单找到最优车辆"""
best_vehicle = None
best_cost = float('inf')
for vehicle in candidates:
# 计算从车辆当前位置到取货点的代价
cost = self.router.get_cost(
vehicle.current_position,
order.source_location
)
if cost is None:
continue # 无法到达
# 检查电量是否足够
total_distance = cost + self.router.get_cost(
order.source_location,
order.destination_location
)
if total_distance is None:
continue
energy_needed = self._estimate_energy(total_distance)
if vehicle.energy_level < energy_needed * 1.3: # 30% 余量
continue
# 拥堵惩罚
congestion_factor = self._get_congestion_factor(
vehicle.current_position, order.source_location
)
adjusted_cost = cost * congestion_factor
if adjusted_cost < best_cost:
best_cost = adjusted_cost
best_vehicle = vehicle
return best_vehicle
def _assign_order(self, order: TransportOrder, vehicle: Vehicle):
"""将订单分配给车辆"""
order.state = OrderState.DISPATCHED
order.assigned_vehicle = vehicle.id
vehicle.state = VehicleState.EXECUTING
vehicle.current_order = order.id
# 计算完整路径
route = self.router.compute_route(
vehicle.current_position,
order.source_location,
order.destination_location
)
# 开始执行
self._execute_route(vehicle, route, order)
def _estimate_energy(self, distance: float) -> float:
"""估算能耗:每米 0.1% 电量"""
return distance * 0.1
def _get_congestion_factor(self, source: str, destination: str) -> float:
"""计算路径拥堵因子"""
path = self.router.get_path(source, destination)
if not path:
return 1.0
vehicles_on_path = sum(
1 for v in self.vehicles.values()
if v.current_position in path and v.state == VehicleState.EXECUTING
)
return 1.0 + vehicles_on_path * 0.3
17.7 Router 的简化实现
import networkx as nx
class Router:
"""路径规划器"""
def __init__(self, points: dict[str, Point], paths: list[Path]):
self.graph = nx.DiGraph()
self._build_graph(points, paths)
def _build_graph(self, points: dict[str, Point], paths: list[Path]):
"""构建路网图"""
for point in points.values():
self.graph.add_node(point.id, x=point.x, y=point.y, type=point.type)
for path in paths:
if not path.locked:
self.graph.add_edge(
path.source,
path.destination,
weight=path.length,
max_speed=path.max_speed,
path_id=path.id
)
if path.bidirectional:
self.graph.add_edge(
path.destination,
path.source,
weight=path.length,
max_speed=path.max_speed,
path_id=path.id + "_rev"
)
def get_cost(self, source: str, destination: str) -> Optional[float]:
"""计算两点之间的最短路径代价"""
try:
return nx.shortest_path_length(
self.graph, source, destination, weight='weight'
)
except nx.NetworkXNoPath:
return None
def get_path(self, source: str, destination: str) -> Optional[list[str]]:
"""计算两点之间的最短路径(节点序列)"""
try:
return nx.shortest_path(
self.graph, source, destination, weight='weight'
)
except nx.NetworkXNoPath:
return None
def compute_route(self, current_pos: str, pickup: str,
dropoff: str) -> Optional[list[str]]:
"""计算完整路线:当前位置 → 取货点 → 放货点"""
path_to_pickup = self.get_path(current_pos, pickup)
path_to_dropoff = self.get_path(pickup, dropoff)
if path_to_pickup is None or path_to_dropoff is None:
return None
# 合并路径(去掉重复的中间点)
full_path = path_to_pickup + path_to_dropoff[1:]
return full_path
def compute_route_avoiding(self, source: str, destination: str,
avoid_points: set[str]) -> Optional[list[str]]:
"""计算避开某些点的路径(用于绕行)"""
# 创建临时图,移除需要避开的节点
temp_graph = self.graph.copy()
for point in avoid_points:
if point in temp_graph:
temp_graph.remove_node(point)
try:
return nx.shortest_path(temp_graph, source, destination, weight='weight')
except nx.NetworkXNoPath:
return None
def update_edge_weight(self, source: str, destination: str, new_weight: float):
"""动态更新边的权重(用于拥堵感知)"""
if self.graph.has_edge(source, destination):
self.graph[source][destination]['weight'] = new_weight
def lock_path(self, source: str, destination: str):
"""锁定路径(不可通行)"""
if self.graph.has_edge(source, destination):
self.graph.remove_edge(source, destination)
def unlock_path(self, source: str, destination: str, weight: float):
"""解锁路径"""
self.graph.add_edge(source, destination, weight=weight)
17.8 执行引擎的简化实现
import asyncio
class ExecutionEngine:
"""
执行引擎:协调 Scheduler 和 Vehicle Driver,
逐步执行车辆的路径。
"""
def __init__(self, scheduler: Scheduler, vehicles: dict[str, Vehicle]):
self.scheduler = scheduler
self.vehicles = vehicles
self.look_ahead = 2 # 提前锁定 2 步
async def execute_route(self, vehicle_id: str, route: list[str],
order: TransportOrder):
"""执行一条完整路径"""
vehicle = self.vehicles[vehicle_id]
for i in range(len(route) - 1):
current = route[i]
next_point = route[i + 1]
# 计算需要锁定的资源(当前段 + look-ahead)
resources_needed = self._get_resources_for_step(route, i)
# 尝试获取资源
success = await self._acquire_resources_with_retry(
vehicle_id, resources_needed
)
if not success:
# 获取资源失败(超时),尝试重新规划
await self._handle_blocked(vehicle, route, i, order)
return
# 资源获取成功,执行移动
await self._move_vehicle(vehicle, next_point)
# 释放已经离开的资源
if i > 0:
previous = route[i - 1]
path_id = f"{previous}_{current}"
self.scheduler.release(vehicle_id, [previous, path_id])
# 路径执行完成
self._complete_order(vehicle, order)
def _get_resources_for_step(self, route: list[str],
current_index: int) -> list[str]:
"""计算当前步骤需要锁定的资源"""
resources = []
end_index = min(current_index + self.look_ahead + 1, len(route))
for i in range(current_index, end_index):
resources.append(route[i]) # Point
if i < end_index - 1:
path_id = f"{route[i]}_{route[i+1]}"
resources.append(path_id) # Path
return resources
async def _acquire_resources_with_retry(self, vehicle_id: str,
resources: list[str],
timeout: float = 30.0) -> bool:
"""带重试的资源获取"""
start_time = asyncio.get_event_loop().time()
while True:
if self.scheduler.try_allocate(vehicle_id, resources):
return True
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > timeout:
return False
# 等待一小段时间后重试
await asyncio.sleep(0.5)
async def _move_vehicle(self, vehicle: Vehicle, target_point: str):
"""
模拟车辆移动。
在实际系统中,这里会通过 Driver 发送指令给真实 AGV,
然后等待 AGV 上报到达。
"""
# 模拟移动时间(根据距离计算)
move_time = 2.0 # 简化:固定 2 秒
await asyncio.sleep(move_time)
# 更新车辆位置
vehicle.current_position = target_point
async def _handle_blocked(self, vehicle: Vehicle, route: list[str],
blocked_at: int, order: TransportOrder):
"""处理路径被阻塞的情况"""
# 策略 1:等待(已经在 acquire 中等了 timeout)
# 策略 2:重新规划绕行路径
blocked_point = route[blocked_at + 1]
# 尝试绕行
new_route = self.router.compute_route_avoiding(
vehicle.current_position,
route[-1], # 最终目的地
avoid_points={blocked_point}
)
if new_route:
# 用新路径继续执行
await self.execute_route(vehicle.id, new_route, order)
else:
# 无法绕行,标记任务失败
order.state = OrderState.FAILED
vehicle.state = VehicleState.IDLE
vehicle.current_order = None
def _complete_order(self, vehicle: Vehicle, order: TransportOrder):
"""完成订单"""
order.state = OrderState.FINISHED
vehicle.state = VehicleState.IDLE
vehicle.current_order = None
# 释放所有剩余资源(只保留当前位置)
all_resources = self.scheduler.get_locked_resources(vehicle.id)
resources_to_release = [
r for r in all_resources if r != vehicle.current_position
]
self.scheduler.release(vehicle.id, resources_to_release)
17.9 把它们组装在一起
class AGVControlSystem:
"""AGV 调度系统主类"""
def __init__(self):
# 加载地图
self.points, self.paths, self.locations = self._load_map()
# 初始化核心模块
self.router = Router(self.points, self.paths)
self.scheduler = Scheduler()
self.vehicles: dict[str, Vehicle] = {}
self.orders: dict[str, TransportOrder] = {}
self.dispatcher = Dispatcher(
self.router, self.scheduler, self.vehicles, self.orders
)
self.execution_engine = ExecutionEngine(self.scheduler, self.vehicles)
# 事件处理
self.event_handlers = {
'new_order': self._on_new_order,
'vehicle_idle': self._on_vehicle_idle,
'vehicle_error': self._on_vehicle_error,
}
def add_vehicle(self, vehicle_id: str, position: str):
"""注册新车辆"""
vehicle = Vehicle(id=vehicle_id, current_position=position)
self.vehicles[vehicle_id] = vehicle
# 锁定车辆当前位置
self.scheduler.try_allocate(vehicle_id, [position])
def submit_order(self, order: TransportOrder):
"""提交新订单"""
self.orders[order.id] = order
self._trigger_event('new_order', order)
def _on_new_order(self, order: TransportOrder):
"""新订单到达时触发"""
self.dispatcher.dispatch()
def _on_vehicle_idle(self, vehicle: Vehicle):
"""车辆变为空闲时触发"""
self.dispatcher.dispatch()
def _on_vehicle_error(self, vehicle: Vehicle):
"""车辆故障时触发"""
# 重新分配该车辆的任务
if vehicle.current_order:
order = self.orders[vehicle.current_order]
order.state = OrderState.PENDING
order.assigned_vehicle = None
vehicle.current_order = None
# 触发重新调度
self.dispatcher.dispatch()
def _trigger_event(self, event_type: str, data):
"""触发事件"""
handler = self.event_handlers.get(event_type)
if handler:
handler(data)
async def run(self):
"""主循环"""
print("AGV Control System started.")
# 定期检查死锁
while True:
cycles = self.scheduler.check_deadlock()
if cycles:
print(f"Deadlock detected! Vehicles involved: {cycles}")
self._resolve_deadlock(cycles)
await asyncio.sleep(5) # 每 5 秒检查一次
def _resolve_deadlock(self, cycles: list[list[str]]):
"""解决死锁"""
for cycle in cycles:
# 简单策略:让优先级最低的车辆让路
vehicles_in_cycle = [self.vehicles[vid] for vid in cycle]
victim = min(vehicles_in_cycle,
key=lambda v: self.orders.get(v.current_order,
TransportOrder(id='', source_location='',
destination_location='')).priority)
# 让 victim 释放资源并重新规划
self.scheduler.release_all(victim.id)
# 保留当前位置
self.scheduler.try_allocate(victim.id, [victim.current_position])
# 重新规划路径
if victim.current_order:
order = self.orders[victim.current_order]
order.state = OrderState.PENDING
victim.state = VehicleState.IDLE
victim.current_order = None
self.dispatcher.dispatch()
17.10 使用示例
async def main():
# 创建系统
system = AGVControlSystem()
# 注册车辆
system.add_vehicle("AGV-1", "P1")
system.add_vehicle("AGV-2", "P6")
system.add_vehicle("AGV-3", "P9")
# 提交订单
system.submit_order(TransportOrder(
id="order_001",
source_location="W1", # 从工位 1 取货
destination_location="W3", # 送到工位 3
priority=5
))
system.submit_order(TransportOrder(
id="order_002",
source_location="W2",
destination_location="W4",
priority=8 # 高优先级
))
# 启动系统
await system.run()
if __name__ == "__main__":
asyncio.run(main())
十八、总结:从 openTCS 中学到的核心思想
18.1 架构层面
| 思想 | 说明 | 为什么重要 |
|---|---|---|
| 中央调度 | 一个大脑管所有车 | 全局最优、确定性、安全 |
| 分层解耦 | 调度/规划/资源/通信分离 | 可维护、可替换、可测试 |
| 插件化 | 核心模块可替换 | 适应不同场景 |
| 事件驱动 | 状态变化触发逻辑 | 响应快、资源省 |
18.2 算法层面
| 思想 | 说明 | 为什么重要 |
|---|---|---|
| 资源预约 | 进入前先锁定 | 逻辑安全保证 |
| 图模型 | 离散化的路网 | 资源可管理 |
| 代价模型 | 不只是最短路 | 综合考虑多因素 |
| 动态重规划 | 路径可以随时调整 | 应对变化 |
18.3 工程层面
| 思想 | 说明 | 为什么重要 |
|---|---|---|
| 状态机 | 完整的状态管理 | 防止非法操作 |
| 原子操作 | 资源分配全部成功或全部失败 | 一致性 |
| 超时机制 | 等待有上限 | 避免无限阻塞 |
| 故障恢复 | 任何组件故障都有应对 | 7×24 运行 |
| 可观测性 | 所有状态可查询 | 运维友好 |
18.4 一句话总结
openTCS 教给我们的最重要的一课是:
多车协同的核心不是"路径规划算法",而是"资源管理系统"。
算出路径只是第一步。
保证几十台车在同一个空间里安全、高效、不死锁地运行——
这才是真正的工程挑战。
而解决这个挑战的关键,是一套严谨的资源预约和释放机制。
这就是 openTCS 的完整技术解析。从架构设计到核心算法,从理论原理到代码实现,希望这篇文章能帮你建立对工业 AGV 调度系统的完整认知。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)