【概率论在机器学习中·第七篇】图模型:贝叶斯网络、马尔可夫随机场与 EM 算法

作者:技术博主 | 更新时间:2026-05-21 | 阅读时长:约 24 分钟
系列:概率论在机器学习中(共 8 篇)
环境:Python 3.12 + NumPy + SciPy
标签图模型 贝叶斯网络 马尔可夫随机场 EM算法 混合高斯 隐变量 条件独立 因子图


在这里插入图片描述

🔥 本篇目标:大多数机器学习模型其实都是某种图模型:朴素贝叶斯、HMM、LDA、混合高斯……都可以用图来表示变量间的依赖关系。图模型提供了一个统一的语言来描述联合概率分布的结构,让"哪些变量有关、哪些变量独立"变得一目了然。本篇从有向图(贝叶斯网络)到无向图(马尔可夫随机场),再到含隐变量模型的核心训练算法——EM 算法


系列进度

篇次 主题 状态
第一篇~第六篇 基础到 MCMC ✅ 已发布
第七篇(本篇) 图模型:贝叶斯网络与 EM 算法
第八篇 现代应用:VAE、扩散模型与贝叶斯深度学习 即将发布

目录


一、为什么需要图模型

import numpy as np
import scipy.stats as stats
from scipy.stats import multivariate_normal, norm
from scipy.special import logsumexp
import warnings
warnings.filterwarnings('ignore')

print("为什么需要图模型?")
print()
print("  问题:n 个二值随机变量的联合分布")
print("  需要参数数量:2ⁿ - 1(指数级!)")
print()

for n in [2, 5, 10, 20, 100]:
    n_params = 2**n - 1
    print(f"  n={n:4d}: 需要 {n_params:>15,} 个参数")

print()
print("  解决方案:用条件独立性假设压缩参数")
print()
print("  条件独立性:X ⊥ Y | Z")
print("  '给定 Z,X 和 Y 独立'")
print("  → 联合分布可以因子化")
print()
print("  图模型的两个功能:")
print("  ① 表示:用图结构直观表达条件独立性假设")
print("  ② 推断:利用图结构高效计算边缘/条件概率")
print()
print("  两大类:")
print("  有向图(贝叶斯网络,DAG):因果/生成方向明确")
print("  无向图(马尔可夫随机场,MRF):变量间对称相互作用")

二、贝叶斯网络:有向图模型

2.1 定义与因子化

class BayesianNetwork:
    """
    贝叶斯网络(有向无环图,DAG)

    核心性质:
    联合分布 = 各节点的条件概率分布之积
    P(X₁,...,Xₙ) = ∏ᵢ P(Xᵢ | parents(Xᵢ))

    每个节点只依赖其父节点(局部马尔可夫性质)
    → 大幅减少参数数量
    """

    def __init__(self):
        self.nodes   = {}   # 节点名 → CPT(条件概率表)
        self.parents = {}   # 节点名 → 父节点列表

    def add_node(self, name: str, parents: list, cpt: dict):
        self.nodes[name]   = cpt
        self.parents[name] = parents

    def query(self, evidence: dict, query_var: str) -> dict:
        """枚举法计算后验概率(仅适用于离散小网络)"""
        # 这里简化实现,实际用消息传递
        pass


# 经典例子:学生网络(Koller & Friedman 教材)
print("贝叶斯网络示例:学生成绩预测")
print()
print("  变量:")
print("  D(课程难度):easy=0.6, hard=0.4")
print("  I(学生智力):low=0.7, high=0.3")
print("  G(成绩):依赖 D 和 I")
print("  S(SAT分数):依赖 I")
print("  L(推荐信):依赖 G")
print()
print("  图结构(有向边):")
print("  D → G ← I → S")
print("        ↓")
print("        L")
print()
print("  联合分布因子化:")
print("  P(D,I,G,S,L) = P(D)·P(I)·P(G|D,I)·P(S|I)·P(L|G)")
print()

# 条件概率表
P_D    = {0: 0.6, 1: 0.4}   # easy=0, hard=1
P_I    = {0: 0.7, 1: 0.3}   # low=0, high=1

# P(G | D, I):3个等级(A/B/C)
P_G_given_DI = {
    # (D=easy, I=low)
    (0, 0): {0: 0.3, 1: 0.4, 2: 0.3},   # A/B/C
    # (D=easy, I=high)
    (0, 1): {0: 0.9, 1: 0.08, 2: 0.02},
    # (D=hard, I=low)
    (1, 0): {0: 0.05, 1: 0.25, 2: 0.70},
    # (D=hard, I=high)
    (1, 1): {0: 0.50, 1: 0.30, 2: 0.20},
}

P_S_given_I = {
    0: {0: 0.95, 1: 0.05},   # I=low: 95% low SAT
    1: {0: 0.20, 1: 0.80},   # I=high: 80% high SAT
}

P_L_given_G = {
    0: {0: 0.1,  1: 0.9},    # G=A: 90% good letter
    1: {0: 0.4,  1: 0.6},    # G=B
    2: {0: 0.99, 1: 0.01},   # G=C: 1% good letter
}

# 枚举计算 P(L=good | S=high)(推断演示)
print("推断演示:P(好推荐信 | SAT 高分)")
print()

p_L1_S1 = 0.0
p_S1    = 0.0

for d in [0, 1]:
    for i in [0, 1]:
        for g in [0, 1, 2]:
            # 联合概率
            joint = (P_D[d] * P_I[i] * P_G_given_DI[(d,i)][g]
                     * P_S_given_I[i][1]     # S=high
                     * P_L_given_G[g][1])    # L=good
            p_L1_S1 += joint

            # P(S=high) 的边际
            p_S1 += P_D[d] * P_I[i] * P_G_given_DI[(d,i)][g] * P_S_given_I[i][1]

p_L1_given_S1 = p_L1_S1 / p_S1
print(f"  P(L=好推荐信 | S=高SAT) = {p_L1_given_S1:.4f}")

# 对比无条件
p_L1 = 0.0
for d in [0, 1]:
    for i in [0, 1]:
        for g in [0, 1, 2]:
            for s in [0, 1]:
                p_L1 += (P_D[d] * P_I[i] * P_G_given_DI[(d,i)][g]
                         * P_S_given_I[i][s] * P_L_given_G[g][1])

print(f"  P(L=好推荐信)           = {p_L1:.4f}")
print(f"  SAT 高分后,好推荐信概率从 {p_L1:.3f} 变为 {p_L1_given_S1:.3f}")

2.2 常见贝叶斯网络结构

def common_bn_structures():
    print("\n常见贝叶斯网络结构及其条件独立性:")
    print()

    structures = [
        ("链(Chain)",
         "A → B → C",
         "A ⊥ C | B(给定 B,A 和 C 独立)",
         "马尔可夫链、HMM(隐马尔可夫模型)"),
        ("叉(Fork / Common Cause)",
         "A ← B → C",
         "A ⊥ C | B(给定公共原因 B,A 和 C 独立)",
         "朴素贝叶斯(类别→特征)"),
        ("对撞(Collider / V-structure)",
         "A → B ← C",
         "A ⊥ C(边际独立)但 A ⊭ C | B!",
         "伯克森悖论、解释消除效应"),
        ("完全连接(Dense)",
         "所有节点两两相连",
         "无条件独立性",
         "参数最多,但假设最少"),
    ]

    for name, graph, independence, example in structures:
        print(f"  [{name}] {graph}")
        print(f"    条件独立性:{independence}")
        print(f"    ML 应用:   {example}")
        print()

    # 对撞节点(Collider)的特殊性
    print("  ⚠️  对撞节点(Collider)的反直觉性质:")
    print()
    print("  结构:A → B ← C(A 和 C 是 B 的共同原因)")
    print()
    print("  边际独立:P(A,C) = P(A)·P(C)(A 和 C 无关)")
    print("  条件相关:P(A,C|B) ≠ P(A|B)·P(C|B)(给定 B 后,A 和 C 相关!)")
    print()
    print("  直觉(伯克森悖论):")
    print("  A=学习努力,C=天赋,B=成绩好(A 和 C 的共同结果)")
    print("  知道 B=成绩好 之后:如果 A=不努力,则 C=有天赋的概率变高!")
    print("  → 观测到结果,使两个原因之间产生了'虚假相关'")

    # 数值验证
    np.random.seed(42)
    n = 10000
    A = np.random.binomial(1, 0.5, n)   # 努力
    C = np.random.binomial(1, 0.5, n)   # 天赋

    # B = A OR C(有一个就能成绩好)
    B = np.logical_or(A, C).astype(int)

    # 边际相关
    corr_AC_marginal = np.corrcoef(A, C)[0, 1]

    # 给定 B=1 时的相关性
    mask_B1    = B == 1
    corr_AC_B1 = np.corrcoef(A[mask_B1], C[mask_B1])[0, 1]

    print()
    print(f"  数值验证:")
    print(f"    Corr(A,C)      = {corr_AC_marginal:.4f}(接近 0,独立 ✓)")
    print(f"    Corr(A,C|B=1)  = {corr_AC_B1:.4f}(负相关!给定成绩好后)")


common_bn_structures()

三、d-分离:图中读取条件独立性

def d_separation_rules():
    """d-分离规则:从图中直接判断条件独立性"""

    print("\nd-分离(d-Separation)规则:")
    print()
    print("  给定观测集 Z,判断 X 和 Y 是否条件独立(X ⊥ Y | Z)")
    print("  需要检查 X 到 Y 的所有路径是否被'阻断'")
    print()
    print("  路径 X - ... - Y 被阻断的条件:")
    print()

    rules = [
        ("链(顺序路径)",
         "X → M → Y  或  X ← M ← Y",
         "M ∈ Z(中间节点被观测)",
         "链被切断,X ⊥ Y | M"),
        ("叉(共同原因)",
         "X ← M → Y",
         "M ∈ Z(公共原因被观测)",
         "叉被切断,X ⊥ Y | M"),
        ("对撞(V-结构)",
         "X → M ← Y",
         "M ∉ Z 且 M 的后代 ∉ Z",
         "对撞激活,X ⊥ Y(不观测 M)"),
        ("对撞(V-结构)观测",
         "X → M ← Y",
         "M ∈ Z 或 M 的后代 ∈ Z",
         "对撞被激活,X ⊭ Y | M(路径开放!)"),
    ]

    for case_name, structure, condition, result in rules:
        print(f"  [{case_name}]")
        print(f"    结构:{structure}")
        print(f"    条件:{condition}")
        print(f"    结论:{result}")
        print()

    print("  X ⊥ Y | Z 当且仅当 Z d-分离 X 和 Y")
    print("  (X 到 Y 的所有路径都被 Z 阻断)")
    print()

    # 实际例子
    print("  实际例子(朴素贝叶斯):")
    print()
    print("  结构:Y → X₁, Y → X₂, ..., Y → Xₙ")
    print("  (Y 是类别标签,Xᵢ 是特征)")
    print()
    print("  d-分离分析:")
    print("  X₁ 到 X₂ 的路径:X₁ ← Y → X₂(叉结构)")
    print("  给定 Y(Y ∈ Z):叉被切断 → X₁ ⊥ X₂ | Y")
    print()
    print("  → 朴素贝叶斯假设的数学基础:给定类别,特征条件独立")
    print()
    print("  实际例子(隐马尔可夫模型 HMM):")
    print()
    print("  结构:z₁ → z₂ → z₃ → ...(隐状态链)")
    print("         ↓     ↓     ↓")
    print("         x₁    x₂    x₃  ...(观测)")
    print()
    print("  d-分离分析:")
    print("  z₁ 到 z₃:z₁ → z₂ → z₃(链)")
    print("  给定 z₂:z₁ ⊥ z₃ | z₂(马尔可夫性!)")


d_separation_rules()

四、马尔可夫随机场:无向图模型

def markov_random_field():
    """马尔可夫随机场的介绍"""

    print("\n马尔可夫随机场(MRF / 无向图模型):")
    print()
    print("  与贝叶斯网络的区别:")
    print("  贝叶斯网络:有向边,表示条件概率 P(X|parents)")
    print("  MRF:无向边,表示变量间的对称相互作用")
    print()
    print("  联合分布:")
    print("  P(X₁,...,Xₙ) = (1/Z) ∏_C ψ_C(X_C)")
    print("  Z = ∑_X ∏_C ψ_C(X_C)(配分函数)")
    print()
    print("  ψ_C:势函数(clique potential),非负实函数")
    print("  X_C:团(clique,图中互相连接的子集)中的变量")
    print()
    print("  条件独立性(马尔可夫性):")
    print("  X_i ⊥ X_{-i,-nb} | X_{nb(i)}")
    print("  '给定邻居,节点与非邻居条件独立'")
    print()

    # 常见 MRF 例子
    mrf_examples = [
        ("伊辛模型(Ising Model)",
         "X = {±1},ψ(Xᵢ,Xⱼ) = exp(J·Xᵢ·Xⱼ)",
         "磁性材料、图像去噪",
         "J>0 相邻像素倾向相同,J<0 倾向相反"),
        ("条件随机场(CRF)",
         "P(Y|X) ∝ ∏ψ_C(Y_C, X_C)",
         "序列标注(NER、POS tagging)",
         "给定输入 X,对输出序列 Y 建模"),
        ("玻尔兹曼机(RBM)",
         "P(v,h) = exp(-E(v,h))/Z,E=-v^T W h",
         "特征学习、生成模型",
         "可见层 v 和隐层 h 的二部图"),
        ("图像 MRF",
         "像素 = 节点,相邻像素 = 边",
         "图像分割、去噪、修复",
         "MRF 先验 + 似然 = 贝叶斯图像处理"),
    ]

    for name, form, application, intuition in mrf_examples:
        print(f"  [{name}]")
        print(f"    势函数:{form}")
        print(f"    应用:  {application}")
        print(f"    直觉:  {intuition}")
        print()

    # 贝叶斯网络 vs MRF 的转换
    print("  贝叶斯网络 vs MRF 的选择:")
    print()
    print(f"  {'特征':^20} {'贝叶斯网络':^22} {'MRF':^22}")
    print("  " + "─" * 68)
    comparison = [
        ("边的方向",     "有向(因果关系)",       "无向(对称关系)"),
        ("联合分布",     "∏P(Xᵢ|parents(Xᵢ))",   "(1/Z)∏ψ_C(X_C)"),
        ("归一化",       "自动满足",                "需要配分函数 Z"),
        ("条件独立读取", "d-分离规则",              "图分离规则"),
        ("典型应用",     "生成模型,因果推断",       "判别模型,空间结构"),
    ]
    for feature, bn, mrf in comparison:
        print(f"  {feature:^20} {bn:^22} {mrf:^22}")


markov_random_field()

五、常见图模型的统一视角

def unified_view_of_graphical_models():
    print("\n常见 ML 模型的图模型视角:")
    print()

    models = [
        {
            "name":   "朴素贝叶斯",
            "graph":  "Y → X₁, Y → X₂, ..., Y → Xd",
            "assumption": "给定类别 Y,特征 Xᵢ 条件独立",
            "factorization": "P(Y,X) = P(Y)·∏P(Xᵢ|Y)",
            "params": "P(Y) + d个 P(Xᵢ|Y)(线性参数)",
            "vs_full": f"完整: {2**10:,}, 朴素贝叶斯: {1+10*2} (d=10, 二值)",
        },
        {
            "name":   "隐马尔可夫模型(HMM)",
            "graph":  "z₁ → z₂ → ... → zₜ,zₜ → xₜ",
            "assumption": "马尔可夫性(当前状态只依赖前一状态)",
            "factorization": "P(z₁:T, x₁:T) = P(z₁)∏P(zₜ|zₜ₋₁)∏P(xₜ|zₜ)",
            "params": "初始/转移/发射概率(线性)",
            "vs_full": "序列建模最重要的结构假设之一",
        },
        {
            "name":   "混合高斯模型(GMM)",
            "graph":  "π → z, z → x(z 是离散隐变量)",
            "assumption": "x 来自 K 个高斯之一,选哪个由 z 决定",
            "factorization": "P(x) = ∑_z P(z)P(x|z) = Σₖ πₖN(x;μₖ,Σₖ)",
            "params": "K 个权重 + K 个高斯参数",
            "vs_full": "EM 算法训练(本篇重点)",
        },
        {
            "name":   "主题模型(LDA)",
            "graph":  "α→θ→z→w, β→φ(词/文档/主题层次)",
            "assumption": "文档是主题的混合,主题是词的分布",
            "factorization": "P(w,z,θ,φ|α,β) = P(φ|β)∏d P(θd|α)∏n P(zdn|θd)P(wdn|φ)",
            "params": "K 个主题(各为词典大小的分布)",
            "vs_full": "贝叶斯 EM / 变分推理训练",
        },
    ]

    for model in models:
        print(f"  [{model['name']}]")
        print(f"    图结构:  {model['graph']}")
        print(f"    核心假设:{model['assumption']}")
        print(f"    因子化:  {model['factorization']}")
        print(f"    参数量:  {model['params']}")
        print()


unified_view_of_graphical_models()

六、含隐变量的模型与 EM 算法

def em_motivation():
    """EM 算法的动机"""

    print("\nEM 算法的动机:含隐变量模型的 MLE")
    print()
    print("  问题:")
    print("  观测:X = {x₁,...,xₙ}(可见)")
    print("  隐变量:Z = {z₁,...,zₙ}(不可见)")
    print("  参数:θ(待估计)")
    print()
    print("  完整对数似然(如果 Z 已知):")
    print("  log P(X,Z|θ) = Σᵢ log P(xᵢ,zᵢ|θ)  ← 简单!")
    print()
    print("  观测对数似然(Z 未知,需要边缘化):")
    print("  log P(X|θ) = Σᵢ log [Σ_z P(xᵢ,z|θ)] = Σᵢ log Σ_z P(xᵢ,z|θ)")
    print("  ↑ log 里有求和,通常没有解析解!")
    print()
    print("  EM 的解决思路:")
    print("  用 E 步猜 Z 的分布,M 步最大化期望完整对数似然")
    print()
    print("  ┌──────────────────────────────────────────────────────┐")
    print("  │  EM = 迭代地:                                        │")
    print("  │  E 步:计算 Q(θ|θ_old) = E_{Z|X,θ_old}[log P(X,Z|θ)]│")
    print("  │  M 步:θ_new = argmax_θ Q(θ|θ_old)                  │")
    print("  │  保证:log P(X|θ) 在每次迭代后单调不减               │")
    print("  └──────────────────────────────────────────────────────┘")


em_motivation()

七、EM 算法:E 步和 M 步详解

def em_derivation():
    """EM 算法的完整推导"""

    print("\nEM 算法的完整推导(从 ELBO 出发):")
    print()
    print("  目标:最大化 log P(X|θ)")
    print()
    print("  引入任意分布 q(Z),使用 Jensen 不等式:")
    print()
    print("  log P(X|θ) = log Σ_Z P(X,Z|θ)")
    print("             = log Σ_Z q(Z) · P(X,Z|θ)/q(Z)")
    print("             ≥ Σ_Z q(Z) log [P(X,Z|θ)/q(Z)]   ← Jensen")
    print("             = E_q[log P(X,Z|θ)] + H(q)")
    print("             = L(q, θ)  ← 下界(ELBO)")
    print()
    print("  等号成立当且仅当:")
    print("  P(X,Z|θ)/q(Z) = const → q(Z) = P(Z|X,θ)(真实后验!)")
    print()
    print("  EM 的两步:")
    print()
    print("  E 步(Expectation):")
    print("  固定 θ_old,最大化 L(q, θ_old) 关于 q")
    print("  → 最优 q* = P(Z|X, θ_old)(后验分布)")
    print("  此时 L(q*, θ_old) = log P(X|θ_old)(紧界!)")
    print()
    print("  M 步(Maximization):")
    print("  固定 q*,最大化 L(q*, θ) 关于 θ")
    print("  → θ_new = argmax_θ E_{P(Z|X,θ_old)}[log P(X,Z|θ)]")
    print("          = argmax_θ Q(θ|θ_old)  ← 期望完整对数似然")
    print()
    print("  单调性保证(EM 不会降低似然):")
    print("  log P(X|θ_new)")
    print("  ≥ L(q*, θ_new)   (ELBO 是下界)")
    print("  ≥ L(q*, θ_old)   (M 步提高了 L)")
    print("  = log P(X|θ_old) (E 步后 ELBO = 对数似然)")
    print()
    print("  → 每次迭代对数似然单调不减 ✓")


em_derivation()

八、代码实战:EM 算法拟合高斯混合模型

class GaussianMixture:
    """
    高斯混合模型(GMM)的 EM 算法实现

    模型:
    P(x) = Σₖ πₖ · N(x; μₖ, Σₖ)

    隐变量:zᵢ ∈ {1,...,K}(第 i 个点属于哪个分量)
    参数:θ = {π₁,...,πₖ, μ₁,...,μₖ, Σ₁,...,Σₖ}

    EM 推导:
    E 步:γₙₖ = P(zₙ=k|xₙ,θ) = πₖN(xₙ;μₖ,Σₖ) / Σⱼ πⱼN(xₙ;μⱼ,Σⱼ)
    M 步:
      Nₖ = Σₙ γₙₖ
      μₖ = (1/Nₖ) Σₙ γₙₖ xₙ
      Σₖ = (1/Nₖ) Σₙ γₙₖ (xₙ-μₖ)(xₙ-μₖ)^T
      πₖ = Nₖ / N
    """

    def __init__(self, n_components: int = 2, tol: float = 1e-6, max_iter: int = 300):
        self.K        = n_components
        self.tol      = tol
        self.max_iter = max_iter

    def _init_params(self, X: np.ndarray):
        N, d  = X.shape
        K     = self.K

        # K-means++ 风格的初始化(更稳定)
        centers_idx = [np.random.randint(N)]
        for _ in range(K - 1):
            dists = np.array([min(np.linalg.norm(X[i] - X[c])**2
                                  for c in centers_idx) for i in range(N)])
            prob  = dists / dists.sum()
            centers_idx.append(np.random.choice(N, p=prob))

        self.mu    = X[centers_idx].copy().astype(float)
        self.Sigma = np.array([np.eye(d) for _ in range(K)])
        self.pi    = np.ones(K) / K

    def _e_step(self, X: np.ndarray) -> np.ndarray:
        """
        E 步:计算每个点属于每个分量的后验概率
        γₙₖ = P(zₙ=k | xₙ, θ)
        返回:(N, K) 的责任矩阵
        """
        N = len(X)
        log_resp = np.zeros((N, self.K))

        for k in range(self.K):
            log_resp[:, k] = (np.log(self.pi[k] + 1e-300)
                              + multivariate_normal.logpdf(X, self.mu[k], self.Sigma[k]))

        # 对数归一化(数值稳定)
        log_resp -= logsumexp(log_resp, axis=1, keepdims=True)
        return np.exp(log_resp)

    def _m_step(self, X: np.ndarray, gamma: np.ndarray):
        """
        M 步:最大化期望完整对数似然,更新参数
        """
        N, d = X.shape
        Nk   = gamma.sum(axis=0)   # 每个分量的有效样本数 (K,)

        # 更新均值
        self.mu = (gamma.T @ X) / Nk[:, np.newaxis]   # (K, d)

        # 更新协方差
        for k in range(self.K):
            diff        = X - self.mu[k]                          # (N, d)
            self.Sigma[k] = (gamma[:, k:k+1] * diff).T @ diff / Nk[k]
            # 添加正则化(数值稳定)
            self.Sigma[k] += 1e-6 * np.eye(d)

        # 更新混合权重
        self.pi = Nk / N

    def _log_likelihood(self, X: np.ndarray) -> float:
        """计算对数似然 log P(X|θ)"""
        N = len(X)
        log_lik = np.zeros((N, self.K))
        for k in range(self.K):
            log_lik[:, k] = (np.log(self.pi[k] + 1e-300)
                             + multivariate_normal.logpdf(X, self.mu[k], self.Sigma[k]))
        return logsumexp(log_lik, axis=1).sum()

    def fit(self, X: np.ndarray, verbose: bool = False) -> 'GaussianMixture':
        """EM 迭代"""
        np.random.seed(42)
        self._init_params(X)

        prev_ll  = -np.inf
        self.ll_history = []

        for iteration in range(self.max_iter):
            # E 步
            gamma = self._e_step(X)

            # M 步
            self._m_step(X, gamma)

            # 对数似然
            ll = self._log_likelihood(X)
            self.ll_history.append(ll)

            if verbose and iteration % 20 == 0:
                print(f"    迭代 {iteration:4d}: log-likelihood = {ll:.4f}")

            # 收敛判断
            if abs(ll - prev_ll) < self.tol:
                if verbose:
                    print(f"    第 {iteration} 步收敛 ✓")
                break
            prev_ll = ll

        self.gamma_ = gamma
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """预测每个点的分量标签"""
        gamma = self._e_step(X)
        return gamma.argmax(axis=1)

    def score(self, X: np.ndarray) -> float:
        """平均对数似然"""
        return self._log_likelihood(X) / len(X)


# ── 完整实验 ────────────────────────────────────────────────────
print("\n完整实战:EM 算法拟合高斯混合模型")
print()

np.random.seed(42)

# 生成真实的 GMM 数据(3 个分量)
K_true  = 3
n_total = 300

true_params = [
    {"pi": 0.3, "mu": np.array([-3, 1]),  "Sigma": np.array([[1.0, 0.3], [0.3, 0.5]])},
    {"pi": 0.5, "mu": np.array([2, 2]),   "Sigma": np.array([[0.8, -0.2],[-0.2, 1.2]])},
    {"pi": 0.2, "mu": np.array([0, -3]),  "Sigma": np.array([[0.5, 0.0], [0.0, 2.0]])},
]

X_list  = []
y_true  = []
for k, p in enumerate(true_params):
    n_k = int(p["pi"] * n_total)
    X_k = multivariate_normal.rvs(p["mu"], p["Sigma"], n_k)
    X_list.append(X_k)
    y_true.extend([k] * n_k)

X      = np.vstack(X_list)
y_true = np.array(y_true)

print(f"  数据:{len(X)} 个样本,{K_true} 个真实分量,2 维")
print()

# 用 EM 拟合(K=3)
gmm = GaussianMixture(n_components=3, tol=1e-8, max_iter=300)
gmm.fit(X, verbose=True)

print()

# 结果对比
print("  参数估计结果:")
print()
print(f"  {'参数':^12} {'真实值':^30} {'EM 估计':^30}")
print("  " + "─" * 76)

for k in range(K_true):
    true  = true_params[k]
    est_pi = gmm.pi[k]
    est_mu = gmm.mu[k]

    print(f"  分量 {k+1}:π   {true['pi']:^30.3f} {est_pi:^30.4f}")
    print(f"          μ   {str(true['mu'].round(2)):^30} {str(est_mu.round(4)):^30}")
    print()

# 聚类准确率(排列对齐)
y_pred = gmm.predict(X)

# 排列匹配(Hungarian 算法的简化版)
from itertools import permutations
best_acc = 0.0
for perm in permutations(range(K_true)):
    y_mapped = np.array([perm[p] for p in y_pred])
    acc      = (y_mapped == y_true).mean()
    best_acc = max(best_acc, acc)

print(f"  聚类准确率(最优排列):{best_acc:.4f}")
print(f"  最终对数似然:{gmm.ll_history[-1]:.4f}")
print(f"  迭代次数:{len(gmm.ll_history)}")

# 对数似然收敛过程
print()
print("  对数似然收敛过程(每10步):")
print(f"  {'迭代':^8} {'log-likelihood':^18} {'增量':^14}")
print("  " + "─" * 44)
prev_ll = gmm.ll_history[0]
for i, ll in enumerate(gmm.ll_history):
    if i % 10 == 0 or i == len(gmm.ll_history) - 1:
        delta = ll - prev_ll if i > 0 else 0
        print(f"  {i:^8} {ll:^18.4f} {delta:^14.6f}")
    if i % 10 == 0:
        prev_ll = ll

print()
print("  观察:对数似然单调不减(EM 的理论保证)✓")

# EM 的局限性
print()
print("  EM 算法的局限性:")
limitations = [
    ("收敛到局部最大值",
     "EM 保证单调收敛,但不保证全局最大",
     "多次随机初始化,选最大对数似然"),
    ("收敛速度慢",
     "接近收敛时,每步改进越来越小",
     "使用 EM 的加速变体(ACEM 等)"),
    ("K 的选择",
     "需要事先指定分量数 K",
     "BIC/AIC 信息准则,或贝叶斯非参数模型"),
    ("初始化敏感",
     "不好的初始化可能导致糟糕的局部最大",
     "K-means++ 初始化,或谱方法初始化"),
]

for problem, detail, solution in limitations:
    print(f"  [{problem}]")
    print(f"    问题:{detail}")
    print(f"    解决:{solution}")
    print()

# EM 的推广
print("  EM 算法的推广:")
generalizations = [
    ("广义 EM(GEM)", "M 步不必找全局最大,只需提升 Q"),
    ("期望传播(EP)", "E 步用更精确的后验近似"),
    ("变分 EM(VEM)", "E 步用变分推理(第五篇)代替精确后验"),
    ("随机 EM(SEM)", "M 步用随机梯度(适合大数据)"),
    ("在线 EM",       "逐样本更新参数,流式数据"),
]
for name, desc in generalizations:
    print(f"  [{name}]:{desc}")

总结

本篇三个核心收获:

① 图模型提供统一语言:

P ( X 1 , … , X n ) = ∏ i P ( X i ∣ parents ( X i ) ) (贝叶斯网络) P(X_1,\ldots,X_n) = \prod_i P(X_i \mid \text{parents}(X_i)) \quad \text{(贝叶斯网络)} P(X1,,Xn)=iP(Xiparents(Xi))(贝叶斯网络)

朴素贝叶斯、HMM、GMM、LDA 都是图模型的特例,图结构直接揭示了条件独立性假设。

② d-分离让独立性可读:

  • X → M → Y X \to M \to Y XMY:给定 M M M X ⊥ Y X \perp Y XY
  • X ← M → Y X \leftarrow M \rightarrow Y XMY:给定 M M M X ⊥ Y X \perp Y XY
  • 对撞 X → M ← Y X \to M \leftarrow Y XMY:观测 M M M 反而激活 X X X- Y Y Y 的关联(伯克森悖论)

③ EM 算法从 ELBO 自然导出:

E  步 ⏟ 固定  θ ,更新  q M  步 ⏟ 固定  q ,更新  θ \underbrace{E\text{ 步}}_{\text{固定 }\theta\text{,更新 }q} \quad \underbrace{M\text{ 步}}_{\text{固定 }q\text{,更新 }\theta} 固定 θ,更新 q E 固定 q,更新 θ M 

每次迭代保证对数似然单调不减。GMM 是 EM 最经典的应用,E 步计算责任权重,M 步更新参数。

下一篇(收官)预告:现代应用——用本系列学到的所有概率论工具,重新审视 VAE、扩散模型和贝叶斯深度学习的数学本质。为什么 ELBO 出现在 VAE、扩散模型和 EM 算法三个地方?贝叶斯深度学习如何量化神经网络的不确定性?


💬 你用 EM 算法做过什么任务?有没有遇到过对撞节点(Collider)引起的虚假相关问题? 欢迎评论区分享!

🙏 如果这篇帮到你,点赞 + 收藏,系列收官篇即将发布!


本文为原创技术分享。代码在 Python 3.12 + NumPy + SciPy 下验证。最后更新:2026-05-21

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐