声明:本教程基于 Python 3.10+ 环境编写,代码示例使用 PyTorch 框架实现。建议读者具备单智能体强化学习(DQN、PPO)的基础知识。


引言:从单打独斗到群雄逐鹿

想象这样一个场景:一群无人机需要协同完成搜救任务,它们必须分工合作、共享信息,同时避免相互碰撞。又或者想象一场足球比赛,每个球员既要与队友配合,又要对抗对手。这些场景的共同特点是——多个决策者在同一环境中交互

这就是**多智能体强化学习(Multi-Agent Reinforcement Learning, MARL)**研究的核心问题。与单智能体强化学习不同,MARL中的每个智能体不仅要学习环境 dynamics,还要建模其他智能体的行为。这种复杂性使得MARL成为当前人工智能研究中最具挑战性和前景的领域之一。

从自动驾驶车队协调到分布式机器人控制,从游戏AI到智能交通系统,MARL正在改变我们解决复杂群体智能问题的方式。2024年以来,随着大语言模型与多智能体系统的结合,MARL迎来了新的研究热潮。


1. 多智能体环境的形式化定义

1.1 随机博弈(Stochastic Game)框架

多智能体环境通常被建模为随机博弈,也称为马尔可夫博弈(Markov Game)。这是单智能体MDP向多智能体的自然扩展。

一个n智能体的马尔可夫博弈由以下元组定义:

G=⟨N,S,{Ai}i=1n,P,{Ri}i=1n,γ⟩\mathcal{G} = \langle \mathcal{N}, \mathcal{S}, \{\mathcal{A}_i\}_{i=1}^n, \mathcal{P}, \{R_i\}_{i=1}^n, \gamma \rangleG=N,S,{Ai}i=1n,P,{Ri}i=1n,γ

其中:

  • N={1,2,...,n}\mathcal{N} = \{1, 2, ..., n\}N={1,2,...,n}:智能体集合,表示环境中所有决策者的索引
  • S\mathcal{S}S:全局状态空间,描述环境的完整状态
  • Ai\mathcal{A}_iAi:第i个智能体的动作空间,所有智能体的联合动作空间为 A=A1×A2×...×An\mathcal{A} = \mathcal{A}_1 \times \mathcal{A}_2 \times ... \times \mathcal{A}_nA=A1×A2×...×An
  • P:S×A×S→[0,1]\mathcal{P}: \mathcal{S} \times \mathcal{A} \times \mathcal{S} \rightarrow [0, 1]P:S×A×S[0,1]:状态转移概率函数,P(s′∣s,a)\mathcal{P}(s'|s, \mathbf{a})P(ss,a) 表示在状态s执行联合动作a=(a1,...,an)\mathbf{a} = (a_1, ..., a_n)a=(a1,...,an)后转移到状态s’的概率
  • Ri:S×A×S→RR_i: \mathcal{S} \times \mathcal{A} \times \mathcal{S} \rightarrow \mathbb{R}Ri:S×A×SR:第i个智能体的奖励函数
  • γ∈[0,1)\gamma \in [0, 1)γ[0,1):折扣因子

1.2 观测与部分可观测性

在实际应用中,智能体通常无法直接观察全局状态,而是只能获得局部观测。这引出了**部分可观测随机博弈(Partially Observable Stochastic Game, POSG)**的概念:

GPO=⟨N,S,{Ai},P,{Ri},{Oi},{Ωi},γ⟩\mathcal{G}_{PO} = \langle \mathcal{N}, \mathcal{S}, \{\mathcal{A}_i\}, \mathcal{P}, \{R_i\}, \{\mathcal{O}_i\}, \{\Omega_i\}, \gamma \rangleGPO=N,S,{Ai},P,{Ri},{Oi},{Ωi},γ

其中:

  • Oi\mathcal{O}_iOi:第i个智能体的观测空间
  • Ωi:S×A→Δ(Oi)\Omega_i: \mathcal{S} \times \mathcal{A} \rightarrow \Delta(\mathcal{O}_i)Ωi:S×AΔ(Oi):观测函数,定义了给定状态下智能体获得特定观测的概率分布

每个智能体维护一个观测历史 τi=(oi0,ai0,oi1,ai1,...,oit)\tau_i = (o_i^0, a_i^0, o_i^1, a_i^1, ..., o_i^t)τi=(oi0,ai0,oi1,ai1,...,oit),并根据历史选择动作。智能体的策略表示为 πi(ai∣τi)\pi_i(a_i | \tau_i)πi(aiτi)

1.3 多智能体环境的分类

根据智能体之间的关系,多智能体环境可以分为以下几类:

环境类型 特征描述 典型场景
完全合作 所有智能体共享相同奖励函数,目标完全一致 机器人协作搬运、无人机编队
完全竞争 智能体奖励之和为零,一方所得即另一方所失 围棋、扑克、零和博弈
混合博弈 既有合作又有竞争,奖励函数部分重叠 足球比赛、自动驾驶
独立学习 智能体各自优化自身目标,不考虑他人 资源分配、交通规划

理解环境类型对于选择合适的MARL算法至关重要。完全合作环境可以使用值分解方法,而竞争环境则需要博弈论的分析工具。


2. 博弈论基础:纳什均衡与帕累托最优

2.1 纳什均衡(Nash Equilibrium)

在多智能体系统中,每个智能体的最优策略依赖于其他智能体的策略。这种相互依赖性使得我们需要博弈论的概念来分析系统行为。

纳什均衡是博弈论中最重要的解概念之一。在一个策略组合 π∗=(π1∗,...,πn∗)\pi^* = (\pi_1^*, ..., \pi_n^*)π=(π1,...,πn) 中,如果对于每个智能体i,都有:

Vi(πi∗,π−i∗)≥Vi(πi,π−i∗),∀πi∈ΠiV_i(\pi_i^*, \pi_{-i}^*) \geq V_i(\pi_i, \pi_{-i}^*), \quad \forall \pi_i \in \Pi_iVi(πi,πi)Vi(πi,πi),πiΠi

则称 π∗\pi^*π 为一个纳什均衡。其中 π−i∗=(π1∗,...,πi−1∗,πi+1∗,...,πn∗)\pi_{-i}^* = (\pi_1^*, ..., \pi_{i-1}^*, \pi_{i+1}^*, ..., \pi_n^*)πi=(π1,...,πi1,πi+1,...,πn) 表示除i外所有智能体的策略。

直观理解:在纳什均衡状态下,没有任何一个智能体能够通过单方面改变策略来获得更高的收益。每个智能体的策略都是对其他智能体策略的最佳回应。

存在性定理(纳什,1950):任何有限n人标准型博弈至少存在一个纳什均衡(可能是混合策略均衡)。

2.2 帕累托最优(Pareto Optimality)

纳什均衡关注的是个体理性,而帕累托最优关注的是集体效率。一个策略组合 π\piπ 是帕累托最优的,如果不存在其他策略组合 π′\pi'π 使得:

Vi(π′)≥Vi(π),∀i∈NV_i(\pi') \geq V_i(\pi), \quad \forall i \in \mathcal{N}Vi(π)Vi(π),iN

且至少有一个严格不等式成立。

直观理解:帕累托最优意味着无法在不损害任何智能体利益的前提下改善至少一个智能体的收益。这是一种"没有浪费"的状态。

2.3 社会困境与均衡选择

纳什均衡与帕累托最优并不总是一致的。经典的囚徒困境展示了这一点:

合作 背叛
合作 (-1, -1) (-3, 0)
背叛 (0, -3) (-2, -2)

在这个博弈中,(背叛,背叛)是唯一的纳什均衡,但(合作,合作)是帕累托最优的。这种冲突被称为社会困境,是多智能体学习中的核心挑战。

MARL算法设计的一个重要目标就是引导智能体从低效的纳什均衡(如相互背叛)向高效的帕累托最优解(如相互合作)收敛。


3. 独立学习方法

在多智能体学习的早期研究中,最简单的方法是将每个智能体视为独立的学习者,直接应用单智能体算法。

3.1 独立Q学习(Independent Q-Learning, IQL)

IQL是最简单的多智能体学习方法。每个智能体独立运行标准的Q学习算法,将其他智能体视为环境的一部分:

Qi(s,ai)←Qi(s,ai)+α[ri+γmax⁡ai′Qi(s′,ai′)−Qi(s,ai)]Q_i(s, a_i) \leftarrow Q_i(s, a_i) + \alpha [r_i + \gamma \max_{a_i'} Q_i(s', a_i') - Q_i(s, a_i)]Qi(s,ai)Qi(s,ai)+α[ri+γaimaxQi(s,ai)Qi(s,ai)]

优点

  • 实现简单,无需修改单智能体算法
  • 计算复杂度与智能体数量线性增长
  • 适用于大规模多智能体系统

缺点

  • 非平稳性问题:从单个智能体的视角,环境变得非平稳,因为其他智能体的策略在不断变化
  • 缺乏理论收敛保证(在非合作博弈中)
  • 无法利用其他智能体的信息

3.2 值分解网络(Value Decomposition Networks, VDN)

VDN是针对完全合作多智能体问题的重要突破。其核心思想是将联合动作值函数分解为各个智能体值函数的和:

Qtot(s,a)=∑i=1nQi(si,ai;θi)Q_{tot}(\mathbf{s}, \mathbf{a}) = \sum_{i=1}^n Q_i(s_i, a_i; \theta_i)Qtot(s,a)=i=1nQi(si,ai;θi)

其中 sis_isi 是智能体i的局部观测,θi\theta_iθi 是智能体i的网络参数。

关键假设个体全局最大化(IGM)原则

arg⁡max⁡aQtot(s,a)=(arg⁡max⁡a1Q1(s1,a1)⋮arg⁡max⁡anQn(sn,an))\arg\max_{\mathbf{a}} Q_{tot}(\mathbf{s}, \mathbf{a}) = \begin{pmatrix} \arg\max_{a_1} Q_1(s_1, a_1) \\ \vdots \\ \arg\max_{a_n} Q_n(s_n, a_n) \end{pmatrix}argamaxQtot(s,a)= argmaxa1Q1(s1,a1)argmaxanQn(sn,an)

这意味着每个智能体可以基于自己的Q值独立选择动作,而联合动作恰好最大化全局Q值。

训练目标

L(θ)=E(s,a,r,s′)∼D[(y−Qtot(s,a;θ))2]\mathcal{L}(\theta) = \mathbb{E}_{(s, \mathbf{a}, r, s') \sim \mathcal{D}} \left[ \left( y - Q_{tot}(s, \mathbf{a}; \theta) \right)^2 \right]L(θ)=E(s,a,r,s)D[(yQtot(s,a;θ))2]

其中 y=r+γmax⁡a′Qtot(s′,a′;θ−)y = r + \gamma \max_{\mathbf{a}'} Q_{tot}(s', \mathbf{a}'; \theta^-)y=r+γmaxaQtot(s,a;θ)θ−\theta^-θ 是目标网络参数。

3.3 QMIX:单调性约束下的值分解

VDN的简单加性分解限制了其表达能力。QMIX通过引入单调性约束来增强值分解的灵活性。

核心思想:如果全局Q值对每个局部Q值的偏导数非负,即:

∂Qtot∂Qi≥0,∀i\frac{\partial Q_{tot}}{\partial Q_i} \geq 0, \quad \forall iQiQtot0,i

那么IGM原则仍然成立。QMIX使用一个**混合网络(Mixing Network)**来实现这种单调性:

Qtot(s,a)=f(Q1(s1,a1),...,Qn(sn,an);s)Q_{tot}(\mathbf{s}, \mathbf{a}) = f(Q_1(s_1, a_1), ..., Q_n(s_n, a_n); s)Qtot(s,a)=f(Q1(s1,a1),...,Qn(sn,an);s)

混合网络f是一个前馈神经网络,其权重由全局状态s通过超网络(hypernetwork)生成,且限制为非负值。

网络架构

  • 智能体网络:每个智能体一个DRQN(Deep Recurrent Q-Network),输入观测历史,输出Q值
  • 混合网络:接收所有智能体的Q值和全局状态,输出联合Q值
  • 超网络:根据全局状态生成混合网络的权重

QMIX在StarCraft II多智能体挑战(SMAC)基准测试中取得了显著优于VDN的性能,成为合作MARL的事实标准方法之一。


4. 中心化训练去中心化执行(CTDE)

4.1 CTDE范式概述

**中心化训练去中心化执行(Centralized Training with Decentralized Execution, CTDE)**是MARL中最具影响力的训练范式,由MADDPG算法首次系统提出。

核心思想

  • 训练阶段:允许使用全局信息(全局状态、所有智能体的观测和动作),解决非平稳性问题
  • 执行阶段:每个智能体仅基于局部观测做决策,满足实际部署的通信和隐私约束

这种范式巧妙地平衡了学习效率与实际可行性,成为当前MARL算法设计的主流框架。

4.2 MADDPG:多智能体深度确定性策略梯度

MADDPG将DDPG算法扩展到多智能体场景,是CTDE范式的开创性工作。

Actor-Critic架构

  • Actor(策略网络)μi(oi;θi)\mu_i(o_i; \theta_i)μi(oi;θi),仅使用局部观测,用于执行阶段
  • Critic(值网络)Qi(s,a1,...,an;ϕi)Q_i(s, a_1, ..., a_n; \phi_i)Qi(s,a1,...,an;ϕi),使用全局状态和所有智能体的动作,仅用于训练阶段

Critic更新

L(ϕi)=E(s,a,ri,s′)∼D[(Qi(s,a;ϕi)−yi)2]\mathcal{L}(\phi_i) = \mathbb{E}_{(s, \mathbf{a}, r_i, s') \sim \mathcal{D}} \left[ \left( Q_i(s, \mathbf{a}; \phi_i) - y_i \right)^2 \right]L(ϕi)=E(s,a,ri,s)D[(Qi(s,a;ϕi)yi)2]

其中:

yi=ri+γQi(s′,μ1′(o1′),...,μn′(on′);ϕi−)y_i = r_i + \gamma Q_i(s', \mu_1'(o_1'), ..., \mu_n'(o_n'); \phi_i^-)yi=ri+γQi(s,μ1(o1),...,μn(on);ϕi)

Actor更新

∇θiJ(θi)=Es∼D[∇θiμi(oi)∇aiQi(s,a1,...,an)∣ai=μi(oi)]\nabla_{\theta_i} J(\theta_i) = \mathbb{E}_{s \sim \mathcal{D}} \left[ \nabla_{\theta_i} \mu_i(o_i) \nabla_{a_i} Q_i(s, a_1, ..., a_n) |_{a_i=\mu_i(o_i)} \right]θiJ(θi)=EsD[θiμi(oi)aiQi(s,a1,...,an)ai=μi(oi)]

关键创新:通过中心化Critic,MADDPG解决了多智能体环境中的信用分配问题和非平稳性问题。每个智能体的Critic可以明确知道其他智能体的动作,从而更准确地评估当前策略。

4.3 MAPPO:多智能体PPO

MAPPO将PPO算法扩展到多智能体场景,通过CTDE范式实现高效训练。

核心设计

  • 共享Critic:所有智能体共享一个中心化Critic,输入全局状态,输出状态值函数 V(s)V(s)V(s)
  • 独立Actor:每个智能体有自己的策略网络 πi(ai∣oi)\pi_i(a_i|o_i)πi(aioi)

PPO-Clip目标

LCLIP(θ)=Et[min⁡(rt(θ)A^t,clip(rt(θ),1−ϵ,1+ϵ)A^t)]L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) \hat{A}_t) \right]LCLIP(θ)=Et[min(rt(θ)A^t,clip(rt(θ),1ϵ,1+ϵ)A^t)]

其中 rt(θ)=πθ(at∣ot)πθold(at∣ot)r_t(\theta) = \frac{\pi_{\theta}(a_t|o_t)}{\pi_{\theta_{old}}(a_t|o_t)}rt(θ)=πθold(atot)πθ(atot) 是重要性采样比率,A^t\hat{A}_tA^t 是广义优势估计(GAE)。

MAPPO的优势

  • PPO的稳定性使得MAPPO在大规模多智能体场景中表现优异
  • 相比MADDPG,MAPPO不需要学习确定性策略,探索更充分
  • 在SMAC和MPE等基准测试中,MAPPO往往优于MADDPG

5. 涌现行为与通信学习

5.1 涌现行为(Emergent Behaviors)

在多智能体系统中,涌现行为是指系统整体表现出的、未在个体层面显式编程的复杂行为模式。

典型涌现现象

  • 分工协作:在资源收集任务中,智能体自发形成角色分工
  • 群体智能:鸟群、鱼群式的协调运动模式
  • 工具使用:智能体学会利用环境中的物体作为工具
  • 社会规范:智能体自发形成合作规范

OpenAI的捉迷藏实验是涌现行为的经典案例:在简单的捉迷藏环境中,智能体逐步发展出复杂的策略,包括使用箱子搭建庇护所、利用斜坡移动箱子等,这些行为并未在奖励函数中显式定义。

5.2 通信学习

在多智能体协作中,通信是协调行动的关键。MARL研究如何让智能体学会通信协议

通信模型

  • 离散通信:智能体发送离散符号消息 mi∈{1,...,K}m_i \in \{1, ..., K\}mi{1,...,K}
  • 连续通信:智能体发送连续向量消息 mi∈Rdm_i \in \mathbb{R}^dmiRd

CommNet
CommNet是第一个端到端学习通信的MARL架构。每个智能体的隐藏状态通过通信信道聚合:

hi(l+1)=f(Wh(l)hi(l)+Wc(l)hˉ(l))h_i^{(l+1)} = f(W_h^{(l)} h_i^{(l)} + W_c^{(l)} \bar{h}^{(l)})hi(l+1)=f(Wh(l)hi(l)+Wc(l)hˉ(l))

其中 hˉ(l)=1n∑j=1nhj(l)\bar{h}^{(l)} = \frac{1}{n} \sum_{j=1}^n h_j^{(l)}hˉ(l)=n1j=1nhj(l) 是所有智能体隐藏状态的平均。

TarMAC(Targeted Multi-Agent Communication)
TarMAC引入注意力机制,让智能体选择性地关注来自特定智能体的消息:

αij=softmax(fatt(hi,hj))\alpha_{ij} = \text{softmax}(f_{att}(h_i, h_j))αij=softmax(fatt(hi,hj))

hi(l+1)=f(Wh(l)hi(l)+∑j≠iαijWc(l)hj(l))h_i^{(l+1)} = f(W_h^{(l)} h_i^{(l)} + \sum_{j \neq i} \alpha_{ij} W_c^{(l)} h_j^{(l)})hi(l+1)=f(Wh(l)hi(l)+j=iαijWc(l)hj(l))

** emergent communication的研究发现**:

  • 智能体可以发展出类似语言的符号系统
  • 通信内容往往与任务相关(如位置、意图信息)
  • 在多智能体系统中,通信协议的形成是一个自组织过程

5.3 2024-2025年MARL研究趋势

根据最新的研究进展,MARL领域呈现以下趋势:

  1. 大语言模型与MARL的结合:利用LLM的推理能力增强智能体的决策和通信
  2. 离线MARL:从固定数据集中学习多智能体策略,减少对在线交互的依赖
  3. 可迁移MARL:研究跨任务、跨环境的策略迁移
  4. 安全MARL:确保多智能体系统的行为符合安全约束
  5. 层级MARL:处理长时程、复杂的多智能体任务

6. QMIX算法实现

下面是一个完整的QMIX算法实现,使用PyTorch框架。

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import deque
import random


class AgentNetwork(nn.Module):
    """
    智能体网络:每个智能体独立的DRQN
    输入:观测历史
    输出:该智能体所有动作的Q值
    """
    def __init__(self, obs_dim, n_actions, hidden_dim=64):
        super(AgentNetwork, self).__init__()
        self.hidden_dim = hidden_dim
        
        # GRU层处理序列观测
        self.gru = nn.GRU(obs_dim, hidden_dim, batch_first=True)
        
        # 全连接层输出Q值
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, n_actions)
        
    def init_hidden(self, batch_size=1):
        """初始化隐藏状态"""
        return torch.zeros(1, batch_size, self.hidden_dim)
    
    def forward(self, obs, hidden_state):
        """
        前向传播
        obs: (batch_size, seq_len, obs_dim)
        hidden_state: (1, batch_size, hidden_dim)
        """
        # GRU输出
        gru_out, new_hidden = self.gru(obs, hidden_state)
        
        # 取最后一个时间步的输出
        gru_out = gru_out[:, -1, :]
        
        # 全连接层
        x = F.relu(self.fc1(gru_out))
        q_values = self.fc2(x)
        
        return q_values, new_hidden


class HyperNetwork(nn.Module):
    """
    超网络:根据全局状态生成混合网络的权重
    """
    def __init__(self, state_dim, hidden_dim, n_agents):
        super(HyperNetwork, self).__init__()
        self.n_agents = n_agents
        self.hidden_dim = hidden_dim
        
        # 第一层权重生成网络
        self.hyper_w1 = nn.Linear(state_dim, n_agents * hidden_dim)
        # 第二层权重生成网络
        self.hyper_w2 = nn.Linear(state_dim, hidden_dim)
        
        # 偏置生成网络
        self.hyper_b1 = nn.Linear(state_dim, hidden_dim)
        self.hyper_b2 = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
    def forward(self, state, q_values):
        """
        生成混合网络并计算Q_tot
        state: (batch_size, state_dim)
        q_values: (batch_size, n_agents)
        """
        batch_size = state.size(0)
        
        # 生成第一层权重 (batch_size, n_agents * hidden_dim)
        w1 = torch.abs(self.hyper_w1(state))
        w1 = w1.view(batch_size, self.n_agents, self.hidden_dim)
        
        # 生成第一层偏置 (batch_size, hidden_dim)
        b1 = self.hyper_b1(state)
        b1 = b1.view(batch_size, 1, self.hidden_dim)
        
        # 第一层: (batch_size, 1, n_agents) @ (batch_size, n_agents, hidden_dim)
        q_values = q_values.view(batch_size, 1, self.n_agents)
        hidden = F.elu(torch.bmm(q_values, w1) + b1)
        
        # 生成第二层权重 (batch_size, hidden_dim)
        w2 = torch.abs(self.hyper_w2(state))
        w2 = w2.view(batch_size, self.hidden_dim, 1)
        
        # 生成第二层偏置 (batch_size, 1)
        b2 = self.hyper_b2(state)
        b2 = b2.view(batch_size, 1, 1)
        
        # 第二层: (batch_size, 1, hidden_dim) @ (batch_size, hidden_dim, 1)
        q_tot = torch.bmm(hidden, w2) + b2
        q_tot = q_tot.view(batch_size)
        
        return q_tot


class QMIXAgent:
    """
    QMIX智能体:包含多个AgentNetwork和一个HyperNetwork
    """
    def __init__(self, n_agents, obs_dim, state_dim, n_actions, 
                 hidden_dim=64, gamma=0.99, lr=1e-3, buffer_size=10000):
        self.n_agents = n_agents
        self.obs_dim = obs_dim
        self.state_dim = state_dim
        self.n_actions = n_actions
        self.gamma = gamma
        
        # 为每个智能体创建网络
        self.agent_networks = nn.ModuleList([
            AgentNetwork(obs_dim, n_actions, hidden_dim) 
            for _ in range(n_agents)
        ])
        
        # 混合网络
        self.mixer = HyperNetwork(state_dim, hidden_dim, n_agents)
        
        # 目标网络
        self.target_agent_networks = nn.ModuleList([
            AgentNetwork(obs_dim, n_actions, hidden_dim) 
            for _ in range(n_agents)
        ])
        self.target_mixer = HyperNetwork(state_dim, hidden_dim, n_agents)
        
        # 同步目标网络
        self._update_target_networks(tau=1.0)
        
        # 优化器
        self.optimizer = torch.optim.Adam(
            list(self.agent_networks.parameters()) + 
            list(self.mixer.parameters()),
            lr=lr
        )
        
        # 经验回放缓冲区
        self.buffer = deque(maxlen=buffer_size)
        
    def _update_target_networks(self, tau=0.01):
        """软更新目标网络"""
        for target, source in zip(self.target_agent_networks, self.agent_networks):
            for target_param, param in zip(target.parameters(), source.parameters()):
                target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        
        for target_param, param in zip(self.target_mixer.parameters(), self.mixer.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
    
    def select_actions(self, observations, epsilon=0.1):
        """
        epsilon-贪心策略选择动作
        observations: list of obs for each agent
        """
        actions = []
        for i, obs in enumerate(observations):
            if random.random() < epsilon:
                action = random.randint(0, self.n_actions - 1)
            else:
                obs_tensor = torch.FloatTensor(obs).unsqueeze(0).unsqueeze(0)
                hidden = self.agent_networks[i].init_hidden()
                with torch.no_grad():
                    q_values, _ = self.agent_networks[i](obs_tensor, hidden)
                    action = q_values.argmax(dim=1).item()
            actions.append(action)
        return actions
    
    def store_transition(self, observations, state, actions, reward, 
                        next_observations, next_state, done):
        """存储经验"""
        self.buffer.append((observations, state, actions, reward, 
                           next_observations, next_state, done))
    
    def learn(self, batch_size=32):
        """训练QMIX"""
        if len(self.buffer) < batch_size:
            return
        
        # 采样
        batch = random.sample(self.buffer, batch_size)
        
        # 解包
        observations, states, actions, rewards, next_observations, next_states, dones = zip(*batch)
        
        # 转换为张量
        observations = torch.FloatTensor(np.array(observations))  # (batch, n_agents, obs_dim)
        states = torch.FloatTensor(np.array(states))  # (batch, state_dim)
        actions = torch.LongTensor(np.array(actions))  # (batch, n_agents)
        rewards = torch.FloatTensor(np.array(rewards))  # (batch,)
        next_observations = torch.FloatTensor(np.array(next_observations))
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(np.array(dones))
        
        # 计算当前Q值
        q_values_list = []
        for i in range(self.n_agents):
            obs_i = observations[:, i, :].unsqueeze(1)  # (batch, 1, obs_dim)
            hidden = self.agent_networks[i].init_hidden(batch_size)
            q_values, _ = self.agent_networks[i](obs_i, hidden)
            # 选择对应动作的Q值
            q_values_list.append(q_values.gather(1, actions[:, i:i+1]))
        
        q_values = torch.cat(q_values_list, dim=1)  # (batch, n_agents)
        q_tot = self.mixer(states, q_values)
        
        # 计算目标Q值
        with torch.no_grad():
            next_q_values_list = []
            for i in range(self.n_agents):
                next_obs_i = next_observations[:, i, :].unsqueeze(1)
                hidden = self.target_agent_networks[i].init_hidden(batch_size)
                next_q_values, _ = self.target_agent_networks[i](next_obs_i, hidden)
                next_q_values_list.append(next_q_values.max(dim=1, keepdim=True)[0])
            
            next_q_values = torch.cat(next_q_values_list, dim=1)
            next_q_tot = self.target_mixer(next_states, next_q_values)
            target_q = rewards + self.gamma * next_q_tot * (1 - dones)
        
        # 计算损失
        loss = F.mse_loss(q_tot, target_q)
        
        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(
            list(self.agent_networks.parameters()) + list(self.mixer.parameters()), 
            10
        )
        self.optimizer.step()
        
        # 软更新目标网络
        self._update_target_networks()
        
        return loss.item()

7. 实战:多智能体协作任务

下面是一个完整的多智能体协作环境示例——协同宝藏收集

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
import torch


class TreasureCollectionEnv:
    """
    协同宝藏收集环境
    - 多个智能体需要在网格世界中收集宝藏
    - 每个宝藏需要特定数量的智能体同时到达才能收集
    - 智能体需要学会协作分工
    """
    def __init__(self, n_agents=3, grid_size=8, n_treasures=2, min_collect_agents=2):
        self.n_agents = n_agents
        self.grid_size = grid_size
        self.n_treasures = n_treasures
        self.min_collect_agents = min_collect_agents
        
        # 动作空间: 0=上, 1=下, 2=左, 3=右, 4=停留
        self.n_actions = 5
        
        # 观测维度: 自身位置(2) + 相对宝藏位置(2*n_treasures) + 其他智能体相对位置(2*(n_agents-1))
        self.obs_dim = 2 + 2 * n_treasures + 2 * (n_agents - 1)
        self.state_dim = 2 * n_agents + 2 * n_treasures  # 所有智能体和宝藏的位置
        
        self.reset()
    
    def reset(self):
        """重置环境"""
        # 随机初始化智能体位置
        self.agent_positions = []
        for _ in range(self.n_agents):
            pos = [np.random.randint(0, self.grid_size), 
                   np.random.randint(0, self.grid_size)]
            self.agent_positions.append(pos)
        
        # 随机初始化宝藏位置
        self.treasure_positions = []
        self.treasure_collected = []
        for _ in range(self.n_treasures):
            pos = [np.random.randint(0, self.grid_size), 
                   np.random.randint(0, self.grid_size)]
            self.treasure_positions.append(pos)
            self.treasure_collected.append(False)
        
        self.steps = 0
        self.max_steps = 100
        
        return self.get_observations(), self.get_state()
    
    def get_observations(self):
        """获取每个智能体的局部观测"""
        observations = []
        for i in range(self.n_agents):
            obs = []
            # 自身位置(归一化)
            obs.extend([
                self.agent_positions[i][0] / self.grid_size,
                self.agent_positions[i][1] / self.grid_size
            ])
            
            # 相对宝藏位置
            for j in range(self.n_treasures):
                if not self.treasure_collected[j]:
                    rel_x = (self.treasure_positions[j][0] - self.agent_positions[i][0]) / self.grid_size
                    rel_y = (self.treasure_positions[j][1] - self.agent_positions[i][1]) / self.grid_size
                else:
                    rel_x, rel_y = 0, 0
                obs.extend([rel_x, rel_y])
            
            # 相对其他智能体位置
            for j in range(self.n_agents):
                if i != j:
                    rel_x = (self.agent_positions[j][0] - self.agent_positions[i][0]) / self.grid_size
                    rel_y = (self.agent_positions[j][1] - self.agent_positions[i][1]) / self.grid_size
                    obs.extend([rel_x, rel_y])
            
            observations.append(obs)
        return observations
    
    def get_state(self):
        """获取全局状态"""
        state = []
        # 所有智能体位置
        for pos in self.agent_positions:
            state.extend([pos[0] / self.grid_size, pos[1] / self.grid_size])
        # 所有宝藏位置
        for j, pos in enumerate(self.treasure_positions):
            if not self.treasure_collected[j]:
                state.extend([pos[0] / self.grid_size, pos[1] / self.grid_size])
            else:
                state.extend([-1, -1])
        return state
    
    def step(self, actions):
        """执行动作"""
        self.steps += 1
        
        # 移动智能体
        for i, action in enumerate(actions):
            x, y = self.agent_positions[i]
            if action == 0:  # 上
                y = min(y + 1, self.grid_size - 1)
            elif action == 1:  # 下
                y = max(y - 1, 0)
            elif action == 2:  # 左
                x = max(x - 1, 0)
            elif action == 3:  # 右
                x = min(x + 1, self.grid_size - 1)
            # action == 4: 停留
            self.agent_positions[i] = [x, y]
        
        # 计算奖励
        reward = 0
        for j, treasure_pos in enumerate(self.treasure_positions):
            if not self.treasure_collected[j]:
                # 统计在宝藏位置的智能体数量
                agents_at_treasure = sum([
                    1 for agent_pos in self.agent_positions 
                    if agent_pos == treasure_pos
                ])
                
                if agents_at_treasure >= self.min_collect_agents:
                    # 成功收集宝藏
                    self.treasure_collected[j] = True
                    reward += 10  # 大奖励
                elif agents_at_treasure > 0:
                    # 部分智能体到达,给予小奖励鼓励
                    reward += 0.1 * agents_at_treasure
        
        # 每步小惩罚,鼓励快速完成
        reward -= 0.05
        
        # 检查是否结束
        done = all(self.treasure_collected) or self.steps >= self.max_steps
        
        return self.get_observations(), self.get_state(), reward, done
    
    def render(self):
        """可视化环境"""
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xlim(0, self.grid_size)
        ax.set_ylim(0, self.grid_size)
        ax.set_aspect('equal')
        ax.grid(True)
        
        # 绘制宝藏
        colors = ['gold', 'orange']
        for j, pos in enumerate(self.treasure_positions):
            if not self.treasure_collected[j]:
                circle = Circle((pos[0] + 0.5, pos[1] + 0.5), 0.3, 
                               color=colors[j % len(colors)], alpha=0.7)
                ax.add_patch(circle)
                ax.text(pos[0] + 0.5, pos[1] + 0.5, f'T{j+1}', 
                       ha='center', va='center', fontsize=10)
        
        # 绘制智能体
        agent_colors = ['red', 'blue', 'green', 'purple']
        for i, pos in enumerate(self.agent_positions):
            rect = Rectangle((pos[0] + 0.2, pos[1] + 0.2), 0.6, 0.6, 
                            color=agent_colors[i % len(agent_colors)], alpha=0.7)
            ax.add_patch(rect)
            ax.text(pos[0] + 0.5, pos[1] + 0.5, f'A{i+1}', 
                   ha='center', va='center', fontsize=10, color='white')
        
        plt.title(f'Multi-Agent Treasure Collection (Step {self.steps})')
        plt.show()


def train_qmix():
    """训练QMIX智能体"""
    # 环境参数
    n_agents = 3
    env = TreasureCollectionEnv(n_agents=n_agents, grid_size=8, 
                                n_treasures=2, min_collect_agents=2)
    
    # 创建QMIX智能体
    agent = QMIXAgent(
        n_agents=n_agents,
        obs_dim=env.obs_dim,
        state_dim=env.state_dim,
        n_actions=env.n_actions,
        hidden_dim=64,
        gamma=0.99,
        lr=1e-3,
        buffer_size=10000
    )
    
    # 训练参数
    n_episodes = 2000
    batch_size = 32
    epsilon_start = 1.0
    epsilon_end = 0.05
    epsilon_decay = 0.995
    epsilon = epsilon_start
    
    # 记录训练过程
    episode_rewards = []
    
    for episode in range(n_episodes):
        observations, state = env.reset()
        episode_reward = 0
        done = False
        
        while not done:
            # 选择动作
            actions = agent.select_actions(observations, epsilon)
            
            # 执行动作
            next_observations, next_state, reward, done = env.step(actions)
            
            # 存储经验
            agent.store_transition(observations, state, actions, reward,
                                  next_observations, next_state, done)
            
            # 更新状态
            observations = next_observations
            state = next_state
            episode_reward += reward
            
            # 学习
            if len(agent.buffer) >= batch_size:
                agent.learn(batch_size)
        
        # 衰减探索率
        epsilon = max(epsilon_end, epsilon * epsilon_decay)
        
        episode_rewards.append(episode_reward)
        
        # 打印进度
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f'Episode {episode + 1}/{n_episodes}, '
                  f'Avg Reward: {avg_reward:.2f}, Epsilon: {epsilon:.3f}')
    
    return agent, episode_rewards


# 运行训练
if __name__ == '__main__':
    print('开始训练QMIX智能体...')
    trained_agent, rewards = train_qmix()
    
    # 绘制训练曲线
    plt.figure(figsize=(10, 5))
    plt.plot(rewards)
    plt.xlabel('Episode')
    plt.ylabel('Episode Reward')
    plt.title('QMIX Training Progress')
    plt.grid(True)
    plt.show()

8. 协作与竞争行为展示

8.1 协作行为分析

在完全合作环境中,智能体需要学会以下协作模式:

角色分工
在资源收集任务中,智能体可能自发形成不同的角色。例如,在协同宝藏收集环境中,某些智能体可能专门负责探索,而另一些则负责协助收集。

信息共享
通过通信机制或隐式协调,智能体学会共享关键信息,如宝藏位置、自身意图等。这种信息共享显著提高了团队效率。

同步行动
某些任务要求智能体在特定时刻采取协调行动。QMIX等值分解方法通过联合Q值的学习,使智能体能够学会这种同步。

8.2 竞争行为分析

在竞争环境中,智能体展现出不同的行为模式:

对抗策略
在零和博弈中,智能体学会预测对手的行动并采取对抗措施。MADDPG等算法通过中心化Critic学习对手的行为模型。

欺骗与伪装
在复杂的竞争环境中,智能体可能学会欺骗性策略,如伪装意图、设置陷阱等。这些行为往往在训练过程中自发涌现。

混合博弈中的权衡
在既有合作又有竞争的环境中,智能体需要在协作与竞争之间找到平衡。这种权衡是MARL研究的重要课题。

8.3 可视化协作与竞争

def visualize_behaviors():
    """
    可视化多智能体的协作与竞争行为
    """
    # 创建两个对比环境
    fig, axes = plt.subplots(1, 2, figsize=(16, 7))
    
    # 左侧:协作场景
    ax1 = axes[0]
    ax1.set_xlim(0, 10)
    ax1.set_ylim(0, 10)
    ax1.set_aspect('equal')
    ax1.set_title('协作场景:协同覆盖', fontsize=14)
    ax1.grid(True, alpha=0.3)
    
    # 绘制协作智能体
    coop_positions = [(2, 5), (5, 8), (8, 5), (5, 2)]
    coop_targets = [(0, 0), (10, 0), (10, 10), (0, 10)]
    
    for i, (pos, target) in enumerate(zip(coop_positions, coop_targets)):
        # 智能体
        circle = Circle(pos, 0.4, color='blue', alpha=0.6)
        ax1.add_patch(circle)
        ax1.text(pos[0], pos[1], f'A{i+1}', ha='center', va='center', 
                color='white', fontsize=10)
        
        # 目标区域
        rect = Rectangle((target[0]-1, target[1]-1), 2, 2, 
                         color='green', alpha=0.2)
        ax1.add_patch(rect)
        
        # 协作箭头
        ax1.annotate('', xy=target, xytext=pos,
                    arrowprops=dict(arrowstyle='->', color='blue', lw=2))
    
    # 添加协作标注
    ax1.text(5, 5, '协作覆盖\n所有区域', ha='center', va='center',
            fontsize=12, bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
    
    # 右侧:竞争场景
    ax2 = axes[1]
    ax2.set_xlim(0, 10)
    ax2.set_ylim(0, 10)
    ax2.set_aspect('equal')
    ax2.set_title('竞争场景:资源争夺', fontsize=14)
    ax2.grid(True, alpha=0.3)
    
    # 绘制竞争智能体
    team_a_pos = [(2, 5), (3, 7)]
    team_b_pos = [(8, 5), (7, 3)]
    resources = [(5, 5), (5, 8), (5, 2)]
    
    for pos in team_a_pos:
        circle = Circle(pos, 0.4, color='red', alpha=0.6)
        ax2.add_patch(circle)
        ax2.text(pos[0], pos[1], 'A', ha='center', va='center', 
                color='white', fontsize=10)
    
    for pos in team_b_pos:
        circle = Circle(pos, 0.4, color='blue', alpha=0.6)
        ax2.add_patch(circle)
        ax2.text(pos[0], pos[1], 'B', ha='center', va='center', 
                color='white', fontsize=10)
    
    for pos in resources:
        star = Circle(pos, 0.3, color='gold', alpha=0.8)
        ax2.add_patch(star)
    
    # 绘制对抗箭头
    ax2.annotate('', xy=(5, 5), xytext=(2.5, 5),
                arrowprops=dict(arrowstyle='->', color='red', lw=2))
    ax2.annotate('', xy=(5, 5), xytext=(7.5, 5),
                arrowprops=dict(arrowstyle='->', color='blue', lw=2))
    
    # 添加竞争标注
    ax2.text(5, 1, '两队争夺\n中央资源', ha='center', va='center',
            fontsize=12, bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.5))
    
    plt.tight_layout()
    plt.show()


# 运行可视化
visualize_behaviors()

9. 避坑小贴士

9.1 训练不稳定的常见问题

问题1:非平稳性导致的训练发散

在多智能体训练中,其他智能体的策略变化使得环境变得非平稳。这可能导致训练发散或收敛到次优解。

解决方案

  • 使用CTDE范式,通过中心化Critic稳定训练
  • 降低学习率,使用更保守的更新策略
  • 引入目标网络,减缓策略变化速度

问题2:信用分配问题

在合作任务中,难以确定哪个智能体对团队成功贡献最大。

解决方案

  • 使用值分解方法(VDN、QMIX)明确分配信用
  • 设计差异化的奖励函数,使每个智能体的贡献可区分
  • 考虑使用反事实基线(Counterfactual Baseline)

问题3:局部最优陷阱

多智能体系统容易陷入低效的局部最优,如相互背叛的囚徒困境均衡。

解决方案

  • 引入内在奖励(Intrinsic Reward),鼓励探索
  • 使用课程学习,逐步增加任务难度
  • 考虑群体奖励塑形(Reward Shaping)

9.2 超参数调优建议

超参数 建议值 说明
学习率 1e-4 ~ 1e-3 多智能体训练通常需要较小的学习率
折扣因子 gamma 0.95 ~ 0.99 根据任务时程调整
经验缓冲区大小 1e5 ~ 1e6 确保足够的样本多样性
目标网络更新频率 软更新 tau=0.01 或使用硬更新每1000步
Epsilon衰减 0.995 ~ 0.999 保持足够的探索时间
批量大小 32 ~ 128 根据计算资源调整

9.3 调试技巧

  1. 可视化策略:定期渲染智能体的行为,直观检查策略合理性
  2. 监控Q值:跟踪Q值的变化,检查是否出现发散
  3. 消融实验:逐一移除组件,确定关键设计
  4. 简化环境:先在简单环境上验证算法,再迁移到复杂环境

10. 延伸阅读

10.1 经典论文

  1. MADDPG: Lowe et al. “Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments”, NeurIPS 2017

    • CTDE范式的开创性工作
  2. QMIX: Rashid et al. “QMIX: Monotonic Value Function Factorisation for Deep Multi-Agent Reinforcement Learning”, ICML 2018

    • 值分解方法的重要突破
  3. MAPPO: Yu et al. “The Surprising Effectiveness of PPO in Cooperative Multi-Agent Games”, NeurIPS 2022

    • 展示了PPO在多智能体场景中的优异性能
  4. VDN: Sunehag et al. “Value-Decomposition Networks For Cooperative Multi-Agent Learning”, AAMAS 2018

    • 值分解方法的基础

10.2 前沿研究方向

  1. 离线MARL: 从固定数据集中学习多智能体策略
  2. 迁移MARL: 跨任务、跨环境的策略迁移
  3. 层级MARL: 处理长时程、复杂的多智能体任务
  4. 安全MARL: 确保多智能体系统的行为符合安全约束
  5. LLM增强MARL: 利用大语言模型的推理能力

10.3 推荐资源

  • SMAC(StarCraft Multi-Agent Challenge): 多智能体协作的标准测试平台
  • MPE(Multi-Agent Particle Environment): 轻量级多智能体环境
  • PyMARL: QMIX等算法的开源实现
  • EPyMARL: 扩展的MARL算法库

总结

多智能体强化学习是强化学习领域最具挑战性和前景的方向之一。本讲我们系统学习了:

  1. 多智能体环境的形式化:随机博弈框架、部分可观测性、环境分类
  2. 博弈论基础:纳什均衡、帕累托最优、社会困境
  3. 独立学习方法:IQL、VDN、QMIX的原理与实现
  4. CTDE范式:MADDPG与MAPPO的架构设计
  5. 涌现行为与通信:智能体如何自发形成复杂行为

MARL的核心挑战在于处理环境的非平稳性、解决信用分配问题、以及引导智能体向高效均衡收敛。CTDE范式和值分解方法是应对这些挑战的有效工具。

随着研究的深入,MARL正在从实验室走向实际应用,在自动驾驶、机器人协作、智能交通等领域展现出巨大潜力。2024年以来,大语言模型与MARL的结合更是开辟了新的研究方向。


本文首发于CSDN,欢迎点赞、收藏、评论交流。转载请注明出处。

标签: 强化学习, 多智能体, 深度学习, QMIX, MADDPG, 博弈论

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐