【学习笔记】图神经网络库 DGL 入门教程(backend pytorch)
dgl库笔记
DGL是目前非常流行的用于以知识图谱为代表的图神经网络研究的python包,在阅读项目代码GitHub@RE-Net 时发现该库非常重要, 几乎目前涉及GNN训练的情况需要使用该库进行网络搭建, 该项目代码的相关论文摘要参考【论文阅读】时间序列中的变量是一张知识图谱 ; 这篇paper目前应该算是时序知识图谱中的标杆, 它的模型评估是当前最好的, 主要使用的是自回归的神经网络以及一个RGCN的近邻聚合器;
笔者在本机CPU上跑通代码后开始尝试去跑GPU, 发现4G显存完全吃不住, 连预训练的部分都无法跑通, 于是决定先看一遍DGL官方文档 ; 不得不说这个User Guide真的写得实在是太好了, 区别于那些只是把接口函数的调用说明列得老长老长的库(比如torch
, sklearn
, 还有tensorflow
), DGL的User Guide层次清晰, 以图神经网络的搭建到训练的任务时间线为线索, 非常详细地介绍了如何使用DGL, 并且这篇User Guide更像是一篇综述性质地paper, 学一遍不仅是对GNN能有所了解, 而且对很多方法, 如消息传递, 近邻采样的数学原理也能了解, 图文并茂, 实乃不可多得的资源;
本文笔者主要是对DGL官方文档 的一个翻译, 截至本文发布, Chapter1-4部分官方文档已经有了中文翻译, 笔者在此基础上添加了一些个人理解的备注, 以供查阅; Chapter5-7目前只有英文, 笔者主要是做了一些翻译和备注, 使得阅读起来更加容易;
注意: 本文全部是以pytorch为后端的DGL使用;
PS:
- tensorflow可能并不是DGL的最佳选择, 在Chapter 7中可以看到, 分布式训练仅仅支持pytorch, 而无法使用tensorflow作为后端实现, 看来Google圈地自萌在自己的TPU上搞早晚是要与主流脱节了… (笔者瞎猜的…);
- 此外本文的学习本质上也是对
torch
的一个巩固, 其实DGL里面很多数据处理, 训练模型, 模型搭建, 包括自定义模块都与torch
是类似的, 总之强烈推荐去看一遍官方文档, 笔者大概前后看了整整两天, 个人认为花时间完整过一遍一定不会吃亏的; - 目录里的杂记章节可能会不定期更新;
目录
1 DGL的安装
DGL官方文档 的安装方法似乎有些繁琐, 直接下载wheel文件安装即可;
- 非CUDA版本的
dgl
库, 去清华镜像dgl仓库 下载对应版本的whl文件直接用pip install
安装即可; - CUDA版本的
dgl
库, 目前有五种不同的dgl
库对应不同的CUDA版本:
- 清华镜像dgl-cu90仓库 ;
- 清华镜像dgl-cu92仓库 ;
- 清华镜像dgl-cu100仓库 ;
- 清华镜像dgl-cu101仓库 ;
- 清华镜像dgl-cu102仓库 ;
- 备注:
- 安装所有依赖CUDA的库之前一定先把CUDA安装好,
dgl
直接在库命名上就给定了对应的CUDA版本,tensorflow-gpu
则还要查表看不同版本库需要的CUDA支持标准, torch和torchvision可以在https://download.pytorch.org/whl/torch_stable.html 下载, 该repository中也注明了对应的CUDA版本; - 虽然不同库对CUDA版本的依赖会有区别, 但是总之CUDA版本越高越好, 低版本的CUDA早晚会被淘汰,
dgl
最低都到CUDA9.0了,tensorflow-gpu
从2.0.0
开始就至少需要CUDA10.0, 所以建议跑GPU的PC机就不要装乱七八糟的软件了, 不如多装几个版本的CUDA来得实在; 现在CUDA安装配置还挺快捷, 笔者WIN10+1650Ti显卡(N卡)+固态硬盘的配置十几分钟就能装配好一个版本的CUDA, 而且从NVIDIA官网下载安装包似乎非常快,3G
左右的离线安装包用半小时不到就能下载好, 似乎是有国内代理, 比以前靠谱多了;
- 安装所有依赖CUDA的库之前一定先把CUDA安装好,
2 DGL的后端
通过修改C:\Users\caoyang\.dgl\config.json
中的配置值可以修改dgl
库的默认后端, 一般来说就pytorch
和tensorflow
两种, DGL官方文档 额外提到一种MXNet
的后端, 不过它后面的章节基本上以pytorch
为例写的, 其他两种后端都没有怎么提及, 看起来似乎torch
的势头有点反超tensorflow
, Google的tensorflow
在自己的TPU上圈地自萌, 把N卡A卡让给其他开源开发者, 总之笔者是觉得tensorflow
越来越不好用了, 各种意义上的不好用… 而且近期看得几篇近一年内发表的paper, 项目代码都是基于torch
写的, 见仁见智吧, 对于打工人可能也只有全都学一条路可走…
3 一个有趣的入门示例
DGL官方文档 给了一个非常有趣的入门示例;
3.1 从"Zachary’s karate club" Problem讲起
- 如下图所示, “Zachary’s karate club” Problem定义在一个包括34个成员的空手道俱乐部里的社交网络上, 俱乐部分为两个社区, 由教员(节点0)和俱乐部主席(节点33)领导, 分别以不同颜色的圆点表示, 问题目标是希望能够预测出每个成员将更倾向于加入哪一个社区;
- "Zachary’s karate club" Problem 图描述:
3.2 第一步: 使用DGL创建图
- DGL定义图的方法并非常见的邻接矩阵或出入度链表形式, 而是直接将所有边的出节点和入节点用两个
list
存储, 这样的好处是对于稀疏图(即邻接矩阵系数)可以大大减少存储成本, 且无需额外记录图的节点, 直接将两个list
拼接后去重就可以得到所有节点, 不过离群点(出度与入度都为零的节点)是不会被考虑进来的;
- 建图代码如下所示:
import dgl import numpy as np def build_karate_club_graph(): # All 78 edges are stored in two numpy arrays. One for source endpoints # while the other for destination endpoints. src = np.array([1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 10, 10, 10, 11, 12, 12, 13, 13, 13, 13, 16, 16, 17, 17, 19, 19, 21, 21, 25, 25, 27, 27, 27, 28, 29, 29, 30, 30, 31, 31, 31, 31, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33, 33]) dst = np.array([0, 0, 1, 0, 1, 2, 0, 0, 0, 4, 5, 0, 1, 2, 3, 0, 2, 2, 0, 4, 5, 0, 0, 3, 0, 1, 2, 3, 5, 6, 0, 1, 0, 1, 0, 1, 23, 24, 2, 23, 24, 2, 23, 26, 1, 8, 0, 24, 25, 28, 2, 8, 14, 15, 18, 20, 22, 23, 29, 30, 31, 8, 9, 13, 14, 15, 18, 19, 20, 22, 23, 26, 27, 28, 29, 30, 31, 32]) # Edges are directional in DGL; Make them bi-directional. u = np.concatenate([src, dst]) v = np.concatenate([dst, src]) # Construct a DGLGraph return dgl.DGLGraph((u, v)) G = build_karate_club_graph() print('We have %d nodes.' % G.number_of_nodes()) print('We have %d edges.' % G.number_of_edges())
- 输出结果:
We have 34 nodes. We have 156 edges.
- 使用
networkx
对图进行可视化:
- 如果希望在jupyter notebook中显示
nx.draw()
得到的绘图结果, 需要在代码中添加%matplotlib inline
注解; - 可视化代码如下所示:
import networkx as nx %matplotlib inline # Since the actual graph is undirected, we convert it for visualization # purpose. nx_G = G.to_networkx().to_undirected() # Kamada-Kawaii layout usually looks pretty for arbitrary graphs pos = nx.kamada_kawai_layout(nx_G) nx.draw(nx_G, pos, with_labels=True, node_color=[[.7, .7, .7]])
- 可视化结果:
3.3 第二步: 为图的边和图的节点赋值
DGLGraph
图的边和节点都可以进行赋值, 所谓赋值可以理解为添加特征, 特征当然可以不止一个, 下面的示例是给所有节点添加名为feat
的特征, 如果需要给边赋值则将ndata
替换为edata
即可;
- 代码示例:
# In DGL, you can add features for all nodes at once, using a feature tensor that # batches node features along the first dimension. The code below adds the learnable # embeddings for all nodes: import torch import torch.nn as nn import torch.nn.functional as F embed = nn.Embedding(34, 5) # 34 nodes with embedding dim equal to 5 G.ndata['feat'] = embed.weight # print out node 2's input feature print(G.ndata['feat'][2]) # print out node 10 and 11's input features print(G.ndata['feat'][[10, 11]])
- 输出结果:
tensor([ 0.4228, -1.1062, -0.1551, 1.1317, 0.9008], grad_fn=<SelectBackward>) tensor([[ 0.3872, 0.9674, -0.0219, 0.3755, -0.6305], [-0.7338, -0.4529, 1.1352, -0.6787, -1.0478]], grad_fn=<IndexBackward>)
3.4 第三步: 定义图卷积网络(GCN)
- 图卷积网络(Graph Convolutional Network, 下简称为GCN)最初在https://arxiv.org/abs/1609.02907 被提出, 简而言之就是在GCN的第 l l l层, 图中每个节点 v i l v_i^l vil都会带有一个特征向量 h i l h_i^l hil, 然后每个节点会通过图中的有向边进行特征传输, 一个最简单的例子如下图所示, 即每个节点的特征值更新为所有指向它的节点的特征值之和, 然后使用一个激活函数 f f f映射后的结果:
- 这其中就包含了一个消息传递的概念, 在DGL官方文档 中有详细说明;
- 创建GCN的示例代码:
from dgl.nn.pytorch import GraphConv class GCN(nn.Module): def __init__(self, in_feats, hidden_size, num_classes): super(GCN, self).__init__() self.conv1 = GraphConv(in_feats, hidden_size) self.conv2 = GraphConv(hidden_size, num_classes) def forward(self, g, inputs): h = self.conv1(g, inputs) h = torch.relu(h) h = self.conv2(g, h) return h # The first layer transforms input features of size of 5 to a hidden size of 5. # The second layer transforms the hidden layer and produces output features of # size 2, corresponding to the two groups of the karate club. net = GCN(5, 5, 2)
3.5 第四步: 数据预处理与初始化
简单定义所有的节点编号以及分类标签的编号, 以及模型的输入初始值inputs
;
inputs = embed.weight
labeled_nodes = torch.tensor([0, 33]) # only the instructor and the president nodes are labeled
labels = torch.tensor([0, 1]) # their labels are different
3.6 第五步: 图模型训练与可视化
- 模型训练本质与
torch
模型训练没有区别, 代码与输出结果如下所示:
- 训练代码示例:
import itertools optimizer = torch.optim.Adam(itertools.chain(net.parameters(), embed.parameters()), lr=0.01) all_logits = [] for epoch in range(50): logits = net(G, inputs) # we save the logits for visualization later all_logits.append(logits.detach()) logp = F.log_softmax(logits, 1) # we only compute loss for labeled nodes loss = F.nll_loss(logp[labeled_nodes], labels) optimizer.zero_grad() loss.backward() optimizer.step() print('Epoch %d | Loss: %.4f' % (epoch, loss.item()))
- 输出结果:
Epoch 0 | Loss: 0.8385 Epoch 1 | Loss: 0.8092 Epoch 2 | Loss: 0.7829 Epoch 3 | Loss: 0.7614 Epoch 4 | Loss: 0.7426 Epoch 5 | Loss: 0.7266 Epoch 6 | Loss: 0.7128 Epoch 7 | Loss: 0.6996 Epoch 8 | Loss: 0.6895 Epoch 9 | Loss: 0.6809 Epoch 10 | Loss: 0.6723 Epoch 11 | Loss: 0.6639 Epoch 12 | Loss: 0.6555 Epoch 13 | Loss: 0.6467 Epoch 14 | Loss: 0.6376 Epoch 15 | Loss: 0.6282 Epoch 16 | Loss: 0.6188 Epoch 17 | Loss: 0.6095 Epoch 18 | Loss: 0.5996 Epoch 19 | Loss: 0.5893 Epoch 20 | Loss: 0.5783 Epoch 21 | Loss: 0.5670 Epoch 22 | Loss: 0.5552 Epoch 23 | Loss: 0.5430 Epoch 24 | Loss: 0.5300 Epoch 25 | Loss: 0.5170 Epoch 26 | Loss: 0.5037 Epoch 27 | Loss: 0.4903 Epoch 28 | Loss: 0.4767 Epoch 29 | Loss: 0.4621 Epoch 30 | Loss: 0.4471 Epoch 31 | Loss: 0.4316 Epoch 32 | Loss: 0.4163 Epoch 33 | Loss: 0.4006 Epoch 34 | Loss: 0.3838 Epoch 35 | Loss: 0.3662 Epoch 36 | Loss: 0.3481 Epoch 37 | Loss: 0.3295 Epoch 38 | Loss: 0.3103 Epoch 39 | Loss: 0.2908 Epoch 40 | Loss: 0.2716 Epoch 41 | Loss: 0.2526 Epoch 42 | Loss: 0.2339 Epoch 43 | Loss: 0.2157 Epoch 44 | Loss: 0.1981 Epoch 45 | Loss: 0.1812 Epoch 46 | Loss: 0.1647 Epoch 47 | Loss: 0.1483 Epoch 48 | Loss: 0.1326 Epoch 49 | Loss: 0.1179
- 可视化方法:
- 画出训练时单个Epoch的图结构:
import matplotlib.animation as animation import matplotlib.pyplot as plt def draw(i): cls1color = '#00FFFF' cls2color = '#FF00FF' pos = {} colors = [] for v in range(34): pos[v] = all_logits[i][v].numpy() cls = pos[v].argmax() colors.append(cls1color if cls else cls2color) ax.cla() ax.axis('off') ax.set_title('Epoch: %d' % i) nx.draw_networkx(nx_G.to_undirected(), pos, node_color=colors, with_labels=True, node_size=300, ax=ax) fig = plt.figure(dpi=150) fig.clf() ax = fig.subplots() draw(0) # draw the prediction of the first epoch plt.close()
- 绘图结果:
- 通过添加下面的代码可以实现动态图(但是笔者并没有实现怎么动态化… 可能是jupyter notebook缺少相应插件):
ani = animation.FuncAnimation(fig, draw, frames=len(all_logits), interval=200)
Chapter 1: 图
1.1 关于图的基本概念
详见DGL官方文档 文字描述;
- 这里主要有同构图(homogeneous graph)和异构图(heterogeneous graph)两个概念需要注意, 这在paper里是经常会被提到的;
- 大部分深度学习考虑的问题通过数学抽象得到的图都是异构图, 即不同的节点会有不同的属性, 不同的边代表不同的含义, 最常见的异构图就是由RDF三元组 ( s , r , o ) (s,r,o) (s,r,o)构建的知识图谱, 每个节点表示不同的实体, 由不同的嵌入表示, 不同的边表示不同的实体关系;
- 同构图相对简单, 每个节点和每条边本质上都是相同的, 如在考虑社交网络分布时, 每个人都被同等的看待, 关系也视为单纯的社交关系, 一般来说像运筹优化领域的最大流, 旅行商等问题的抽象都是可以视为是一种同构图, 可以通过数学方法进行求解; 通常一个运筹优化问题从同构图拓展到异构图上就会变得复杂无比…
- 异构图在本章第5节被详细描述;
1.2 图, 节点和边
- 正如在入门示例章节中提到的那样,
dgl
库通过存储所有边的出入节点来构建图, 节点一般使用自然数进行编号; - 使用
dgl.graph()
可以创建一个DGLGraph
对象, 本章第4节介绍了从其他图网络库的实例化对象直接构建图的方法(如networkx
库);
- 创建图示例代码:
import dgl import torch as th # edges 0->1, 0->2, 0->3, 1->3 u, v = th.tensor([0, 0, 0, 1]), th.tensor([1, 2, 3, 3]) g = dgl.graph((u, v)) print(g) # number of nodes are inferred from the max node IDs in the given edges # Node IDs print(g.nodes()) # Edge end nodes print(g.edges()) # Edge end nodes and edge IDs print(g.edges(form='all')) # If the node with the largest ID is isolated (meaning no edges), # then one needs to explicitly set the number of nodes g = dgl.graph((u, v), num_nodes=8)
- 输出结果:
Graph(num_nodes=4, num_edges=4, ndata_schemes={} edata_schemes={}) tensor([0, 1, 2, 3]) (tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3])) (tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3]), tensor([0, 1, 2, 3]))
- 特别地, 可以通过为每条边都创建两个方向的边, 来实现定义无向图, 此时可以使用
dgl.to_bidirected()
函数来实现这个目的, 该函数可以把原图转换成一个包含反向边的图;
- 反向图示例代码:
bg = dgl.to_bidirected(g) bg.edges()
- 输出结果:
(tensor([0, 0, 0, 1, 1, 2, 3, 3]), tensor([1, 2, 3, 0, 3, 0, 0, 1]))
- 最后教程提到尽量使用
tensor
作为dgl.graph()
的参数输入, 不过也支持array
和list
进行快速测试, 前者相对来说在资源处理上更优化; 且可以通过配置dgl.graph()
的参数idtype
来修正图存储的数据类型, 比如将默认值int64
改为int32
就可以大大节约存储空间; 具体数据类型转换如下所示:
- 数据类型示例代码:
edges = th.tensor([2, 5, 3]), th.tensor([3, 5, 0]) # edges 2->3, 5->5, 3->0 g64 = dgl.graph(edges) # DGL uses int64 by default print(g64.idtype) g32 = dgl.graph(edges, idtype=th.int32) # create a int32 graph g32.idtype g64_2 = g32.long() # convert to int64 g64_2.idtype g32_2 = g64.int() # convert to int32 g32_2.idtype
- 输出结果:
torch.int64 torch.int32 torch.int64 torch.int32
- 相关接口方法:
dgl.graph()
;dgl.DGLGraph.nodes()
;dgl.DGLGraph.edges()
;dgl.to_bidirected()
;dgl.DGLGraph.int()
;dgl.DGLGraph.long()
;dgl.DGLGraph.idtype
;
1.3 节点与边的特征
- 正如入门示例中提到的那样, 每个特征都会由一个特征名, 以类似字典的形式存储在
DGLGraph
对象的ndata
和edata
中:
- 节点与边的特征定义代码示例:
import dgl import torch as th g = dgl.graph(([0, 0, 1, 5], [1, 2, 2, 0])) # 6 nodes, 4 edges print(g) g.ndata['x'] = th.ones(g.num_nodes(), 3) # node feature of length 3 g.edata['x'] = th.ones(g.num_edges(), dtype=th.int32) # scalar integer feature print(g) # different names can have different shapes g.ndata['y'] = th.randn(g.num_nodes(), 5) print(g.ndata['x'][1]) # get node 1's feature print(g.edata['x'][th.tensor([0, 3])]) # get features of edge 0 and 3
- 输出结果:
Graph(num_nodes=6, num_edges=4, ndata_schemes={} edata_schemes={}) Graph(num_nodes=6, num_edges=4, ndata_schemes={'x': Scheme(shape=(3,), dtype=torch.float32)} edata_schemes={'x': Scheme(shape=(), dtype=torch.int32)}) tensor([1., 1., 1.]) tensor([1, 1], dtype=torch.int32)
- 关于
ndata
和edata
的注意点:- 特征值只能是数值类型, 但不局限于标量, 可以是向量与张量;
- 如果使用张量赋值, 则张量的第一维必须与节点或边的数量相同(即默认给每个节点或每条边都赋值, 而不能给图中部分节点或边赋值);
- 所有节点或所有边的同名特征必须具有同样的维度, 如不能部分节点的嵌入特征是256维, 其他的则是512维, 需要进行padding, 其实本质上就是要求张量赋值时必须是完整的多面体, 不能边边角角缺了一些;
- 相关接口方法:
dgl.DGLGraph.ndata
;dgl.DGLGraph.edata
;
1.4 从外部源创建图
- 从
scipy
稀疏矩阵创建图:
- 代码示例:
import dgl import torch as th import scipy.sparse as sp spmat = sp.rand(100, 100, density=0.05) # 5% nonzero entries print(dgl.from_scipy(spmat)) # from SciPy
- 输出结果:
Graph(num_nodes=100, num_edges=500, ndata_schemes={} edata_schemes={})
- 从
networkx
图创建图:
- 代码示例:
import networkx as nx nx_g = nx.path_graph(5) # a chain 0-1-2-3-4 print(dgl.from_networkx(nx_g)) # from networkx nxg = nx.DiGraph([(2, 1), (1, 2), (2, 3), (0, 0)]) print(dgl.from_networkx(nxg))
- 输出结果:
Graph(num_nodes=5, num_edges=8, ndata_schemes={} edata_schemes={}) Graph(num_nodes=4, num_edges=4, ndata_schemes={} edata_schemes={})
- 注意
nx.path_graph(5)
会转成8条边, 原因是这是一个networkx
库的无向图, 而DGLGraph
必须是有向图, 所以给每条边都定义了正反两个方向;
- 注意
- 相关接口方法:
dgl.from_scipy()
;dgl.from_networkx()
;
1.5 异构图
- 创建异构图:
dgl.heterograph
- 相对来说异构图的创建比较麻烦, 需要类似RDF三元组 ( s , r , o ) (s,r,o) (s,r,o)的方法来定义图;
- 显然同构图只是异构图的一种特殊情况, 因此也可以用异构图的方法来定义;
- 代码示例:
import dgl import torch as th # Create a heterograph with 3 node types and 3 edges types. graph_data = { ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])), ('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])), ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2])) } g = dgl.heterograph(graph_data) print(g.ntypes) print(g.etypes) print(g.canonical_etypes) print(g) print(g.metagraph().edges()) print(g.num_nodes()) print(g.num_nodes('drug')) print(g.nodes('drug'))
- 输出结果:
['disease', 'drug', 'gene'] ['interacts', 'interacts', 'treats'] [('drug', 'interacts', 'drug'), ('drug', 'interacts', 'gene'), ('drug', 'treats', 'disease')] Graph(num_nodes={'disease': 3, 'drug': 3, 'gene': 4}, num_edges={('drug', 'interacts', 'drug'): 2, ('drug', 'interacts', 'gene'): 2, ('drug', 'treats', 'disease'): 1}, metagraph=[('drug', 'drug', 'interacts'), ('drug', 'gene', 'interacts'), ('drug', 'disease', 'treats')]) [('drug', 'drug'), ('drug', 'gene'), ('drug', 'disease')] 10 3 tensor([0, 1, 2])
- 注意到异构图中多出一个
metagraph
, 本质是记录该异构图中所有不重复的RDF三元组 ( s , r , o ) (s,r,o) (s,r,o)
- 注意到异构图中多出一个
- 相关接口:
dgl.heterograph()
;ntypes
;etypes
;canonical_etypes
;metagraph
;num_nodes()
: 不加参数就是所有节点数, 可以添加参数找出特定名称的节点总数;nodes()
: 必须加参数(节点名称), 返回所有该节点的编号张量;
- 异构图的特征赋值:
- 本质与同构图区别不大, 都可以对节点或边进行特征赋值, 接口函数稍有差异;
- 代码示例:
# Set/get feature 'hv' for nodes of type 'drug' g.nodes['drug'].data['hv'] = th.ones(3, 1) print(g.nodes['drug'].data['hv']) # Set/get feature 'he' for edge of type 'treats' g.edges['treats'].data['he'] = th.zeros(1, 1) print(g.edges['treats'].data['he'])
- 输出结果:
tensor([[1.], [1.], [1.]]) tensor([[0.]])
- 异构图的子图: 从边来取子图;
- 代码示例: 使用
dgl.edge_type_subgraph
方法;g = dgl.heterograph({ ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])), ('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])), ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2])) }) g.nodes['drug'].data['hv'] = th.ones(3, 1) # Retain relations ('drug', 'interacts', 'drug') and ('drug', 'treats', 'disease') # All nodes for 'drug' and 'disease' will be retained eg = dgl.edge_type_subgraph(g, [('drug', 'interacts', 'drug'), ('drug', 'treats', 'disease')]) print(eg) # The associated features will be copied as well print(eg.nodes['drug'].data['hv'])
- 输出结果:
Graph(num_nodes={'disease': 3, 'drug': 3}, num_edges={('drug', 'interacts', 'drug'): 2, ('drug', 'treats', 'disease'): 1}, metagraph=[('drug', 'drug', 'interacts'), ('drug', 'disease', 'treats')]) tensor([[1.], [1.], [1.]])
- 异构图转为同构图:
-
异构图为管理不同类型的节点和边及其相关特征提供了一个清晰的接口; 这在以下情况下尤其有用:
- 不同类型的节点和边的特征具有不同的数据类型或大小;
- 用户希望对不同类型的节点和边应用不同的操作;
-
如果上述情况不适用, 并且用户不希望在建模中区分节点和边的类型, 则
dgl
允许使用dgl.DGLGraph.to_homogeneous()
A将异构图转换为同构图, 具体算法如下:- 用从0开始的连续整数重新标记所有类型的节点和边;
- 对所有的节点和边合并用户指定的特征;
-
代码示例:
g = dgl.heterograph({ ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])), ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))}) g.nodes['drug'].data['hv'] = th.zeros(3, 1) g.nodes['disease'].data['hv'] = th.ones(3, 1) g.edges['interacts'].data['he'] = th.zeros(2, 1) g.edges['treats'].data['he'] = th.zeros(1, 2) # By default, it does not merge any features hg = dgl.to_homogeneous(g) print('hv' in hg.ndata) # Copy node features hg = dgl.to_homogeneous(g, ndata=['hv']) print(hg.ndata['hv']) # Copy edge features # For feature copy, it expects features to have # the same size and dtype across node/edge types hg = dgl.to_homogeneous(g, edata=['he'])
-
输出结果:
False tensor([[1.], [1.], [1.], [0.], [0.], [0.]]) DGLError: Cannot concatenate column he with shape Scheme(shape=(2,), dtype=torch.float32) and shape Scheme(shape=(1,), dtype=torch.float32)
-
续: 代码示例:
# Order of node types in the heterograph print(g.ntypes) # Original node types print(hg.ndata[dgl.NTYPE]) # Original type-specific node IDs print(hg.ndata[dgl.NID]) # Order of edge types in the heterograph print(g.etypes) # Original edge types print(hg.edata[dgl.ETYPE]) # Original type-specific edge IDs print(hg.edata[dgl.EID])
-
输出结果:
['disease', 'drug'] tensor([0, 0, 0, 1, 1, 1]) tensor([0, 1, 2, 0, 1, 2]) ['interacts', 'treats'] tensor([0, 0, 1]) tensor([0, 1, 0])
- 发现原始的节点或边的类型和对应的ID被存储在
ndata
和edata
中;
- 发现原始的节点或边的类型和对应的ID被存储在
-
其他注意事项:
- 出于建模的目的, 用户可能需要将一些关系合并, 并对它们应用相同的操作; 为了实现这一目的, 可以先抽取异构图的边类型子图(使用上面提到的
dgl.edge_type_subgraph
方法), 然后将该子图转换为同构图。
- 出于建模的目的, 用户可能需要将一些关系合并, 并对它们应用相同的操作; 为了实现这一目的, 可以先抽取异构图的边类型子图(使用上面提到的
- 保存与加载模型:
dgl.save_graphs(filename, g_list, labels=None)
;- 显然这里的
g_list
是一个list
, 里面可以放多个图, 如[g1, g2]
; labels
应当是一个str2Tensor
的字典;
import dgl import torch as th g1 = dgl.graph(([0, 1, 2], [1, 2, 3])) g2 = dgl.graph(([0, 2], [2, 3])) g2.edata["e"] = th.ones(2, 4) from dgl.data.utils import save_graphs graph_labels = {"glabel": th.tensor([0, 1])} save_graphs("./data.bin", [g1, g2], graph_labels)
- 显然这里的
dgl.load_graphs(filename, idx_list=None)
;- 当保存时是保存了多个图的话, 就需要定义
idx_list
以便于区分不同图了, 这是一个整数列表; - 返回值为
graph_list
和labels
, 后者即保存时定义的labels
;
from dgl.data.utils import load_graphs glist, label_dict = load_graphs("./data.bin") # glist will be [g1, g2] glist, label_dict = load_graphs("./data.bin", [0]) # glist will be [g1]
- 当保存时是保存了多个图的话, 就需要定义
1.6 在GPU上使用DGLGraph
- 两种方法创建GPU上的
DGLGraph
:
-
用两个已经存储在GPU上的
tensor
来创建DGLGraph
; -
使用
dgl.DGLGraph.to(device)
方法将DGLGraph
移动到指定device
的cuda上; -
代码示例:
import dgl import torch as th u, v = th.tensor([0, 1, 2]), th.tensor([2, 3, 4]) g = dgl.graph((u, v)) g.ndata['x'] = th.randn(5, 3) # original feature is on CPU print(g.device) cuda_g = g.to('cuda:0') # accepts any device objects from backend framework print(cuda_g.device) print(cuda_g.ndata['x'].device) # feature data is copied to GPU too # A graph constructed from GPU tensors is also on GPU u, v = u.to('cuda:0'), v.to('cuda:0') g = dgl.graph((u, v)) print(g.device)
-
输出结果:
cpu cuda:0 cuda:0 cuda:0
- 任何涉及GPU图的操作都是在GPU上运行的; 因此, 这要求所有张量参数都已经放在GPU上, 其结果(图或张量)也将在GPU上; 此外, GPU图只接受GPU上的特征数据;
- 代码说明:
print(cuda_g.in_degrees()) print(cuda_g.in_edges([2, 3, 4])) # ok for non-tensor type arguments print(cuda_g.in_edges(th.tensor([2, 3, 4]).to('cuda:0'))) # tensor type must be on GPU cuda_g.ndata['h'] = th.randn(5, 4) # ERROR! feature must be on GPU too!
- 输出结果:
tensor([0, 0, 1, 1, 1], device='cuda:0') (tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0')) (tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0')) DGLError: Cannot assign node feature "h" on device cpu to a graph on device cuda:0. Call DGLGraph.to() to copy the graph to the same device.
- 这里笔者之前在跑GitHub@RE-Net 时就一直出这里这个错误, 用
graph.to(device)
不好使, 最后只能是手动把torch.tensor
全部转到cuda上, 恶心了很久;
Chapter 2: 消息传递
- 消息传递: 实现图神经网络的一种通用框架和编程范式;
- 假设节点
v
v
v上的特征为
x
v
∈
R
d
1
x_v\in \mathbb{R}^{d_1}
xv∈Rd1, 边
(
u
,
v
)
(u,v)
(u,v)上的特征为
w
e
∈
R
d
2
w_e\in \mathbb{R}^{d_2}
we∈Rd2, 消息传递定义如下两种运算:
- 边上计算: m e t + 1 = ϕ ( x v ( t ) , x u ( t ) , w e ( t ) ) ( u , v , e ) ∈ E m_e^{{t+1}}=\phi(x_v^{(t)},x_u^{(t)},w_e^{(t)})\quad (u,v,e)\in \mathcal{E} met+1=ϕ(xv(t),xu(t),we(t))(u,v,e)∈E
- 点上计算: x v t + 1 = ψ ( x v ( t ) , ρ ( { m e ( t + 1 ) : ( u , v , e ) ∈ E } ) ) x_v^{{t+1}}=\psi(x_v^{(t)},\rho(\{m_e^{(t+1)}: (u,v,e)\in \mathcal{E}\})) xvt+1=ψ(xv(t),ρ({me(t+1):(u,v,e)∈E}))
- 其中:
- ϕ \phi ϕ是定义在每条边上的消息函数, 它通过将边上特征与其两端节点的特征相结合来生成消息;
- 聚合函数
ρ
\rho
ρ会聚合节点接受到的消息;
- 笔者经验是这个函数是非常重要的, GitHub@RE-Net 中提到了三种不同的聚合器, 如均值池化的聚合器, 带注意力机制的聚合器, 以及现在广为使用的RGCN聚合器;
- 更新函数 ψ \psi ψ会结合聚合后的消息和节点本身的特征来更新节点的特征;
2.1 内置的消息传递接口函数
- 消息函数 ϕ \phi ϕ:
- 参数:
edges
, 类型为dgl.EdgeBatch
;edges
有src
,dst
和data
三个成员属性, 分别用于访问源节点, 目标节点和边的特征;
- 聚合函数 ρ \rho ρ:
- 参数:
nodes
, 类型为dgl.NodeBatch
;nodes
有成员属性mailbox
可以用来访问节点收到的消息;
- 简单的聚合函数如求和, 均值, 取最大值, 取最小值等;
- 更新函数 ψ \psi ψ:
- 参数:
nodes
, 类型为dgl.NodeBatch
; 与聚合函数中的参数相同;
- 此函数对聚合函数的聚合结果进行操作, 通常在消息传递的最后一步将其与节点的特征相结合, 并将输出作为节点的新特征;
dgl.function
: https://docs.dgl.ai/api/python/dgl.function.html ;
- 该模块中定义了许多内置的聚合函数和消息函数, 官方文档建议使用内置函数, 因为它们经过了大量优化, 并且可以自动处理维度广播;
- 如果用户的消息传递函数无法用内置函数实现, 则可以实现自己的消息或聚合函数;
- 内置的消息函数可以是一元函数或二元函数:
- 对于一元函数, 支持
copy
函数; - 对于二元函数, 支持
add
,sub
,mul
,div
,dot
函数; - 消息的内置函数的命名约定是
u
表示源节点,v
表示目标节点,e
表示边; - 这些函数的参数是字符串, 指示相应节点和边的输入和输出特征字段名;
- 要对源节点的
hu
特征和目标节点的hv
特征求和, 然后将结果保存在边的he
特征上, 用户可以使用内置函数dgl.function.u_add_v('hu', 'hv', 'he')
; - 以下用户定义消息函数与此内置函数等价:
def message_func(edges): return {'he': edges.src['hu'] + edges.dst['hv']}
- 要对源节点的
- 对于一元函数, 支持
- 内置的聚合函数:
- 支持
sum
,max
,min
,mean
操作; - 聚合函数通常由两个参数, 类型都是字符串;
- 一个用于指定
mailbox
中的字段名; - 一个用于指示目标节点特征的字段名;
- 如
dgl.function.sum('m', 'h')
等价于如下所示的对接收到消息求和的用户定义函数:import torch def reduce_func(nodes): return {'h': torch.sum(nodes.mailbox['m'], dim=1)}
- 一个用于指定
- 用户自定义函数见本文附录章节中的链接;
- 支持
- 在不涉及消息传递时, 也可以通过
apply_edges()
单独调用逐边计算:
- 参数为一个消息函数;
- 默认该接口会更新所有边;
- 举例:
import dgl.function as fn graph.apply_edges(fn.u_add_v('el', 'er', 'e'))
- 消息传递高级接口:
update_all()
- 该接口函数中合并了消息生成, 消息聚合, 节点特征更新, 这为从整体上进行系统优化提供了空间;
- 参数为一个消息函数, 一个聚合函数, 一个更新函数(optional):
- 当更新函数不给定时, 可以在
update_all
完成后直接对节点特征进行操作; - 由于更新函数通常可以用纯张量操作实现, 官方文档不推荐在
update_all
中指定更新函数;
- 当更新函数不给定时, 可以在
- 举例:
def updata_all_example(graph): # store the result in graph.ndata['ft'] graph.update_all(fn.u_mul_e('ft', 'a', 'm'), fn.sum('m', 'ft')) # Call update function outside of update_all final_ft = graph.ndata['ft'] * 2 return final_ft
- 这段代码中源节点特征
ft
与边特征a
相乘生成消息m
, 然后对所有消息求和来更新节点特征ft
, 再将ft
乘以2得到最终结果final_ft
; 调用结束后, 中间消息将被清楚; 数学公式如下: f i n a l _ f t i = 2 ⋅ ∑ j ∈ N ( i ) ( f t j ⋅ a i j ) {\rm final\_ft}_i=2\cdot \sum_{j\in \mathcal{N}(i)}({\rm ft}_j\cdot a_{ij}) final_fti=2⋅j∈N(i)∑(ftj⋅aij)
- 这段代码中源节点特征
2.2 如何编写高效的消息传递代码
关于
dgl
内置函数是如何优化消息传递的内存消耗和计算速度的, 详见文字描述: DGL官方文档 ; 总结来说主要是合并内核, 并行逐边运算, 减少点边拷贝等; 如update_all()
函数就是一个效率很高的接口; 如果确实需要使用apply_edges()
函数在边上保存消息, 则内存占用会非常大;
- 一个通过对节点特征降维来减少消息维度的示例:
- 拼接源节点与目标节点特征, 然后应用一个线性层: W × ( u ∣ ∣ v ) W\times (u||v) W×(u∣∣v);
- 这样源节点与目标节点特征维数较高, 而线性层输出维数较低;
- 代码示例:
import torch import torch.nn as nn linear = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim * 2))) def concat_message_function(edges): return {'cat_feat': torch.cat([edges.src.ndata['feat'], edges.dst.ndata['feat']])} g.apply_edges(concat_message_function) g.edata['out'] = g.edata['cat_feat'] * linear
- 也可以将先行操作分成两部分, 即分别对源节点特征和目标节点特征进行线性变换后再相加, 即 W l × u + W r × v W_l\times u+W_r\times v Wl×u+Wr×v, 其中 W = ( W l ∣ ∣ W r ) W=(W_l||W_r) W=(Wl∣∣Wr), 这样可能会更加优化:
- 代码示例:
import dgl.function as fn linear_src = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim))) linear_dst = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim))) out_src = g.ndata['feat'] * linear_src out_dst = g.ndata['feat'] * linear_dst g.srcdata.update({'out_src': out_src}) g.dstdata.update({'out_dst': out_dst}) g.apply_edges(fn.u_add_v('out_src', 'out_dst', 'out'))
- 这两种方法数学上等价, 但后一种方法更加高效, 因为无需再边上保存
feat_src
和feat_dst
, 空间占用小, 另外加法可以直接用内置函数u_add_v
进行优化, 内置函数的效率一般比自定义函数要高;
2.3 在图的一部分上进行消息传递
之前有提到过, dgl的特征赋值是不能只对部分节点进行的, 但是可以对图中部分节点进行更新, 方法是先构造子图, 然后在子图上调用update_all()
方法即可; 这是Mini-Batch训练中的常用手段, 关于Mini-Batch详见本文Chapter 6中的相关内容; 代码示例如下:
nid = [0, 2, 3, 6, 7, 9]
sg = g.subgraph(nid)
sg.update_all(message_func, reduce_func, apply_node_func)
2.4 在消息传递中使用边权重
图注意力网络(GAT) 以及一些图卷积网络(GCN)的变种 , 这两篇paper里都提到了在消息聚合前使用边的权重, dgl
库中的做法是将权重存为边的特征, 并在消息函数中将边的特征与源节点的特征相乘; 代码示例如下, 其中affinity
即为边的权重, 通常为一个标量, 本质上就是加权聚合, 如注意力机制的方法:
import dgl.function as fn
graph.edata['a'] = affinity
graph.update_all(fn.u_mul_e('ft', 'a', 'm'),
fn.sum('m', 'ft'))
2.5 在异构图上进行消息传递
- 在Chapter 1的第五节已经详细介绍了异构图, 本质上异构图的消息传递与同构图并没有太大区别, 可以分为两步:
- 对每个关系计算和聚合消息;
- 对每个节点聚合来自不同关系的消息;
- 相关接口函数:
DGLGraph.multi_update_all(etype_dict, cross_reducer, apply_node_func=None)
- DGL官方文档 ;
- 参数:
etype_dict
:dict
类型, 键为一种关系, 值为这种关系对应的update_all()
的参数;cross_reducer
:str
类型, 表示跨类型整合函数, 来指定整合不同关系聚合结果的方式, 可以是sum
,min
,max
,mean
,stack
中之一;
- 代码示例:
import dgl.function as fn for c_etype in G.canonical_etypes: srctype, etype, dsttype = c_etype Wh = self.weight[etype](feat_dict[srctype]) # Save it in graph for message passing G.nodes[srctype].data['Wh_%s' % etype] = Wh # Specify per-relation message passing functions: (message_func, reduce_func). # Note that the results are saved to the same destination feature 'h', which # hints the type wise reducer for aggregation. funcs[etype] = (fn.copy_u('Wh_%s' % etype, 'm'), fn.mean('m', 'h')) # Trigger message passing of multiple types. G.multi_update_all(funcs, 'sum') # return the updated node feature dictionary return {ntype : G.nodes[ntype].data['h'] for ntype in G.ntypes}
- 官方文档中的这段代码应该是截取某个类中的函数的一段, 循环部分就是设定节点权重, 然后定义边的消息传递函数, 然后出了循环就调用
multi_update_all
函数一波带, 返回结果是更新过的节点的特征字典;
- 官方文档中的这段代码应该是截取某个类中的函数的一段, 循环部分就是设定节点权重, 然后定义边的消息传递函数, 然后出了循环就调用
Chpater 3: 构建图神经网络(GNN)模块
dgl.nn
模块是用户构建GNN模型的基本模块, 根据不同的dgl
后端, 该模块的父类也会继承自不同的类(如使用torch
, 自然就是继承torch.nn.Module
, 与torch
自定义层或网络是类似的), 于是其构造函数中的参数注册以及前向传播中使用的张量操作也与后端框架一样, 所以本质dgl
就可以视为一个插件可以直接嵌入到后端的深度学习库中, 非常便捷, 区别只在于dgl
定义了消息传递的操作框架;
详细的dgl.nn
模块内容可见https://docs.dgl.ai/api/python/nn.html , 常用的卷积层, 全连接层, 全局池化层以及一些工具函数都在当中定义;
本章将以torch
作为后端进行介绍, 以dgl.nn.pytorch.conv.SAGEConv
层的编写逻辑为例介绍自定义层的写法;
SAGEConv的数学公式如下所示, 这将在第2节中用于构建forward
函数:
h
N
(
d
s
t
)
(
l
+
1
)
=
a
g
g
r
e
g
a
t
e
(
{
h
s
r
c
l
,
∀
s
r
c
∈
N
(
d
s
t
)
}
)
h
d
s
t
l
+
1
=
σ
(
W
⋅
c
o
n
c
a
t
(
h
d
s
t
l
,
h
N
(
d
s
t
)
l
+
1
)
+
b
)
h
d
s
t
l
+
1
=
n
o
r
m
(
h
d
s
t
l
)
h_{\mathcal{N}(dst)}^{(l+1)}={\rm aggregate}(\{h_{src}^l,\forall src\in \mathcal{N}(dst)\})\\h_{dst}^{l+1}=\sigma(W\cdot {\rm concat}(h_{dst}^l,h_{\mathcal{N}(dst)}^{l+1})+b)\\h_{dst}^{l+1}={\rm norm}(h_{dst}^{l})
hN(dst)(l+1)=aggregate({hsrcl,∀src∈N(dst)})hdstl+1=σ(W⋅concat(hdstl,hN(dst)l+1)+b)hdstl+1=norm(hdstl)
3.1 dgl.nn模块的构造函数
- 构造函数中需要包含下面三个内容:
- 设置选项;
- 注册可学习的参数或者子模块;
- 初始化参数;
- 代码示例:
import torch.nn as nn from dgl.utils import expand_as_pair class SAGEConv(nn.Module): def __init__(self, in_feats, out_feats, aggregator_type, bias=True, norm=None, activation=None): super(SAGEConv, self).__init__() self._in_src_feats, self._in_dst_feats = expand_as_pair(in_feats) self._out_feats = out_feats self._aggre_type = aggregator_type self.norm = norm self.activation = activation # aggregator type: mean, max_pool, lstm, gcn if aggregator_type not in ['mean', 'max_pool', 'lstm', 'gcn']: raise KeyError('Aggregator type {} not supported.'.format(aggregator_type)) if aggregator_type == 'max_pool': self.fc_pool = nn.Linear(self._in_src_feats, self._in_src_feats) if aggregator_type == 'lstm': self.lstm = nn.LSTM(self._in_src_feats, self._in_src_feats, batch_first=True) if aggregator_type in ['mean', 'max_pool', 'lstm']: self.fc_self = nn.Linear(self._in_dst_feats, out_feats, bias=bias) self.fc_neigh = nn.Linear(self._in_src_feats, out_feats, bias=bias) self.reset_parameters() def reset_parameters(self): """Reinitialize learnable parameters.""" gain = nn.init.calculate_gain('relu') if self._aggre_type == 'max_pool': nn.init.xavier_uniform_(self.fc_pool.weight, gain=gain) if self._aggre_type == 'lstm': self.lstm.reset_parameters() if self._aggre_type != 'gcn': nn.init.xavier_uniform_(self.fc_self.weight, gain=gain) nn.init.xavier_uniform_(self.fc_neigh.weight, gain=gain)
- 构造函数中首先需要设置数据的维度:
- 输入的维度: GNN中可以分为源节点特征维度和目标节点特征维度;
- 输出的维度;
- 隐层的维度;
- 除了数据维度, GNN的一个典型选项是聚合类型
self._aggre_type
, 常用的选项有mean
,sum
,max
,min
; 一些模块可能会使用更加复杂的聚合函数, 比如lstm
; - 此外
self.norm
是用于特征归一化的函数, 在SAGEConv的定义中, 归一化可以是L2归一化, 即将特征除以它的二范数; - 关于注册参数和子模块:
- SAGEConv中的子模块根据聚合类型的不同而有差异, 这些模块就是纯的
torch.nn.Module
类型, 如torch.nn.Linear
,torch.nn.LSTM
等; - 最后构造函数调用
reset_parameters
进行权重初始化;
- SAGEConv中的子模块根据聚合类型的不同而有差异, 这些模块就是纯的
- 构造函数中首先需要设置数据的维度:
3.2 编写dgl.nn模块的forward函数
- 类似
torch
中的forward
函数, 这里的forward
函数执行实际的消息传递计算, 除了常见的张量运算外, 这里多出一个参数dgl.DGLGraph
; 函数中一般包含以下三个部分:
- 检测输入图对象是否符合规范;
- 消息传递和聚合;
- 聚合后, 更新特征作为输出;
以下将以SAGEConv的
forward
函数为例, 介绍这三个部分;
- 第一步: 输入图对象的规范检验:
- 代码示例:
def forward(self, graph, feat): with graph.local_scope(): # Specify graph type then expand input feature according to graph type feat_src, feat_dst = expand_as_pair(feat, graph) def expand_as_pair(input_, g=None): if isinstance(input_, tuple): # Bipartite graph case return input_ elif g is not None and g.is_block: # Subgraph block case if isinstance(input_, Mapping): input_dst = { k: F.narrow_row(v, 0, g.number_of_dst_nodes(k)) for k, v in input_.items()} else: input_dst = F.narrow_row(input_, 0, g.number_of_dst_nodes()) return input_, input_dst else: # Homogeneous graph case return input_, input_
- 源节点特征
feat_src
和目标节点特征feat_dst
需要根据图类型被指定, 由feat
扩展得到feat_src
和feat_dst
;
- 源节点特征
- 对于同构图上的全图训练, 源节点和目标节点相同, 它们都是图中的所有节点;
- 在异构图的情况下, 图可以分为几个二分图, 每种关系对应一个; 关系表示为
(src_type, edge_type, dst_type)
; 当输入特征feat
是一个元组时, 图将会被视为二分图; 元组中的第一个元素为源节点特征, 第二个元素为目标节点特征; - 在Mini-batch训练中, 计算应用于给定的一堆目标节点所采样的子图; 子图在DGL中称为
block
, 在block
创建的阶段,dst nodes
位于列表的最前面; 通过索引[0:g.number_of_dst_nodes()]
可以找到feat_dst
;- 这里已经开始不知道在说什么玩意儿了… 笔者猜想Mini-batch可能算是一种分布式的并行训练, 在Chapter 7中可能会有所提及, 这里可能还是要具体阅读后面的章节才能知道是什么意思;
- 第二步: 消息传递和聚合:
- 代码示例: 下面的代码执行了消息传递和聚合计算, 注意代码中的所有消息传递均使用
update_all()
和dgl
库内置的消息函数和聚合函数来实现; 这有助于性能优化;import dgl.function as fn import torch.nn.functional as F from dgl.utils import check_eq_shape if self._aggre_type == 'mean': graph.srcdata['h'] = feat_src graph.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'neigh')) h_neigh = graph.dstdata['neigh'] elif self._aggre_type == 'gcn': check_eq_shape(feat) graph.srcdata['h'] = feat_src graph.dstdata['h'] = feat_dst graph.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'neigh')) # divide in_degrees degs = graph.in_degrees().to(feat_dst) h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1) elif self._aggre_type == 'max_pool': graph.srcdata['h'] = F.relu(self.fc_pool(feat_src)) graph.update_all(fn.copy_u('h', 'm'), fn.max('m', 'neigh')) h_neigh = graph.dstdata['neigh'] else: raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type)) # GraphSAGE GCN does not require fc_self. if self._aggre_type == 'gcn': rst = self.fc_neigh(h_neigh) else: rst = self.fc_self(h_self) + self.fc_neigh(h_neigh)
- 第三步: 聚合后, 更新特征作为输出
- 代码示例:
# activation if self.activation is not None: rst = self.activation(rst) # normalization if self.norm is not None: rst = self.norm(rst) return rst
forward
函数的最后一部分是在完成消息聚合后更新节点的特征;- 常见的更新操作是根据构造函数中设置的选项来应用激活函数和进行归一化;
将二三步的代码示例拼接到第一步的代码示例中的
forward
函数中即可;
3.3 异构图上的GraphConv模块
- 相关接口:
dgl.nn.pytorch.HeteroGraphConv(mods, aggregate='sum')
- DGL官方文档 ;
- 用于定义异构图上的GNN模块, 实现逻辑与消息传递级别的
multi_update_all
函数相同, 包括:- 每个关系上的
dgl.nn
模块; - 聚合来自不同关系上的结果;
- 每个关系上的
- 数学定义:
h
d
s
t
l
+
1
=
A
G
G
r
∈
R
,
r
d
s
t
=
d
s
t
(
f
r
(
g
r
,
h
r
s
r
c
l
,
h
r
d
s
t
l
)
)
h_{dst}^{l+1}=AGG_{r\in \mathcal{R},r_dst=dst}(f_r(g_r,h_{r_{src}}^l,h_{r_{dst}}^l))
hdstl+1=AGGr∈R,rdst=dst(fr(gr,hrsrcl,hrdstl))
- 其中
f
r
f_r
fr是对应每个关系
r
r
r的
dgl.nn
模块, A G G AGG AGG是聚合函数;
- 其中
f
r
f_r
fr是对应每个关系
r
r
r的
- HeteroGraphConv的实现逻辑:
- 构造函数代码示例:
import torch.nn as nn class HeteroGraphConv(nn.Module): def __init__(self, mods, aggregate='sum'): super(HeteroGraphConv, self).__init__() self.mods = nn.ModuleDict(mods) if isinstance(aggregate, str): # An internal function to get common aggregation functions self.agg_fn = get_aggregate_fn(aggregate) else: self.agg_fn = aggregate
- 参数
mods
:dict
类型, 字典的键为关系名, 值为作用在该关系上NN模块对象; - 参数
aggregate
: 指定了如何聚合来自不同关系的结果;
- 参数
forward
函数代码示例:def forward(self, g, inputs, mod_args=None, mod_kwargs=None): if mod_args is None: mod_args = {} if mod_kwargs is None: mod_kwargs = {} outputs = {nty : [] for nty in g.dsttypes} if g.is_block: src_inputs = inputs dst_inputs = {k: v[:g.number_of_dst_nodes(k)] for k, v in inputs.items()} else: src_inputs = dst_inputs = inputs for stype, etype, dtype in g.canonical_etypes: rel_graph = g[stype, etype, dtype] if rel_graph.num_edges() == 0: continue if stype not in src_inputs or dtype not in dst_inputs: continue dstdata = self.mods[etype]( rel_graph, (src_inputs[stype], dst_inputs[dtype]), *mod_args.get(etype, ()), **mod_kwargs.get(etype, {})) outputs[dtype].append(dstdata) rsts = {} for nty, alist in outputs.items(): if len(alist) != 0: rsts[nty] = self.agg_fn(alist, nty)
- 除了输入图
g
和输入张量inputs
,forward
函数还使用2个额外的字典参数:mod_args
;mod_kwargs
;- 这2个字典与
self.mods
具有相同的键, 值则为对应dgl.nn
模块的自定义参数;
forward
函数的输出结果也是一个字典类型的对象:- 键为
nty
; - 值为每个目标节点类型
nty
的输出张量的list
; 表示来自不同关系的计算结果; HeteroGraphConv会对这个list
进一步聚合, 并将结果返回给用户;
- 键为
- 参数
g
可以是异构图或来自异构图的子图区块; 和普通的dgl.nn
模块一样,forward
函数需要分别处理不同的输入图类型; - 上述代码中的
for
循环为处理异构图计算的主要逻辑:- 首先我们遍历图中所有的关系(通过调用
canonical_etypes
); - 通过关系名, 我们可以使用
g[stype, etype, dtype]
将只包含该关系的子图(rel_graph
)抽取出来; - 对于二部图, 输入特征将被组织为元组(
src_inputs[stype], dst_inputs[dtype]
); - 接着调用用户预先注册在该关系上的NN模块, 并将结果保存在
outputs
字典中;
- 首先我们遍历图中所有的关系(通过调用
- 最后, HeteroGraphConv会调用用户注册的
self.agg_fn
函数聚合来自多个关系的结果;
- 除了输入图
Chapter 4: 图数据管道
dgl.data
模块中实现了很多常用的图数据集; 它们遵循由dgl.data.DGLDataset
类定义的标准的数据处理管道;
官方文档推荐将图数据处理为dgl.data.DGLDataset
的子类, 因为该类为导入, 处理和保存图数据提供了很多工具函数;
4.1 DGLDataset类
class dgl.data.DGLDataset(name, url=None, raw_dir=None, save_dir=None, hash_key=(), force_reload=False, verbose=False)
: DGL官方文档 ;
DGLDataset
类是处理, 导入和保存dgl.data
模块中定义的图数据集的基类; 它实现了用于处理图数据的基本模板;DGLDataset
模板的工作方式:- 使用模板编写自定义的数据集调用类: 用于处理位于远程服务器或本地的图数据集;
from dgl.data import DGLDataset class MyDataset(DGLDataset): """ Template for customizing graph datasets in DGL. Parameters ---------- url : str URL to download the raw dataset raw_dir : str Specifying the directory that will store the downloaded data or the directory that already stores the input data. Default: ~/.dgl/ save_dir : str Directory to save the processed dataset. Default: the value of `raw_dir` force_reload : bool Whether to reload the dataset. Default: False verbose : bool Whether to print out progress information """ def __init__(self, url=None, raw_dir=None, save_dir=None, force_reload=False, verbose=False): super(MyDataset, self).__init__(name='dataset_name', url=url, raw_dir=raw_dir, save_dir=save_dir, force_reload=force_reload, verbose=verbose) def download(self): # download raw data to local disk pass def process(self): # process raw data to graphs, labels, splitting masks pass def __getitem__(self, idx): # get one example by index pass def __len__(self): # number of data examples pass def save(self): # save processed data to directory `self.save_path` pass def load(self): # load processed data from directory `self.save_path` pass def has_cache(self): # check whether there are processed data in `self.save_path` pass
- 继承
DGLDataset
类时必须实现其中的三个抽象函数:process()
;__getitem__(idx)
;__len__()
;- 另外建议也要实现
save()
和load()
函数, 一般会频繁地用于保存checkpoint, 详细接口可见本章第4节的内容;
- 继承
4.2 下载原始数据(可选)
- 本节是实现第1节中的
download
函数;
- 注意原数据一定要放在类构造参数
raw_dir
中, 这是父类DGLDataset
中的规定; download
函数一般用于下载远程服务器上的数据, 如果数据集是zip
格式的压缩包, 则可以直接继承dgl.data.DGLBuiltinDataset
类编写数据模板, 它支持解压缩zip
文件, 具体可以参考QM7bDataset
类: DGL官方文档 ;- 自定义
download
函数示例:import os from dgl.data.utils import download def download(self): # path to store the file file_path = os.path.join(self.raw_dir, self.name + '.mat') # download file download(self.url, path=file_path)
- 这是将一个
.mat
文件下载到目录self.raw_dir
; - 如果文件时
.gz, .tar, .tar.gz, .tgz
的文件, 则可以使用dgl.data.utils.extract_archive(file, target_dir, overwrite=False)
; BitcoinOTCDataset
类中下载.gz
文件的代码示例:from dgl.data.utils import download, check_sha1 def download(self): # path to store the file # make sure to use the same suffix as the original file name's gz_file_path = os.path.join(self.raw_dir, self.name + '.csv.gz') # download file download(self.url, path=gz_file_path) # check SHA-1 if not check_sha1(gz_file_path, self._sha1_str): raise UserWarning('File {} is downloaded but the content hash does not match.' 'The repo may be outdated or download may be incomplete. ' 'Otherwise you can create an issue for it.'.format(self.name + '.csv.gz')) # extract file to directory `self.name` under `self.raw_dir` self._extract_gz(gz_file_path, self.raw_path)
- 这是将一个
4.3 数据处理
- 本节是实现第1节中的
process
函数;
- 图上的机器学习任务由三种常见类型:
- ① 整图分类: 即将图看作整体预测该图属于什么领域, 通常这种任务下的图规模较小;
- ② 节点分类: 以知识图谱为例, 现在有很多研究在做 ( s , r , o ) (s,r,o) (s,r,o)三元组中某个元素的预测, 以GitHub@RE-Net 为例, 这就是通过知识图谱的时间序列进行关系预测和subject/object预测的任务, 在当前时间点上应该算是state-of-the-art的成果;
- ③ 链接预测: 即link prediction, 可以理解为节点分类的回归问题, 即不需要预测确切的分类, 而是预测出边或节点的特征值;
- 本节主要介绍处理图, 特征和划分掩码的标准方法;
- 处理整图分类数据集:
- 详见本文Chapter 5第4节的内容, 本节仅以内置数据集
QM7bDataset
为例, 5.4中会例举更多的数据集; - 整图分类数据集与典型机器学习任务中的大多数数据集类似, 需要将原始数据处理为
dgl.DGLGraph
对象的列表和标签张量的列表; - 以
QM7bDataset
类的源码示例:from dgl.data import DGLDataset class QM7bDataset(DGLDataset): _url = 'http://deepchem.io.s3-website-us-west-1.amazonaws.com/' \ 'datasets/qm7b.mat' _sha1_str = '4102c744bb9d6fd7b40ac67a300e49cd87e28392' def __init__(self, raw_dir=None, force_reload=False, verbose=False): super(QM7bDataset, self).__init__(name='qm7b', url=self._url, raw_dir=raw_dir, force_reload=force_reload, verbose=verbose) def process(self): mat_path = self.raw_path + '.mat' # process data to a list of graphs and a list of labels self.graphs, self.label = self._load_graph(mat_path) def __getitem__(self, idx): """ Get graph and label by index Parameters ---------- idx : int Item index Returns ------- (dgl.DGLGraph, Tensor) """ return self.graphs[idx], self.label[idx] def __len__(self): """Number of graphs in the dataset""" return len(self.graphs)
- 函数
process
将原始数据处理为图列表和标签列表; - 用户必须实现
__getitem__(idx)
和__len__()
以进行迭代; - 推荐
__getitem__(idx)
返回如上面代码所示的元组(graph, label)
; - 参考
QM7bDataset
源代码 以获得self._load_graph()
和__getitem__
的详细信息;
- 函数
- 可以向类添加属性以指示一些有用的数据集信息; 在
QM7bDataset
中, 用户可以添加属性num_labels
来指示此多任务数据集中的预测任务总数:@property def num_labels(self): """Number of labels for each graph, i.e. number of prediction tasks.""" return 14
- 调用
QM7bDataset
类代码示例: 这里是调用了torch
的数据加载器;import dgl import torch from torch.utils.data import DataLoader # load data dataset = QM7bDataset() num_labels = dataset.num_labels # create collate_fn def _collate_fn(batch): graphs, labels = batch g = dgl.batch(graphs) labels = torch.tensor(labels, dtype=torch.long) return g, labels # create dataloaders dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=_collate_fn) # training for epoch in range(100): for g, labels in dataloader: # your training code here pass
- 处理节点分类数据集:
-
节点分类通常在单图上进行, 因此数据集的划分是在图的节点集上进行;
-
官方文档建议使用节点掩码来指定数据集的划分;
-
详细节点分类问题可见本文Chapter 5第1节;
-
所有与节点分类相关的数据集:
-
本节以内置数据集
CitationGraphDataset
为例:from dgl.data import DGLBuiltinDataset from dgl.data.utils import _get_dgl_url, generate_mask_tensor class CitationGraphDataset(DGLBuiltinDataset): _urls = { 'cora_v2' : 'dataset/cora_v2.zip', 'citeseer' : 'dataset/citeseer.zip', 'pubmed' : 'dataset/pubmed.zip', } def __init__(self, name, raw_dir=None, force_reload=False, verbose=True): assert name.lower() in ['cora', 'citeseer', 'pubmed'] if name.lower() == 'cora': name = 'cora_v2' url = _get_dgl_url(self._urls[name]) super(CitationGraphDataset, self).__init__(name, url=url, raw_dir=raw_dir, force_reload=force_reload, verbose=verbose) def process(self): # Skip some processing code # === data processing skipped === # build graph g = dgl.graph(graph) # splitting masks g.ndata['train_mask'] = generate_mask_tensor(train_mask) g.ndata['val_mask'] = generate_mask_tensor(val_mask) g.ndata['test_mask'] = generate_mask_tensor(test_mask) # node labels g.ndata['label'] = torch.tensor(labels) # node features g.ndata['feat'] = torch.tensor(_preprocess_features(features), dtype=F.data_type_dict['float32']) self._num_labels = onehot_labels.shape[1] self._labels = labels self._g = g def __getitem__(self, idx): assert idx == 0, "This dataset has only one graph" return self._g def __len__(self): return 1
process
函数中省略了部分代码, 留下的部分是突出关键部分: 划分掩码; 详细可参考CitationGraphDataset
源码 ;
-
使用
dgl.data.CitationGraphDataset
的子类dgl.data.CiteseerGraphDataset
来调用节点分类数据集:# load data dataset = CiteseerGraphDataset(raw_dir='') graph = dataset[0] # get split masks train_mask = graph.ndata['train_mask'] val_mask = graph.ndata['val_mask'] test_mask = graph.ndata['test_mask'] # get node features feats = graph.ndata['feat'] # get labels labels = graph.ndata['label']
- 处理链接预测数据集:
-
链接预测数据集的处理与节点分类相似, 数据集中通常只有一个图;
-
关于链接预测的详细内容可见本文Chapter 5第3节内容:
- Knowlege graph dataset ;
- 这里提供数个规模不一的知识图谱数据库;
- BitcoinOTC dataset ;
- Knowlege graph dataset ;
-
本节以内置数据集
KnowledgeGraphDataset
为例:# Example for creating Link Prediction datasets class KnowledgeGraphDataset(DGLBuiltinDataset): def __init__(self, name, reverse=True, raw_dir=None, force_reload=False, verbose=True): self._name = name self.reverse = reverse url = _get_dgl_url('dataset/') + '{}.tgz'.format(name) super(KnowledgeGraphDataset, self).__init__(name, url=url, raw_dir=raw_dir, force_reload=force_reload, verbose=verbose) def process(self): # Skip some processing code # === data processing skipped === # splitting mask g.edata['train_mask'] = train_mask g.edata['val_mask'] = val_mask g.edata['test_mask'] = test_mask # edge type g.edata['etype'] = etype # node type g.ndata['ntype'] = ntype self._g = g def __getitem__(self, idx): assert idx == 0, "This dataset has only one graph" return self._g def __len__(self): return 1
- 图的
edata
中存储了划分掩码; 详细源码可见https://docs.dgl.ai/en/0.5.x/_modules/dgl/data/knowledge_graph.html#KnowledgeGraphDataset ;
- 图的
-
使用
KnowledgeGraphDataset
的子类dgl.data.FB15k237Dataset
来调用链接预测数据集:from dgl.data import FB15k237Dataset # load data dataset = FB15k237Dataset() graph = dataset[0] # get training mask train_mask = graph.edata['train_mask'] train_idx = torch.nonzero(train_mask).squeeze() src, dst = graph.edges(train_idx) # get edge types in training set rel = graph.edata['etype'][train_idx]
4.4 保存与加载数据
- 正如上文提到的, 建议实现
save
和load
函数; - 相关工具函数:
dgl.save_graphs(filename, g_list, labels=None)
: 保存DGLGraph
对象; 这在1.5节异构图已经提过了;dgl.load_graphs(filename, idx_list=None)
: 从本地读取DGLGraph
对象; 这在1.5节异构图已经提过了;dgl.data.utils.save_info(path, info)
: 将数据集的有用信息(dict
类型)保存;dgl.data.utils.load_info(path)
: 读取信息;- 代码示例:
import os from dgl import save_graphs, load_graphs from dgl.data.utils import makedirs, save_info, load_info def save(self): # save graphs and labels graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin') save_graphs(graph_path, self.graphs, {'labels': self.labels}) # save other information in python dict info_path = os.path.join(self.save_path, self.mode + '_info.pkl') save_info(info_path, {'num_classes': self.num_classes}) def load(self): # load processed data from directory `self.save_path` graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin') self.graphs, label_dict = load_graphs(graph_path) self.labels = label_dict['labels'] info_path = os.path.join(self.save_path, self.mode + '_info.pkl') self.num_classes = load_info(info_path)['num_classes'] def has_cache(self): # check whether there are processed data in `self.save_path` graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin') info_path = os.path.join(self.save_path, self.mode + '_info.pkl') return os.path.exists(graph_path) and os.path.exists(info_path)
- 注意: 有些情况下不适合保存处理过的数据;
- 在内置数据集
GDELTDataset
中, 处理过的数据很大, 此时在__getitem__(idx)
中处理每个数据实例是更高效的方法;
- 在内置数据集
4.5 使用ogb库加载OGB数据集
ogb
库, 全称开源图基准(Open Graph Benchmark), 是一个图深度学习的基准数据集, 其中内置了用于下载和处理ogb
数据集转为dgl.data.DGLGraph
对象的接口函数; 简单pip
安装即可;
- 加载数据集Graph Property Prediction示例代码: 这是整图分类;
# Load Graph Property Prediction datasets in OGB import dgl import torch from ogb.graphproppred import DglGraphPropPredDataset from torch.utils.data import DataLoader def _collate_fn(batch): # batch is a list of tuple (graph, label) graphs = [e[0] for e in batch] g = dgl.batch(graphs) labels = [e[1] for e in batch] labels = torch.stack(labels, 0) return g, labels # load dataset dataset = DglGraphPropPredDataset(name='ogbg-molhiv') split_idx = dataset.get_idx_split() # dataloader train_loader = DataLoader(dataset[split_idx["train"]], batch_size=32, shuffle=True, collate_fn=_collate_fn) valid_loader = DataLoader(dataset[split_idx["valid"]], batch_size=32, shuffle=False, collate_fn=_collate_fn) test_loader = DataLoader(dataset[split_idx["test"]], batch_size=32, shuffle=False, collate_fn=_collate_fn)
- 加载数据集Node Property Prediction示例代码, 该数据集只有一个图: 这是节点分类;
# Load Node Property Prediction datasets in OGB from ogb.nodeproppred import DglNodePropPredDataset dataset = DglNodePropPredDataset(name='ogbn-proteins') split_idx = dataset.get_idx_split() # there is only one graph in Node Property Prediction datasets g, labels = dataset[0] # get split labels train_label = dataset.labels[split_idx['train']] valid_label = dataset.labels[split_idx['valid']] test_label = dataset.labels[split_idx['test']]
- 加载数据集Link Property Prediction示例代码: 这是链接预测;
# Load Link Property Prediction datasets in OGB from ogb.linkproppred import DglLinkPropPredDataset dataset = DglLinkPropPredDataset(name='ogbl-ppa') split_edge = dataset.get_edge_split() graph = dataset[0] print(split_edge['train'].keys()) print(split_edge['valid'].keys()) print(split_edge['test'].keys())
Chapter 5: 训练图神经网络(GNN)
- 本章主要是讨论在上一章中提到的三种任务(节点分类和边分类, 链接预测, 图分类)中训练GNN模型:
- 使用的方法来自第二章的消息传递以及第三节的自定义
dgl.nn
模块; - 使用的数据集可以直接从上一章中提到的内置数据集中获取, 以获取一个单图数据集为例:
import dgl dataset = dgl.data.CiteseerGraphDataset() graph = dataset[0]
- 本节中使用的后端依然为
torch
; - 关于测试异构图上的三种任务, 可以事先构建一个简单的社交网络异构图数据集:
import numpy as np import torch n_users = 1000 n_items = 500 n_follows = 3000 n_clicks = 5000 n_dislikes = 500 n_hetero_features = 10 n_user_classes = 5 n_max_clicks = 10 follow_src = np.random.randint(0, n_users, n_follows) follow_dst = np.random.randint(0, n_users, n_follows) click_src = np.random.randint(0, n_users, n_clicks) click_dst = np.random.randint(0, n_items, n_clicks) dislike_src = np.random.randint(0, n_users, n_dislikes) dislike_dst = np.random.randint(0, n_items, n_dislikes) hetero_graph = dgl.heterograph({ ('user', 'follow', 'user'): (follow_src, follow_dst), ('user', 'followed-by', 'user'): (follow_dst, follow_src), ('user', 'click', 'item'): (click_src, click_dst), ('item', 'clicked-by', 'user'): (click_dst, click_src), ('user', 'dislike', 'item'): (dislike_src, dislike_dst), ('item', 'disliked-by', 'user'): (dislike_dst, dislike_src)}) hetero_graph.nodes['user'].data['feature'] = torch.randn(n_users, n_hetero_features) hetero_graph.nodes['item'].data['feature'] = torch.randn(n_items, n_hetero_features) hetero_graph.nodes['user'].data['label'] = torch.randint(0, n_user_classes, (n_users,)) hetero_graph.edges['click'].data['label'] = torch.randint(1, n_max_clicks, (n_clicks,)).float() # randomly generate training masks on user nodes and click edges hetero_graph.nodes['user'].data['train_mask'] = torch.zeros(n_users, dtype=torch.bool).bernoulli(0.6) hetero_graph.edges['click'].data['train_mask'] = torch.zeros(n_clicks, dtype=torch.bool).bernoulli(0.6)
- 该异构图
hetero_graph
中包含如下的边:('user', 'follow', 'user')
;('user', 'followed-by', 'user')
;('user', 'click', 'item')
;('item', 'clicked-by', 'user')
;('user', 'dislike', 'item')
;('item', 'disliked-by', 'user')
;
- 该异构图
5.1 节点分类/回归
这是目前图神经网络中最为热门的研究之一; 给定一张图, 请给出所有节点的分类标签; 为了对节点进行分类, 图神经网络需要进行消息传递来利用每个节点自身的特征, 以及它近邻节点和边的特征;
GitHub@RE-Net 的paper中提到的聚合是包括k
级近邻内的聚合, 消息传递未必只是一级的, 可以是多级的, 只是每增加一级会大大增加消息传递的复杂度;
当然可以通过增加消息传递的轮数, 从而实现每个节点和边的信息可以尽可能传递到图中的每一个角落;
- 编写神经网络模型:
dgl
库提供了一些内置的图卷积模块来实现一轮的消息传递;- 本节中以
dgl.nn.pytorch.SAGEConv
类为例:SAGE
类中包含了两个卷积层, 将多个卷积层叠加, 即可实现多轮的消息传递;# Contruct a two-layer GNN model import dgl.nn as dglnn import torch.nn as nn import torch.nn.functional as F class SAGE(nn.Module): def __init__(self, in_feats, hid_feats, out_feats): super().__init__() self.conv1 = dglnn.SAGEConv( in_feats=in_feats, out_feats=hid_feats, aggregator_type='mean') self.conv2 = dglnn.SAGEConv( in_feats=hid_feats, out_feats=out_feats, aggregator_type='mean') def forward(self, graph, inputs): # inputs are features of nodes h = self.conv1(graph, inputs) h = F.relu(h) h = self.conv2(graph, h) return h
- 该模型不仅可以用于节点分类, 也可以训练获得隐层节点表示, 然后用于下面本章几节的几个下游任务;
- 详细的内置网络层详见dgl.nn ;
- 在本文第三章已经描述了如何编写自定义的GNN网络层;
- 训练模型的循环:
- 训练模型包括以下几步: 与
torch
的模型训练并无区别;- ① 调用模型的前向传播函数
forward
; - ② 计算损失函数: 通过模型预测值和真实值;
- ③ (可选)计算当前模型评估指标, 如精确度, 可用于输出查看实时训练效果;
- ④ 优化器梯度清零:
optimizer.zero_grad()
; - ⑤ 损失函数反向传播:
loss.backward()
; - ⑥ 优化器迭进行学习迭代:
optimizer.step()
;
- ① 调用模型的前向传播函数
- 本节以内置数据集
dgl.data.CiteseerGraphDataset
为例, 介绍训练流程; - 读取数据集: 得到
graph
, 详细略, 可见本文第四章关于数据集加载的方法; - 获取数据集中的特征:
node_features = graph.ndata['feat'] node_labels = graph.ndata['label'] train_mask = graph.ndata['train_mask'] valid_mask = graph.ndata['val_mask'] test_mask = graph.ndata['test_mask'] n_features = node_features.shape[1] n_labels = int(node_labels.max().item() + 1)
- 用于验证模型精确度的工具函数:
def evaluate(model, graph, features, labels, mask): model.eval() with torch.no_grad(): logits = model(graph, features) logits = logits[mask] labels = labels[mask] _, indices = torch.max(logits, dim=1) correct = torch.sum(indices == labels) return correct.item() * 1.0 / len(labels)
- 模型训练循环:
model = SAGE(in_feats=n_features, hid_feats=100, out_feats=n_labels) opt = torch.optim.Adam(model.parameters()) for epoch in range(10): model.train() # forward propagation by using all nodes logits = model(graph, node_features) # compute loss loss = F.cross_entropy(logits[train_mask], node_labels[train_mask]) # compute validation accuracy acc = evaluate(model, graph, node_features, node_labels, valid_mask) # backward propagation opt.zero_grad() loss.backward() opt.step() print(loss.item()) # Save model if necessary. Omitted in this example.
- 异构图的模型训练:
- 异构图上可能需要把与某个节点相关的所有类型的边都要进行消息聚合, 这可以通过使用内置的
dgl.nn.pytorch.HeteroGraphConv
来实现; - 下面的代码示例定义了一种异构图的卷积模块, 该模块先对每种类型的边分别进行一次图卷积(
self.conv1
), 然后将每种类型的边的消息聚合结果累和作为所有节点类型的最终结果(self.conv2
):# Define a Heterograph Conv model import dgl.nn as dglnn class RGCN(nn.Module): def __init__(self, in_feats, hid_feats, out_feats, rel_names): super().__init__() self.conv1 = dglnn.HeteroGraphConv({ rel: dglnn.GraphConv(in_feats, hid_feats) for rel in rel_names}, aggregate='sum') self.conv2 = dglnn.HeteroGraphConv({ rel: dglnn.GraphConv(hid_feats, out_feats) for rel in rel_names}, aggregate='sum') def forward(self, graph, inputs): # inputs are features of nodes h = self.conv1(graph, inputs) h = {k: F.relu(v) for k, v in h.items()} h = self.conv2(graph, h) return h model = RGCN(n_hetero_features, 20, n_user_classes, hetero_graph.etypes) user_feats = hetero_graph.nodes['user'].data['feature'] item_feats = hetero_graph.nodes['item'].data['feature'] labels = hetero_graph.nodes['user'].data['label'] train_mask = hetero_graph.nodes['user'].data['train_mask'] node_features = {'user': user_feats, 'item': item_feats} h_dict = model(hetero_graph, {'user': user_feats, 'item': item_feats}) h_user = h_dict['user'] h_item = h_dict['item'] opt = torch.optim.Adam(model.parameters()) for epoch in range(5): model.train() # forward propagation by using all nodes and extracting the user embeddings logits = model(hetero_graph, node_features)['user'] # compute loss loss = F.cross_entropy(logits[train_mask], labels[train_mask]) # Compute validation accuracy. Omitted in this example. # backward propagation opt.zero_grad() loss.backward() opt.step() print(loss.item()) # Save model if necessary. Omitted in the example.
- 关于RGCN的end-to-end用于节点分类的例子可见官方示例dgl@GitHub ;
- 关于异构图卷积层
RelGraphConvLayer
类的实现代码可见dgl@GitHub ;
5.2 边分类/回归
常见的边分类问题就是知识图谱中的关系预测; 本质上边分类/回归于节点分类/回归是大致相似的, 因为边的预测可以从邻近节点的特征表示通过某种聚合得到; 但是仍然有一些的不同;
- 本节生成一个简单随机图来作为边分类/回归问题的示例数据集:
- 代码示例:
import dgl import numpy as np src = np.random.randint(0, 100, 500) dst = np.random.randint(0, 100, 500) # make it symmetric edge_pred_graph = dgl.graph((np.concatenate([src, dst]), np.concatenate([dst, src]))) # synthetic node and edge features, as well as edge labels edge_pred_graph.ndata['feature'] = torch.randn(100, 10) edge_pred_graph.edata['feature'] = torch.randn(1000, 10) edge_pred_graph.edata['label'] = torch.randn(1000) # synthetic train-validation-test splits edge_pred_graph.edata['train_mask'] = torch.zeros(1000, dtype=torch.bool).bernoulli(0.6)
- 模型实现上与节点分类的差异:
- 假设你已经在上一章节计算出了节点表示, 那么只需要编写一个
apply_edges()
方法来计算边的预测值即可; 简单的一个例子即直接将边的两个端点的特征表示点乘得到边的预测特征:import dgl.function as fn class DotProductPredictor(nn.Module): def forward(self, graph, h): # h contains the node representations computed from the GNN defined # in the node classification section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h graph.apply_edges(fn.u_dot_v('h', 'h', 'score')) return graph.edata['score']
- 也可以使用一个MLP来预测每条边的向量表示, 边的向量表示可以用于进一步的下游任务, 例如作为类别分布(categorical distribution)的logits值;
class MLPPredictor(nn.Module): def __init__(self, in_features, out_classes): super().__init__() self.W = nn.Linear(in_features * 2, out_classes) def apply_edges(self, edges): h_u = edges.src['h'] h_v = edges.dst['h'] score = self.W(torch.cat([h_u, h_v], 1)) return {'score': score} def forward(self, graph, h): # h contains the node representations computed from the GNN defined # in the node classification section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h graph.apply_edges(self.apply_edges) return graph.edata['score']
- 模型训练的循环:
- 给定一个节点表示计算模型和边预测模型, 就可以很容易地写出一个用于计算所有边预测地全图训练循环;
- 下面地代码示例使用地是上一节中提到过的
SAGE
模型作为节点表示计算模型, 并使用DotPredictor
作为边预测模型:class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.sage = SAGE(in_features, hidden_features, out_features) self.pred = DotProductPredictor() def forward(self, g, x): h = self.sage(g, x) return self.pred(g, h) node_features = edge_pred_graph.ndata['feature'] edge_label = edge_pred_graph.edata['label'] train_mask = edge_pred_graph.edata['train_mask'] model = Model(10, 20, 5) opt = torch.optim.Adam(model.parameters()) for epoch in range(10): pred = model(edge_pred_graph, node_features) loss = ((pred[train_mask] - edge_label[train_mask]) ** 2).mean() opt.zero_grad() loss.backward() opt.step() print(loss.item())
- 在上述模型中假设了数据集上的边集都是通过边上布尔型的掩码进行识别(edge sets are identified by boolean masks on edges), 且本例没有使用early-stopping以及保存模型;
- 异构图上的模型定义与训练:
- 边预测在同构图与提构图上的区别不大, 只需要在
apply_edges
方法中额外指定边类型即可; - 仍然以上面
DotPredictor
的例子, 将它转为异构图的情况:from dgl import function as fn class HeteroDotProductPredictor(nn.Module): def forward(self, graph, h, etype): # h contains the node representations for each edge type computed from # the GNN for heterogeneous graphs defined in the node classification # section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h # assigns 'h' of all node types in one shot graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype) return graph.edges[etype].data['score']
- 同理可以照着写一个
HeteroMLPPredictor
:class MLPPredictor(nn.Module): def __init__(self, in_features, out_classes): super().__init__() self.W = nn.Linear(in_features * 2, out_classes) def apply_edges(self, edges): h_u = edges.src['h'] h_v = edges.dst['h'] score = self.W(torch.cat([h_u, h_v], 1)) return {'score': score} def forward(self, graph, h, etype): # h contains the node representations for each edge type computed from # the GNN for heterogeneous graphs defined in the node classification # section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h # assigns 'h' of all node types in one shot graph.apply_edges(self.apply_edges, etype=etype) return graph.edges[etype].data['score']
- 用于预测每个边得分的end-to-end模型可以这样写:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, rel_names): super().__init__() self.sage = RGCN(in_features, hidden_features, out_features, rel_names) self.pred = HeteroDotProductPredictor() def forward(self, g, x, etype): h = self.sage(g, x) return self.pred(g, h, etype)
- 然后可以得到进行与上文提过的类似的模型训练循环:
model = Model(10, 20, 5, hetero_graph.etypes) user_feats = hetero_graph.nodes['user'].data['feature'] item_feats = hetero_graph.nodes['item'].data['feature'] label = hetero_graph.edges['click'].data['label'] train_mask = hetero_graph.edges['click'].data['train_mask'] node_features = {'user': user_feats, 'item': item_feats} opt = torch.optim.Adam(model.parameters()) for epoch in range(10): pred = model(hetero_graph, node_features, 'click') loss = ((pred[train_mask] - label[train_mask]) ** 2).mean() opt.zero_grad() loss.backward() opt.step() print(loss.item())
- 预测异构图上某个边的类别:
- 以本章开头的异构图代码为例, 任务是给定user与item间连接的边, 来预测user到底是会click还是dislike这件item; 这在推荐系统中是非常常见的问题;
- 为了获取节点的特征表示, 可能需要一些异构图的卷积网络, 如本章第1节定义的
RGCN
类; - 为了预测边的类型, 可以简单使用上文提到的
HeteroDotProductPredictor
; - 代码示例:
dec_graph = hetero_graph['user', :, 'item'] edge_label = dec_graph.edata[dgl.ETYPE]
- 第一行返回一个带有两种节点类型(
user
和item
)的异构图; - 第二行返回边的真实标签, 直接从特征名为
dgl.ETYPE
的里面取;
- 第一行返回一个带有两种节点类型(
- 拿到可以用于输入模型的图后, 再编写预测器模块:
class HeteroMLPPredictor(nn.Module): def __init__(self, in_dims, n_classes): super().__init__() self.W = nn.Linear(in_dims * 2, n_classes) def apply_edges(self, edges): x = torch.cat([edges.src['h'], edges.dst['h']], 1) y = self.W(x) return {'score': y} def forward(self, graph, h): # h contains the node representations for each edge type computed from # the GNN for heterogeneous graphs defined in the node classification # section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h # assigns 'h' of all node types in one shot graph.apply_edges(self.apply_edges) return graph.edata['score']
- 模型定义及训练代码示例:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, rel_names): super().__init__() self.sage = RGCN(in_features, hidden_features, out_features, rel_names) self.pred = HeteroMLPPredictor(out_features, len(rel_names)) def forward(self, g, x, dec_graph): h = self.sage(g, x) return self.pred(dec_graph, h) model = Model(10, 20, 5, hetero_graph.etypes) user_feats = hetero_graph.nodes['user'].data['feature'] item_feats = hetero_graph.nodes['item'].data['feature'] node_features = {'user': user_feats, 'item': item_feats} opt = torch.optim.Adam(model.parameters()) for epoch in range(10): logits = model(hetero_graph, node_features, dec_graph) loss = F.cross_entropy(logits, edge_label) opt.zero_grad() loss.backward() opt.step() print(loss.item())
官方示例@GitHub ;
这一部分都省略了读取数据的部分, 学到这里可能已经忘了上面是怎么读取数据的了, 因为大部分dgl
内置的数据集都需要从外网下载, 速度比较慢, 所以选一两个典型的做个测试即可, 以后项目里的数据集还是需要自己处理生成的;
本章大部分异构图数据集的例子都是取自本章开头的那段代码;
5.3 链接预测
- 概述:
- 所谓链接预测即预测两个节点间是否存在一条边将它们连接;
- 基于GNN链接预测模型表征了两个节点
u
u
u和
v
v
v间联系性的似然值(likelihoodd of connectivity), 这里用
h
u
(
L
)
h_u^{(L)}
hu(L)与
h
u
(
L
)
h_u^{(L)}
hu(L)表示, 他们的节点表示可以用多层GNN计算得到:
y
u
,
v
=
ϕ
(
h
u
(
L
)
,
h
v
(
L
)
)
y_{u,v}=\phi(h_u^{(L)},h_v^{(L)})
yu,v=ϕ(hu(L),hv(L))
- 本节中定义 y u , v y_{u,v} yu,v为两个节点 u u u和 v v v间的得分;
- 训练链接预测模型的手段是: 比较存在边连接的一对节点间的得分与任意一对节点间的得分;
- 显然我们希望前者的得分要比后者高, 即如果 u u u和 v v v间存在边连接, 那么 y u , v y_{u,v} yu,v应当大于 y u , v ′ y_{u,v^{\prime}} yu,v′, 其中 v ′ v^{\prime} v′是一个任意噪声分布(arbitrary noise distribution) P n ( v ) P_n(v) Pn(v)中采样得到的点, 这种采样方法称为负采样(negative sampling);
- 有很多损失函数可以取得上述的效果, 这里举几个例子:
- 交叉熵损失: L = − log σ ( y u , v ) − ∑ v i ∼ P n ( v ) , i = 1 , 2 , . . . , k log ( 1 − σ ( y u , v i ) ) \mathcal{L}=-\log\sigma(y_{u,v})-\sum_{v_i\sim P_n(v),i=1,2,...,k}\log(1-\sigma(y_{u,v_i})) L=−logσ(yu,v)−∑vi∼Pn(v),i=1,2,...,klog(1−σ(yu,vi))
- BRP损失: L = ∑ v i ∼ P n ( v ) , i = 1 , 2 , . . . , k − log σ ( y u , v − y u , v i ) \mathcal{L}=\sum_{v_i\sim P_n(v),i=1,2,...,k}-\log\sigma(y_{u,v}-y_{u,v_i}) L=∑vi∼Pn(v),i=1,2,...,k−logσ(yu,v−yu,vi)
- 边际(margin)损失: L = ∑ v i ∼ P n ( v ) , i = 1 , 2 , . . . , k max ( 0 , M − y u , v + y u , v i ) \mathcal{L}=\sum_{v_i\sim P_n(v),i=1,2,...,k}\max(0,M-y_{u,v}+y_{u,v_i}) L=∑vi∼Pn(v),i=1,2,...,kmax(0,M−yu,v+yu,vi), 其中 M M M为常数;
- 如果想要详细了解这些知识可以查阅下面两篇paper:
- 隐式反馈: implicit feedback ;
- 噪声对比估计: noise-contrastive estimation ;
- 模型实现上与边分类的差异:
- 用于计算两个节点 u u u和 v v v间得分的神经网络模型与上一节边回归模型是完全相同的;
- 下面是一个用点积来计算得分的示例:
class DotProductPredictor(nn.Module): def forward(self, graph, h): # h contains the node representations computed from the GNN defined # in the node classification section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h graph.apply_edges(fn.u_dot_v('h', 'h', 'score')) return graph.edata['score']
- 模型训练的循环:
- 因为得分预测模型需要表达负样本的例子, 所以需要生成一张包含负样本节点对的图;
- 下面的代码给出一种采样负样本图的方法:
def construct_negative_graph(graph, k): src, dst = graph.edges() neg_src = src.repeat_interleave(k) neg_dst = torch.randint(0, graph.number_of_nodes(), (len(src) * k,)) return dgl.graph((neg_src, neg_dst), num_nodes=graph.number_of_nodes())
- 模型定义与训练的部分的步骤与上一节基本没有差别:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.sage = SAGE(in_features, hidden_features, out_features) self.pred = DotProductPredictor() def forward(self, g, neg_g, x): h = self.sage(g, x) return self.pred(g, h), self.pred(neg_g, h) def compute_loss(pos_score, neg_score): # Margin loss n_edges = pos_score.shape[0] return (1 - neg_score.view(n_edges, -1) + pos_score.unsqueeze(1)).clamp(min=0).mean() node_features = graph.ndata['feat'] n_features = node_features.shape[1] k = 5 model = Model(n_features, 100, 100) opt = torch.optim.Adam(model.parameters()) for epoch in range(10): negative_graph = construct_negative_graph(graph, k) pos_score, neg_score = model(graph, negative_graph, node_features) loss = compute_loss(pos_score, neg_score) opt.zero_grad() loss.backward() opt.step() print(loss.item())
- 有很多使用节点嵌入(node embeddings)的方法, 这里不再赘述, 即上述代码中
graph.ndata['feat']
的部分, 简单使用内置的数据集即可;
- 有很多使用节点嵌入(node embeddings)的方法, 这里不再赘述, 即上述代码中
- 异构图上的链接预测:
- 同理这跟同构图的差别也很小, 这里还是用
HeteroDotProductPredictor
的例子:class HeteroDotProductPredictor(nn.Module): def forward(self, graph, h, etype): # h contains the node representations for each node type computed from # the GNN defined in the previous section (Section 5.1). with graph.local_scope(): graph.ndata['h'] = h graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype) return graph.edges[etype].data['score']
- 负采样图的构建:
def construct_negative_graph(graph, k, etype): utype, _, vtype = etype src, dst = graph.edges(etype=etype) neg_src = src.repeat_interleave(k) neg_dst = torch.randint(0, graph.number_of_nodes(vtype), (len(src) * k,)) return dgl.heterograph( {etype: (neg_src, neg_dst)}, num_nodes_dict={ntype: graph.number_of_nodes(ntype) for ntype in graph.ntypes})
- 模型定义与训练过程基本与同构图的一样:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, rel_names): super().__init__() self.sage = RGCN(in_features, hidden_features, out_features, rel_names) self.pred = HeteroDotProductPredictor() def forward(self, g, neg_g, x, etype): h = self.sage(g, x) return self.pred(g, h, etype), self.pred(neg_g, h, etype) def compute_loss(pos_score, neg_score): # Margin loss n_edges = pos_score.shape[0] return (1 - neg_score.view(n_edges, -1) + pos_score.unsqueeze(1)).clamp(min=0).mean() k = 5 model = Model(10, 20, 5, hetero_graph.etypes) user_feats = hetero_graph.nodes['user'].data['feature'] item_feats = hetero_graph.nodes['item'].data['feature'] node_features = {'user': user_feats, 'item': item_feats} opt = torch.optim.Adam(model.parameters()) for epoch in range(10): negative_graph = construct_negative_graph(hetero_graph, k, ('user', 'click', 'item')) pos_score, neg_score = model(hetero_graph, negative_graph, node_features, ('user', 'click', 'item')) loss = compute_loss(pos_score, neg_score) opt.zero_grad() loss.backward() opt.step() print(loss.item())
5.4 整图分类
有时需要在多图上做分类问题, 如将人分为不同的群体, 通过定义不同群体中的人际关系, 可以得到很多张图来用于分类;
- 概述:
- 整图分类与之前三节所阐述的任务的不同之处在于预测结果需要表征整个输入图的性质, 即需要学习出一张图的张量表示; 当然整图分类中依然存在消息传递;
- 官方文档给出一张用于描述图分类处理的流程图:
- ① 准备一批图;
- ② 在这批图上执行消息传递来更新节点和边的特征;
- ③ 聚合所有边和节点的特征以得到图级别的表示;
- ④ 根据图级别的表示来进行图分类;
- 使用一批图的原因是一张张的训练实在是太慢了, 而
dgl
库中的dgl.batch
函数本质上是把一批图直接当成一个大图来处理, 形象地可以用下面地图来表示: - 关于Graph Readout:
- 每个图都有其独特地特征, 以及节点和边地特征, 所以为了得到一个单一的预测值, 通常需要聚合和总结所有可能的信息, 这种操作称为readout, 常见的readout操作包括再所有节点或边的特征上求和, 取最大值, 取最小值; 比如用所有节点特征的均值作为图的表示: h g = 1 ∣ V ∣ ∑ v ∈ V h v h_g=\frac{1}{|\mathcal{V}|}\sum_{v\in\mathcal{V}}h_v hg=∣V∣1v∈V∑hv
dgl
库提供了一系列内置的readout操作, 如dgl.readout_nodes()
;- 得到了图表示后就可以直接输入到神经网络里训练了;
- 编写神经网络模型:
- 模型的输入是一批图以及节点和边的特征;
- ① 一批图上的计算:
import dgl import torch g1 = dgl.graph(([0, 1], [1, 0])) g1.ndata['h'] = torch.tensor([1., 2.]) g2 = dgl.graph(([0, 1], [1, 2])) g2.ndata['h'] = torch.tensor([1., 2., 3.]) print(dgl.readout_nodes(g1, 'h')) # tensor([3.]) # 1 + 2 bg = dgl.batch([g1, g2]) print(dgl.readout_nodes(bg, 'h')) # tensor([3., 6.]) # [1 + 2, 1 + 2 + 3] print(bg.ndata['h']) # tensor([1., 2., 1., 2., 3.])
- 首先批次中不同的图是完全分离的, 即任意两个图之间不存在边, 因此这种好的性质使得消息传递函数仍然可以使用;
- 其次readout函数也是在不同的图上分开执行的, 假设batchsize为 B B B, 聚合后的图表示的维度为 D D D, 则readout函数输出的形状就是 ( B , D ) (B,D) (B,D);
- 最后一批图中每个点和边的特征可以通过依次拼接所有图中对应的特征得到;
- ② 模型定义:
import dgl.nn.pytorch as dglnn import torch.nn as nn class Classifier(nn.Module): def __init__(self, in_dim, hidden_dim, n_classes): super(Classifier, self).__init__() self.conv1 = dglnn.GraphConv(in_dim, hidden_dim) self.conv2 = dglnn.GraphConv(hidden_dim, hidden_dim) self.classify = nn.Linear(hidden_dim, n_classes) def forward(self, g, h): # Apply graph convolution and activation. h = F.relu(self.conv1(g, h)) h = F.relu(self.conv2(g, h)) with g.local_scope(): g.ndata['h'] = h # Calculate graph representation by average readout. hg = dgl.mean_nodes(g, 'h') return self.classify(hg)
- 模型训练的循环:
- ① 载入数据集:
import dgl.data dataset = dgl.data.GINDataset('MUTAG', False) def collate(samples): graphs, labels = map(list, zip(*samples)) batched_graph = dgl.batch(graphs) batched_labels = torch.tensor(labels) return batched_graph, batched_labels from torch.utils.data import DataLoader dataloader = DataLoader( dataset, batch_size=1024, collate_fn=collate, drop_last=False, shuffle=True)
- ② 训练循环:
import torch.nn.functional as F # Only an example, 7 is the input feature size model = Classifier(7, 20, 5) opt = torch.optim.Adam(model.parameters()) for epoch in range(20): for batched_graph, labels in dataloader: feats = batched_graph.ndata['attr'].float() logits = model(batched_graph, feats) loss = F.cross_entropy(logits, labels) opt.zero_grad() loss.backward() opt.step()
- 官方示例: DGL’s GIN example 中是一个end-to-end的图分类例子;
- 异构图上的整图分类:
- 跟上面一样, 和同构图的差别不大;
- 依然使用
RGCN
的代码作为示例:class RGCN(nn.Module): def __init__(self, in_feats, hid_feats, out_feats, rel_names): super().__init__() self.conv1 = dglnn.HeteroGraphConv({ rel: dglnn.GraphConv(in_feats, hid_feats) for rel in rel_names}, aggregate='sum') self.conv2 = dglnn.HeteroGraphConv({ rel: dglnn.GraphConv(hid_feats, out_feats) for rel in rel_names}, aggregate='sum') def forward(self, graph, inputs): # inputs is features of nodes h = self.conv1(graph, inputs) h = {k: F.relu(v) for k, v in h.items()} h = self.conv2(graph, h) return h class HeteroClassifier(nn.Module): def __init__(self, in_dim, hidden_dim, n_classes, rel_names): super().__init__() self.rgcn = RGCN(in_dim, hidden_dim, hidden_dim, rel_names) self.classify = nn.Linear(hidden_dim, n_classes) def forward(self, g): h = g.ndata['feat'] h = self.rgcn(g, h) with g.local_scope(): g.ndata['h'] = h # Calculate graph representation by average readout. hg = 0 for ntype in g.ntypes: hg = hg + dgl.mean_nodes(g, 'h', ntype=ntype) return self.classify(hg) # etypes is the list of edge types as strings. model = HeteroClassifier(10, 20, 5, etypes) opt = torch.optim.Adam(model.parameters()) for epoch in range(20): for batched_graph, labels in dataloader: logits = model(batched_graph) loss = F.cross_entropy(logits, labels) opt.zero_grad() loss.backward() opt.step()
Chapter 6: 大规模图上的随机训练
- 如果图的规模很大, b比如有上百万乃至数十亿的边和节点, 则上一章中提到的训练方法就无法起效了:
- 假设一个 L L L层的GCN, 隐层状态节点有 H H H个, 然后在一张有 N N N个节点的图上运行, 则需要至少 O ( N L H ) O(NLH) O(NLH)的显存空间, 一旦 N N N很大就会直接溢出;
-
因此本章主要介绍执行随机小批量训练的方法, 这样就不需要把整张图里的节点特征都输入到GPU中;
-
近邻采样方法(Neighborhood Sampling Approaches):
- 在每一次梯度下降优化中, 选择计算一小批的节点来计算它们在网络中的最终层(假设为第 L L L层)表示;
- 然后在第 L − 1 L-1 L−1层选择这些节点的所有或一些近邻;
- 这样一层层地从后往前迭代, 直到输入层;
- 可以用这张图片来描述这个过程:
- 这样就可以保存workload于计算资源, 然后在一张大图上训练GNN模型
dgl.sampling
: https://docs.dgl.ai/api/python/dgl.sampling.html ;dgl.sampling
模块中提供了一些近邻采样方法;
- 本章前三节将介绍不同场景下训练GNN模型的随机方法, 后三节则是一些比较高级的话题, 对于那些想要开发新的采样方法, 新的GNN模块来适应Mini-batch训练的人来说是可以查阅的, 后三节介绍了在Mini-batch中将如何评估以及推断模型的性能;
6.1 训练GNN来进行近邻采样的节点分类
为了实现随机训练, 需要进行三步走:
① 定义一个近邻采样器;
② 定义一个能够进行Mini-batch训练的模型;
③ 调整模型训练循环中的逻辑;
- 定义近邻采样器和数据加载器:
dgl
库中定义了几个内置的近邻采样器类, 以MultiLayerFullNeighborSampler
为例, 该采样器可以使节点聚合所有近邻的消息;- 此外使用
dgl
库的采样器时也必须和NodeDataLoader
结合使用, 该类是用于迭代minibatch上的节点集合; - 下面的代码定义了一个用于在批次中的训练节点ID数组
train_nids
上进行迭代, 并将生成的区块列表加载到GPU上的dataloader;import dgl import dgl.nn as dglnn import torch import torch.nn as nn import torch.nn.functional as F sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.NodeDataLoader( g, train_nids, sampler, batch_size=1024, shuffle=True, drop_last=False, num_workers=4) input_nodes, output_nodes, blocks = next(iter(dataloader)) print(blocks)
class dgl.dataloading.neighbor.MultiLayerFullNeighborSampler(n_layers, return_eids=False)
;class dgl.dataloading.pytorch.NodeDataLoader(g, nids, block_sampler, **kwargs)
nids
就是用于计算输出节点的输入节点, 就是在最后一层选定这些nids
, 那么在输入的实际采样应该是哪些;
- 生成器每次生成三个变量:
- ①
input_nodes
是需要用来计算output_nodes
的表示的节点; - ②
output_nodes
就是输出 - ③
blocks
描述了每个GNN层中, 哪些节点表示是被计算为输出, 哪些节点表示是被需要当作输入, 以及输入节点的表示是如何传播到输出节点的;
- ①
- 详细的内置采样器, 可以参考neighborhood sampler API reference ;
- 本章第4节将说明如何自定义近邻采样器;
关于这个代码似乎很难跑通, 笔者用的是下面这个karate club problem的图, 但是还是跑不通, 设了很多不同的train_ids, 但是还是一直报很长的
Runtime Error
错误, 报错的最后一行是:RuntimeError: DataLoader worker (pid(s) 11136, 10940, 4672, 1352) exited unexpectedly
, 代码示例如下, 之后再来找问题了, 暂时还是搞不通, 不卡在这里了;
def build_karate_club_graph():
# All 78 edges are stored in two numpy arrays. One for source endpoints
# while the other for destination endpoints.
src = np.array([1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 10, 10,
10, 11, 12, 12, 13, 13, 13, 13, 16, 16, 17, 17, 19, 19, 21, 21,
25, 25, 27, 27, 27, 28, 29, 29, 30, 30, 31, 31, 31, 31, 32, 32,
32, 32, 32, 32, 32, 32, 32, 32, 32, 33, 33, 33, 33, 33, 33, 33,
33, 33, 33, 33, 33, 33, 33, 33, 33, 33])
dst = np.array([0, 0, 1, 0, 1, 2, 0, 0, 0, 4, 5, 0, 1, 2, 3, 0, 2, 2, 0, 4,
5, 0, 0, 3, 0, 1, 2, 3, 5, 6, 0, 1, 0, 1, 0, 1, 23, 24, 2, 23,
24, 2, 23, 26, 1, 8, 0, 24, 25, 28, 2, 8, 14, 15, 18, 20, 22, 23,
29, 30, 31, 8, 9, 13, 14, 15, 18, 19, 20, 22, 23, 26, 27, 28, 29, 30,
31, 32])
# Edges are directional in DGL; Make them bi-directional.
u = np.concatenate([src, dst])
v = np.concatenate([dst, src])
# Construct a DGLGraph
return dgl.DGLGraph((u, v))
g = build_karate_club_graph()
train_nids = torch.tensor([0])
- 定义一个能够进行Mini-batch训练的模型;
- 如果所有消息传递模块都是使用的
dgl
库内置的接口函数, 则将模型调整为适应Mini-batch是非常简单的, 以二层GCN为例:class TwoLayerGCN(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.conv1 = dglnn.GraphConv(in_features, hidden_features) self.conv2 = dglnn.GraphConv(hidden_features, out_features) def forward(self, g, x): x = F.relu(self.conv1(g, x)) x = F.relu(self.conv2(g, x)) return x
- 只需要将所有的
g
替换为上一点中生成的blocks
即可;class StochasticTwoLayerGCN(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.conv1 = dgl.nn.GraphConv(in_features, hidden_features) self.conv2 = dgl.nn.GraphConv(hidden_features, out_features) def forward(self, blocks, x): x = F.relu(self.conv1(blocks[0], x)) x = F.relu(self.conv2(blocks[1], x)) return x
- 通过查阅
dgl.nn
中各个模块的说明 , 可以知道每个模块是否接受blocks
作为参数, 并不是所有的都可以这样改的, 但是大部分都可以, 比如GraphConv
模块; - 本章第5节介绍如何在自定义了消息传递模块的情况下将模型调整为适应Mini-batch训练;
- 模型训练的循环:
- 模型训练的循环由数据集上自定义的batch生成器构成, 在生成器的每一次迭代中会生成一个blocks的列表, 需要做以下四步操作:
model = StochasticTwoLayerGCN(in_features, hidden_features, out_features) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, output_nodes, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] input_features = blocks[0].srcdata['features'] output_labels = blocks[-1].dstdata['label'] output_predictions = model(blocks, input_features) loss = compute_loss(output_labels, output_predictions) opt.zero_grad() loss.backward() opt.step()
- ① 加载对应的输入节点特征到GPU上; 注意只需要加载输入节点的特征, 非批训练时会将所有节点特征都载入;
- 如果特征是保存在
g.ndata
中, 则可以通过blocks[0].srcdata
获得, 即输入节点的特征是在第一个block里;
- 如果特征是保存在
- ② 将blocks列表和输入节点特征一起输入进多层GNN中得到输出;
- ③ 加载对应的输出节点标签到GPU上, 注意只需要加载输出节点的标签, 非批训练时会将所有节标签都载入;
- 如果标签是保存在
g.ndata
中, 则可以通过blocks[-1].srcdata
获得, 即输入节点的特征是在最后一个block里;
- 如果标签是保存在
- ④ 计算损失函数于反向传播;
- ① 加载对应的输入节点特征到GPU上; 注意只需要加载输入节点的特征, 非批训练时会将所有节点特征都载入;
- 异构图上的处理:
- 同样是类似同构图的处理;
- 这里调用Chapter 5第1节中异构图部分的两层RGCN代码, 这里稍作修改:
class StochasticTwoLayerRGCN(nn.Module): def __init__(self, in_feat, hidden_feat, out_feat, rel_names): super().__init__() self.conv1 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(in_feat, hidden_feat, norm='right') for rel in rel_names }) self.conv2 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(hidden_feat, out_feat, norm='right') for rel in rel_names }) def forward(self, blocks, x): x = self.conv1(blocks[0], x) x = self.conv2(blocks[1], x) return x
- 定义采样器和数据加载器: 方法与同构图无异;
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.NodeDataLoader( g, train_nid_dict, sampler, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 模型训练的循环:
model = StochasticTwoLayerRGCN(in_features, hidden_features, out_features, etypes) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, output_nodes, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] input_features = blocks[0].srcdata # returns a dict output_labels = blocks[-1].dstdata # returns a dict output_predictions = model(blocks, input_features) loss = compute_loss(output_labels, output_predictions) opt.zero_grad() loss.backward() opt.step()
- 官方提供了end-to-end的随机训练示例: RGCN implementation ;
6.2 训练GNN来进行近邻采样的边分类
边分类基本与节点分类类似;
- 定义近邻采样器与数据加载器:
- 基本与上一节完全一致,
train_nids
替换成train_eids
, 其余细节不再赘述, 详细可见上一节对应部分:sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
class dgl.dataloading.pytorch.EdgeDataLoader(g, eids, block_sampler, **kwargs)
: DGL官方文档 ;
- 从用于近邻采样的原始图中移除minibatch里的边:
- 训练边分类模型时, 有时需要在计算依赖(computation dependency)中移除训练数据中出现的一些边, 否则模型将会提前知道两个节点间存在边的事实, 于是就会虚假地提升模型性能;
- 似乎有些晦涩难懂, 原文如下:
When training edge classification models, sometimes you wish to remove the edges appearing in the training data from the computation dependency as if they never existed. Otherwise, the model will ‘know’ the fact that an edge exists between the two nodes, and potentially use it for advantage.
- 似乎有些晦涩难懂, 原文如下:
- 可以通过设置
EdgeDataLoader
地构造参数exclude='reverse_id'
来实现这种效果:n_edges = g.number_of_edges() dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, # The following two arguments are specifically for excluding the minibatch # edges and their reverse edges from the original graph for neighborhood # sampling. exclude='reverse_id', reverse_eids=torch.cat([ torch.arange(n_edges // 2, n_edges), torch.arange(0, n_edges // 2)]), batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 调整模型适应minibatch训练:
- 边分类模型由两部分构成:
- ① 第一部分获取事件节点(incident nodes)地特征表示;
class StochasticTwoLayerGCN(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.conv1 = dglnn.GraphConv(in_features, hidden_features) self.conv2 = dglnn.GraphConv(hidden_features, out_features) def forward(self, blocks, x): x = F.relu(self.conv1(blocks[0], x)) x = F.relu(self.conv2(blocks[1], x)) return x
- ② 第二部分根据①中的特征表示计算边的得分;
class ScorePredictor(nn.Module): def __init__(self, num_classes, in_features): super().__init__() self.W = nn.Linear(2 * in_features, num_classes) def apply_edges(self, edges): data = torch.cat([edges.src['x'], edges.dst['x']]) return {'score': self.W(data)} def forward(self, edge_subgraph, x): with edge_subgraph.local_scope(): edge_subgraph.ndata['x'] = x edge_subgraph.apply_edges(self.apply_edges) return edge_subgraph.edata['score']
- 第一部分与上一节的节点分类时完全相同的, 因此可以直接重复使用, 它的输入依旧是数据加载器生成的blocks列表, 以及输入特征;
- 第二部分的输入通常是第一部分的输出结果, 以及通过minibatch中的边推导出的原始图的子图;
- 通常子图也是可以从相同的数据加载器得到;
- 通过调用
dgl.DGLHeteroGraph.apply_edges()
可以计算出子图上边的得分;
- 模型代码示例:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, num_classes): super().__init__() self.gcn = StochasticTwoLayerGCN( in_features, hidden_features, out_features) self.predictor = ScorePredictor(num_classes, out_features) def forward(self, edge_subgraph, blocks, x): x = self.gcn(blocks, x) return self.predictor(edge_subgraph, x)
- 模型输入依然是blocks列表和数据加载器生成的边子图, 以及输入特征;
dgl
确保边子图中的节点与最后一个block的输出节点是相同的;
- 模型训练的循环:
- 代码示例:
model = Model(in_features, hidden_features, out_features, num_classes) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, edge_subgraph, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] edge_subgraph = edge_subgraph.to(torch.device('cuda')) input_features = blocks[0].srcdata['features'] edge_labels = edge_subgraph.edata['labels'] edge_predictions = model(edge_subgraph, blocks, input_features) loss = compute_loss(edge_labels, edge_predictions) opt.zero_grad() loss.backward() opt.step()
- 每次循环得到一个由minibatch推导出的子图, 以及必要的blocks列表用于计算它们incident node的表示;
- 异构图上的边分类处理:
- 这与异构图上的节点分类类似;
- 首先定义类似的二层RGCN:
class StochasticTwoLayerRGCN(nn.Module): def __init__(self, in_feat, hidden_feat, out_feat, rel_names): super().__init__() self.conv1 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(in_feat, hidden_feat, norm='right') for rel in rel_names }) self.conv2 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(hidden_feat, out_feat, norm='right') for rel in rel_names }) def forward(self, blocks, x): x = self.conv1(blocks[0], x) x = self.conv2(blocks[1], x) return x
- 得分预测器: 与同构图的区别是需要循环各个类型的边进行
apply_edges()
调用:class ScorePredictor(nn.Module): def __init__(self, num_classes, in_features): super().__init__() self.W = nn.Linear(2 * in_features, num_classes) def apply_edges(self, edges): data = torch.cat([edges.src['x'], edges.dst['x']]) return {'score': self.W(data)} def forward(self, edge_subgraph, x): with edge_subgraph.local_scope(): edge_subgraph.ndata['x'] = x for etype in edge_subgraph.canonical_etypes: edge_subgraph.apply_edges(self.apply_edges, etype=etype) return edge_subgraph.edata['score'] class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, num_classes, etypes): super().__init__() self.rgcn = StochasticTwoLayerRGCN( in_features, hidden_features, out_features, etypes) self.pred = ScorePredictor(num_classes, out_features) def forward(self, edge_subgraph, blocks, x): x = self.rgcn(blocks, x) return self.pred(edge_subgraph, x)
- 得分预测器与节点分类时的异构图处理类似, 只是将
NodeDataLoader
替换成EdgeDataLoader
:sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 如果需要移除异构图上的一些边(正如本节第2点中所述的同构图情况), 就与同构图稍有区别了:
- 异构图上如果将边反向(reverse), 则可能类型就变了: 比如follow就会变成followed by, 所以不能简单的直接反向, 需要定义每种类型的边反向后的类型:
dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, # The following two arguments are specifically for excluding the minibatch # edges and their reverse edges from the original graph for neighborhood # sampling. exclude='reverse_types', reverse_etypes={'follow': 'followed by', 'followed by': 'follow', 'purchase': 'purchased by', 'purchased by': 'purchase'} batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 模型训练的循环基本与同构图类似:
model = Model(in_features, hidden_features, out_features, num_classes, etypes) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, edge_subgraph, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] edge_subgraph = edge_subgraph.to(torch.device('cuda')) input_features = blocks[0].srcdata['features'] edge_labels = edge_subgraph.edata['labels'] edge_predictions = model(edge_subgraph, blocks, input_features) loss = compute_loss(edge_labels, edge_predictions) opt.zero_grad() loss.backward() opt.step()
- 官方的异构图(一个异构的二分图)边分类示例: GCMC ;
6.3 训练GNN来进行近邻采样的链接预测
- 定义近邻采样器与数据加载器:
- 代码示例:
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.EdgeDataLoader( g, train_seeds, sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5), batch_size=args.batch_size, shuffle=True, drop_last=False, pin_memory=True, num_workers=args.num_workers)
- 这里的不同之处是数据加载器的构造参数不是
train_nids
或train.eids
, 而是train_seeds
; dgl.dataloading.negative_sampler.Uniform
可以用于均匀采样;
- 这里的不同之处是数据加载器的构造参数不是
- 自定义负采样器:
class NegativeSampler(object): def __init__(self, g, k): # caches the probability distribution self.weights = g.in_degrees().float() ** 0.75 self.k = k def __call__(self, g, eids): src, _ = g.find_edges(eids) src = src.repeat_interleave(self.k) dst = self.weights.multinomial(len(src), replacement=True) return src, dst dataloader = dgl.dataloading.EdgeDataLoader( g, train_seeds, sampler, negative_sampler=NegativeSampler(g, 5), batch_size=args.batch_size, shuffle=True, drop_last=False, pin_memory=True, num_workers=args.num_workers)
- 对于每条边的每个源节点, 将会采样 k k k个负样本的目标节点;
- 当一个负采样器被当作数据加载器的构造参数, 则每次迭代会在minibatch中生成三个变量:
- ① 正图(positive graph): 包含所有在minibatch中采样的边;
- ② 负图(negative graph): 包含所有负采样器生成的不存在的边;
- ③ 由近邻采样器生成的blocks列表;
- 调整模型适应minibatch训练:
- 正如Chapter 5的第3节所属, 链接预测是通过比较正样本边的得分与负样本边的得分实现的, 所以需要实现得分预测器, 以及与前两节相同的二层RGCN模块:
class StochasticTwoLayerGCN(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.conv1 = dgl.nn.GraphConv(in_features, hidden_features) self.conv2 = dgl.nn.GraphConv(hidden_features, out_features) def forward(self, blocks, x): x = F.relu(self.conv1(blocks[0], x)) x = F.relu(self.conv2(blocks[1], x)) return x class ScorePredictor(nn.Module): def forward(self, edge_subgraph, x): with edge_subgraph.local_scope(): edge_subgraph.ndata['x'] = x edge_subgraph.apply_edges(dgl.function.u_dot_v('x', 'x', 'score')) return edge_subgraph.edata['score']
- 得分预测器里只需要预测一个标量即可, 无需预测概率分布;
- 模型定义代码示例:
class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.gcn = StochasticTwoLayerGCN( in_features, hidden_features, out_features) def forward(self, positive_graph, negative_graph, blocks, x): x = self.gcn(blocks, x) pos_score = self.predictor(positive_graph, x) neg_score = self.predictor(negative_graph, x) return pos_score, neg_score
- 模型训练的循环:
- 代码示例:
model = Model(in_features, hidden_features, out_features) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, positive_graph, negative_graph, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] positive_graph = positive_graph.to(torch.device('cuda')) negative_graph = negative_graph.to(torch.device('cuda')) input_features = blocks[0].srcdata['features'] pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features) loss = compute_loss(pos_score, neg_score) opt.zero_grad() loss.backward() opt.step()
- 同构图上的链接预测官方示例: unsupervised learning GraphSAGE ;
- 异构图上的链接预测处理:
- 基本类似前两节对异构图的处理;
- 二层RGCN模块:
class StochasticTwoLayerRGCN(nn.Module): def __init__(self, in_feat, hidden_feat, out_feat, rel_names): super().__init__() self.conv1 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(in_feat, hidden_feat, norm='right') for rel in rel_names }) self.conv2 = dglnn.HeteroGraphConv({ rel : dglnn.GraphConv(hidden_feat, out_feat, norm='right') for rel in rel_names }) def forward(self, blocks, x): x = self.conv1(blocks[0], x) x = self.conv2(blocks[1], x) return x
- 得分预测器及模型定义: 与同构图的不同之处在于需要循环每一种边的类型进行
dgl.DGLHeteroGraph.apply_edges()
;class ScorePredictor(nn.Module): def forward(self, edge_subgraph, x): with edge_subgraph.local_scope(): edge_subgraph.ndata['x'] = x for etype in edge_subgraph.canonical_etypes: edge_subgraph.apply_edges( dgl.function.u_dot_v('x', 'x', 'score'), etype=etype) return edge_subgraph.edata['score'] class Model(nn.Module): def __init__(self, in_features, hidden_features, out_features, num_classes, etypes): super().__init__() self.rgcn = StochasticTwoLayerRGCN( in_features, hidden_features, out_features, etypes) self.pred = ScorePredictor() def forward(self, positive_graph, negative_graph, blocks, x): x = self.rgcn(blocks, x) pos_score = self.pred(positive_graph, x) neg_score = self.pred(negative_graph, x) return pos_score, neg_score
- 数据加载器: 基本与前两节的异构图处理相同, 唯一的区别是需要提供负采样器, 以及需要提供边类型的字典以及边ID的张量, 而非提供节点类型的字典以及节点ID的张量:
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2) dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5), batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 自定义负采样器:
class NegativeSampler(object): def __init__(self, g, k): # caches the probability distribution self.weights = { etype: g.in_degrees(etype=etype).float() ** 0.75 for etype in g.canonical_etypes} self.k = k def __call__(self, g, eids_dict): result_dict = {} for etype, eids in eids_dict.items(): src, _ = g.find_edges(eids, etype=etype) src = src.repeat_interleave(self.k) dst = self.weights.multinomial(len(src), replacement=True) result_dict[etype] = (src, dst) return result_dict dataloader = dgl.dataloading.EdgeDataLoader( g, train_eid_dict, sampler, negative_sampler=NegativeSampler(g, 5), batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
- 构造参数应当是原始图以及边类型的字典以及边ID的张量;
- 返回值应当为便类型的字典和source-destination的数组对;
- 模型训练的循环:
model = Model(in_features, hidden_features, out_features, num_classes, etypes) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, positive_graph, negative_graph, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] positive_graph = positive_graph.to(torch.device('cuda')) negative_graph = negative_graph.to(torch.device('cuda')) input_features = blocks[0].srcdata['features'] pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features) loss = compute_loss(pos_score, neg_score) opt.zero_grad() loss.backward() opt.step()
6.4 自定义近邻采样器
进阶篇章, 通常不建议自定义近邻采样器, 除非做相关研究;
- 在paper How Powerful are Graph Neural Networks 中, 定义了消息传递: a v ( l ) = ρ ( l ) ( { h u ( l − 1 ) : u ∈ N ( v ) } ) h v ( l ) = ϕ ( l ) ( h v ( l − 1 ) , a v ( l ) ) a_v^{(l)}=\rho^{(l)}(\{h_u^{(l-1):u\in\mathcal{N}(v)}\})\\h_v^{(l)}=\phi^{(l)}(h_v^{(l-1)},a_v^{(l)}) av(l)=ρ(l)({hu(l−1):u∈N(v)})hv(l)=ϕ(l)(hv(l−1),av(l))
- 其中 ρ ( l ) \rho^{(l)} ρ(l)与 ϕ ( l ) \phi^{(l)} ϕ(l)是参数化的函数;
- N ( v ) \mathcal{N}(v) N(v)的定义是在图 G \mathcal{G} G上的 v v v的先祖(predecessors, 即指向 v v v的集合, 无向图就是近邻);
- 图解消息传递:
- 利用纸和笔来做近邻采样:
- 接下来考虑多层消息传递是如何工作的, 在输入为一个节点(种子节点)时, 输出节点是哪些; 下文将说明哪些节点应当被当作输入的种子节点;
- 首先构建一张图, 随机赋予节点一些特征值
x
和y
;import torch import dgl src = torch.LongTensor( [0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10, 1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11]) dst = torch.LongTensor( [1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11, 0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10]) g = dgl.graph((src, dst)) g.ndata['x'] = torch.randn(12, 5) g.ndata['y'] = torch.randn(12, 1)
- 找到消息传递的依赖:
- 考虑二层GNN, 种子节点为8(图中红色):
- 带入paper中的消息传递公式: a 8 ( 2 ) = ρ ( 2 ) ( { h u 1 : i ∈ N ( 8 ) } ) = ρ ( 2 ) ( { h 4 ( 1 ) , h 5 ( 1 ) , h 7 ( 1 ) , h 11 ( 1 ) } ) h 8 ( 2 ) = ϕ ( 2 ) ( h 8 ( 1 ) , a 8 ( 2 ) ) a_8^{(2)}=\rho^{(2)}(\{h_u^{1}:i\in\mathcal{N}(8)\})=\rho^{(2)}(\{h_4^{(1)},h_5^{(1)},h_7^{(1)},h_{11}^{(1)}\})\\h_8^{(2)}=\phi^{(2)}(h_8^{(1)},a_8^{(2)}) a8(2)=ρ(2)({hu1:i∈N(8)})=ρ(2)({h4(1),h5(1),h7(1),h11(1)})h8(2)=ϕ(2)(h8(1),a8(2))
- 可以直接用下图表示本轮消息传递:
- 该图中保留了所有节点, 但是只保留了必要的用于消息传递的边, 我们将这张子图称为红色节点在GNN第二层的frontier;
dgl
库内置了一些函数来生成frontier, 如dgl.in_subgraph()
, 可以用于推断出一个带有原始图中所有节点的子图, 但只保留必要的一些边:frontier = dgl.in_subgraph(g, [8]) print(frontier.all_edges())
- 详细接口函数可以查阅Subgraph Extraction Ops 和dgl.sampling ;
- 本节后面将会介绍如何实现自定义的近邻采样器;
- 用于多层minibatch消息传递的二分图结构
- 问题在于通过 h ⋅ ( 1 ) h_{\cdot}^{(1)} h⋅(1)计算 h 8 ( 2 ) h_8^{(2)} h8(2)时, 不能简单地在frontier上直接进行消息传递, 因为它还有很多原始图的节点保留, 但却没有了边;
- 事实上消息传递中只需要 ( 4 , 5 , 7 , 8 , 11 ) (4,5,7,8,11) (4,5,7,8,11)这些节点作为输入, 以及 8 8 8作为输出, 由于输入输出不一致, 就需要在一个规模较小的具有二分结构的图上进行消息传递:
- 我们将这种二分图称为一个block, 它只包含必要的输入节点和输出节点;
- 注意输出节点8出现在了输入节点中, 原因是消息传递后, 节点8需要用来进行特征合并, 比如在 ϕ ( 2 ) \phi^{(2)} ϕ(2)函数中节点8时作为输入的;
dgl.to_block(g, dst_nodes=None, include_dst_in_src=True)
可以将任何frontier转为一个block, 参数g
即为子图frontier;output_nodes = torch.LongTensor([8]) block = dgl.to_block(frontier, output_nodes)
- 使用
dgl.DGLHeteroGraph.number_of_src_nodes()
和dgl.DGLHeteroGraph.number_of_dst_nodes()
来确定输入节点和输出节点的数量:num_input_nodes, num_output_nodes = block.number_of_src_nodes(), block.number_of_dst_nodes() print(num_input_nodes, num_output_nodes)
- 使用
dgl.DGLHeteroGraph.srcdata
,dgl.DGLHeteroGraph.srcnodes
可以获取block的输入节点特征, 输出节点特征则可以通过dgl.DGLHeteroGraph.dstdata
,dgl.DGLHeteroGraph.dstnodes
得到;srcdata/dstdata
,srcnodes/dstnodes
本身与dgl.DGLHeteroGraph.ndata
与dgl.DGLHeteroGraph.ndata
是相同的; 并且可以通过srcdata/dstdata
来获取block的输入节点和输出节点;block.srcdata['h'] = torch.randn(num_input_nodes, 5) block.dstdata['h'] = torch.randn(num_output_nodes, 5) print(block.srcdata['x']) print(block.dstdata['y'])
- 通过
dgl.NID
和dgl.EID
可以获取输入输出节点的特征:input_nodes = block.srcdata[dgl.NID] output_nodes = block.dstdata[dgl.NID] assert torch.equal(input_nodes[:len(output_nodes)], output_nodes)
dgl
库确保一个block中的输出节点总是出现在输入节点中, 且永远是在输入节点的最一开始的位置, 正如这段代码所断言的那样;- 总之输入节点一定包含输出节点;
- 一个例子: 考虑如下的一个frontier
- 所有的红色节点和绿色节点( ( 4 , 5 , 7 , 8 , 11 ) (4,5,7,8,11) (4,5,7,8,11))都是某个边的目标节点, 下述代码将会报错:
dgl.to_block(frontier2, torch.LongTensor([4, 5])) # ERROR
- 原因是输出节点没有包含输入节点;
- 输出节点可以包含离群点(没有任何边与它连接):
# Node 3 is an isolated node that do not have any edge pointing to it. block3 = dgl.to_block(frontier2, torch.LongTensor([4, 5, 7, 8, 11, 3])) print(block3.srcdata[dgl.NID]) print(block3.dstdata[dgl.NID])
- 异构图中的block:
- block在异构图中也是可以工作的, 以下是一个frontier:
hetero_frontier = dgl.heterograph({ ('user', 'follow', 'user'): ([1, 3, 7], [3, 6, 8]), ('user', 'play', 'game'): ([5, 5, 4], [6, 6, 2]), ('game', 'played-by', 'user'): ([2], [6]) }, num_nodes_dict={'user': 10, 'game': 10})
- 可以创建一个block:
hetero_block = dgl.to_block(hetero_frontier, {'user': [3, 6, 8], 'block': [2, 6]})
- 获取输出与输入节点的类型:
# input users and games print(hetero_block.srcnodes['user'].data[dgl.NID], hetero_block.srcnodes['game'].data[dgl.NID]) # output users and games print(hetero_block.dstnodes['user'].data[dgl.NID], hetero_block.dstnodes['game'].data[dgl.NID])
- 实现一个自定义的近邻采样器:
- 以
MultiLayerFullNeighborSampler
为例, 它的父类是BlockSampler
;BlockSampler
是用于从最后义层生成blocks列表的类, 使用了sample_blocks()
方法, 默认的方法实现就是进行反向迭代, 生成frontier再把它们转为blocks;
- 因此对于近邻采样, 只需要实现
sample_frontier()
方法, 给定采样器再哪个层生成frontier, 以及原始图和节点用于计算特征表示; - 同时需要向父类传递GNN的总层数;
- 代码示例:
MultiLayerFullNeighborSampler
;class MultiLayerFullNeighborSampler(dgl.dataloading.BlockSampler): def __init__(self, n_layers): super().__init__(n_layers) def sample_frontier(self, block_id, g, seed_nodes): frontier = dgl.in_subgraph(g, seed_nodes) return frontier
- 代码示例: dgl.dataloading.neighbor.MultiLayerNeighborSampler ;
class MultiLayerNeighborSampler(dgl.dataloading.BlockSampler): def __init__(self, fanouts): super().__init__(len(fanouts)) self.fanouts = fanouts def sample_frontier(self, block_id, g, seed_nodes): fanout = self.fanouts[block_id] if fanout is None: frontier = dgl.in_subgraph(g, seed_nodes) else: frontier = dgl.sampling.sample_neighbors(g, seed_nodes, fanout) return frontier
- 这是一个更复杂的近邻采样器类;
- 虽然这些函数可以生成frontier, 但是任何一个图带有相同节点也可以当作一个frontier, 所以如果想要随机丢弃一些连接到种子节点的inbound edges, 就可以定义如下的一个采样器:
class MultiLayerDropoutSampler(dgl.dataloading.BlockSampler): def __init__(self, p, n_layers): super().__init__() self.n_layers = n_layers self.p = p def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs): # Get all inbound edges to `seed_nodes` src, dst = dgl.in_subgraph(g, seed_nodes).all_edges() # Randomly select edges with a probability of p mask = torch.zeros_like(src).bernoulli_(self.p) src = src[mask] dst = dst[mask] # Return a new graph with the same nodes as the original graph as a # frontier frontier = dgl.graph((src, dst), num_nodes=g.number_of_nodes()) return frontier def __len__(self): return self.n_layers
- 实现了自定义好采样器后就可以通过数据加载器进行测试:
sampler = MultiLayerDropoutSampler(0.5, 2) dataloader = dgl.dataloading.NodeDataLoader( g, train_nids, sampler, batch_size=1024, shuffle=True, drop_last=False, num_workers=4) model = StochasticTwoLayerRGCN(in_features, hidden_features, out_features) model = model.cuda() opt = torch.optim.Adam(model.parameters()) for input_nodes, blocks in dataloader: blocks = [b.to(torch.device('cuda')) for b in blocks] input_features = blocks[0].srcdata # returns a dict output_labels = blocks[-1].dstdata # returns a dict output_predictions = model(blocks, input_features) loss = compute_loss(output_labels, output_predictions) opt.zero_grad() loss.backward() opt.step()
- 异构图上的自定义采样器:
- 本质上与同构图并没有说明不同, 这里以
MultiLayerDropoutSampler
为例改写一个适用于异构图的MultiLayerDropoutSampler
:
class MultiLayerDropoutSampler(dgl.dataloading.BlockSampler): def __init__(self, p, n_layers): super().__init__() self.n_layers = n_layers self.p = p def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs): # Get all inbound edges to `seed_nodes` sg = dgl.in_subgraph(g, seed_nodes) new_edges_masks = {} # Iterate over all edge types for etype in sg.canonical_etypes: edge_mask = torch.zeros(sg.number_of_edges(etype)) edge_mask.bernoulli_(self.p) new_edges_masks[etype] = edge_mask.bool() # Return a new graph with the same nodes as the original graph as a # frontier frontier = dgl.edge_subgraph(new_edge_masks, preserve_nodes=True) return frontier def __len__(self): return self.n_layers
- 本质上与同构图并没有说明不同, 这里以
6.5 实现用于mini-batch训练的自定义GNN模块
卧槽上一节看吐了, 长的离谱; 后两节的长度终于正常的…
Chapter 3中自定义GNN模块与本节自定义GNN模块基本是相似的, 区别在于本节是需要再blocks上进行运算, 本质和图的运算时差不多的;
- 对比用于全图的自定义GNN模块与用于blocks的自定义GNN模块:
- 全图代码示例:
class CustomGraphConv(nn.Module): def __init__(self, in_feats, out_feats): super().__init__() self.W = nn.Linear(in_feats * 2, out_feats) def forward(self, g, h): with g.local_scope(): g.ndata['h'] = h g.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh')) return self.W(torch.cat([g.ndata['h'], g.ndata['h_neigh']], 1))
- 用于blocks的代码示例:
class CustomGraphConv(nn.Module): def __init__(self, in_feats, out_feats): super().__init__() self.W = nn.Linear(in_feats * 2, out_feats) # h is now a pair of feature tensors for input and output nodes, instead of # a single feature tensor. # def forward(self, g, h): def forward(self, block, h): # with g.local_scope(): with block.local_scope(): # g.ndata['h'] = h h_src = h h_dst = h[:block.number_of_dst_nodes()] block.srcdata['h'] = h_src block.dstdata['h'] = h_dst # g.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh')) block.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh')) # return self.W(torch.cat([g.ndata['h'], g.ndata['h_neigh']], 1)) return self.W(torch.cat( [block.dstdata['h'], block.dstdata['h_neigh']], 1))
- 总体而言, 从全图转为blocks需要做下面四件事:
- ① 通过对位于头部的几行进行切片, 来从输入特征获得输出节点的特征, 切片的行数直接可以用
block.number_of_dst_nodes
获得; - ② 将
g.ndata
替换为block.srcdata
(输入节点)或block.dstdata
(输出节点), 前提这是同构图, 即只有一种类型的节点; - ③ 将
g.nodes
替换为block.srcnodes
(输入节点)或block.dstnodes
(输出节点), 前提这是同构图, 即只有一种类型的节点; - ④ 将
g.number_of_nodes
替换为block.number_of_src_nodes
(输入节点)或block.number_of_dst_nodes
(输出节点);
- ① 通过对位于头部的几行进行切片, 来从输入特征获得输出节点的特征, 切片的行数直接可以用
- 异构图的情况:
- 规则跟第1点中提到的差不多;
- 代码示例一:
class CustomHeteroGraphConv(nn.Module): def __init__(self, g, in_feats, out_feats): super().__init__() self.Ws = nn.ModuleDict() for etype in g.canonical_etypes: utype, _, vtype = etype self.Ws[etype] = nn.Linear(in_feats[utype], out_feats[vtype]) for ntype in g.ntypes: self.Vs[ntype] = nn.Linear(in_feats[ntype], out_feats[ntype]) def forward(self, g, h): with g.local_scope(): for ntype in g.ntypes: g.nodes[ntype].data['h_dst'] = self.Vs[ntype](h[ntype]) g.nodes[ntype].data['h_src'] = h[ntype] for etype in g.canonical_etypes: utype, _, vtype = etype g.update_all( fn.copy_u('h_src', 'm'), fn.mean('m', 'h_neigh'), etype=etype) g.nodes[vtype].data['h_dst'] = g.nodes[vtype].data['h_dst'] + \ self.Ws[etype](g.nodes[vtype].data['h_neigh']) return {ntype: g.nodes[ntype].data['h_dst'] for ntype in g.ntypes}
- 代码示例二:
class CustomHeteroGraphConv(nn.Module): def __init__(self, g, in_feats, out_feats): super().__init__() self.Ws = nn.ModuleDict() for etype in g.canonical_etypes: utype, _, vtype = etype self.Ws[etype] = nn.Linear(in_feats[utype], out_feats[vtype]) for ntype in g.ntypes: self.Vs[ntype] = nn.Linear(in_feats[ntype], out_feats[ntype]) def forward(self, g, h): with g.local_scope(): for ntype in g.ntypes: h_src, h_dst = h[ntype] g.dstnodes[ntype].data['h_dst'] = self.Vs[ntype](h[ntype]) g.srcnodes[ntype].data['h_src'] = h[ntype] for etype in g.canonical_etypes: utype, _, vtype = etype g.update_all( fn.copy_u('h_src', 'm'), fn.mean('m', 'h_neigh'), etype=etype) g.dstnodes[vtype].data['h_dst'] = \ g.dstnodes[vtype].data['h_dst'] + \ self.Ws[etype](g.dstnodes[vtype].data['h_neigh']) return {ntype: g.dstnodes[ntype].data['h_dst'] for ntype in g.ntypes}
- 编写自定义模块使得能够再同构图, 二分图和block上工作:
dgl
库中内置的所有消息传递模块都可以在同构图, 无向二分图(包含两种节点类型和一种边类型), 只有一种边类型的blocks上; 因此内置的dgl
神经网络模块的输入必须是上述三种图之一:- ① 如果输入特征是一对张量, 则输入图必须是无向二分图;
- ② 如果输入特征是一个张量且输入图是一个block, 则
dgl
库会自动将输出节点上的特征作为输入节点特征的前几行; - ③ 如果输入特征必须是一个张量且输入图不是一个block, 则输入图必须是同构图;
- 以
dgl.nn.pytorch.SAGEConv
为例:import dgl.function as fn class SAGEConv(nn.Module): def __init__(self, in_feats, out_feats): super().__init__() self.W = nn.Linear(in_feats * 2, out_feats) def forward(self, g, h): if isinstance(h, tuple): h_src, h_dst = h elif g.is_block: h_src = h h_dst = h[:g.number_of_dst_nodes()] else: h_src = h_dst = h g.srcdata['h'] = h_src g.dstdata['h'] = h_dst g.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'h_neigh')) return F.relu( self.W(torch.cat([g.dstdata['h'], g.dstdata['h_neigh']], 1)))
- 在Chapter 3中已经对
dgl.nn.pytorch.SAGEConv
做了完整的一遍梳理, 它可以在上面提到的三种图上工作;
- 在Chapter 3中已经对
6.6 大规模图上的精确线下推断
- 概述:
- 子图采样和邻域采样都可以减少使用GPU训练GNN的内存和时间消耗; 执行推理时, 通常最好在所有邻居上进行实际汇总, 而不要摆脱采样带来的随机性; 但是, 由于内存有限, 在GPU上全图正向传播通常是不可行的, 而由于计算速度较慢, 在CPU上则无法进行全图正向传播; 本节介绍了通过minibatch和邻域采样在有限的GPU内存下进行全图正向传播的方法;
- 推理算法与训练算法不同, 因为所有节点的表示应从第一层开始逐层计算; 具体来说, 对于特定层, 我们需要以小批量的形式计算该GNN层中所有节点的输出表示; 结果是, 推理算法将具有在层上迭代的外循环和在节点的小批处理上迭代的内循环; 相反, 训练算法具有在节点的小批上迭代的外循环, 以及在层上迭代的内循环, 用于邻域采样和消息传递;
- 下面的动画显示了计算的样子(请注意, 对于每一层, 只绘制了前三个小批处理):
- 实现线下推断:
- 以前文数次提到的二层GCN为例, 实现线下推断的方法主要就是使用
MultiLayerFullNeighborSampler
, 但是只对每层采样一次; - 注意线下推断被实现为GNN模块的一个方法, 原因是一层的计算也依赖于消息是如何被聚合与合并的;
class StochasticTwoLayerGCN(nn.Module): def __init__(self, in_features, hidden_features, out_features): super().__init__() self.hidden_features = hidden_features self.out_features = out_features self.conv1 = dgl.nn.GraphConv(in_features, hidden_features) self.conv2 = dgl.nn.GraphConv(hidden_features, out_features) self.n_layers = 2 def forward(self, blocks, x): x_dst = x[:blocks[0].number_of_dst_nodes()] x = F.relu(self.conv1(blocks[0], (x, x_dst))) x_dst = x[:blocks[1].number_of_dst_nodes()] x = F.relu(self.conv2(blocks[1], (x, x_dst))) return x def inference(self, g, x, batch_size, device): """ Offline inference with this module """ # Compute representations layer by layer for l, layer in enumerate([self.conv1, self.conv2]): y = torch.zeros(g.number_of_nodes(), self.hidden_features if l != self.n_layers - 1 else self.out_features) sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1) dataloader = dgl.dataloading.NodeDataLoader( g, torch.arange(g.number_of_nodes()), sampler, batch_size=batch_size, shuffle=True, drop_last=False) # Within a layer, iterate over nodes in batches for input_nodes, output_nodes, blocks in dataloader: block = blocks[0] # Copy the features of necessary input nodes to GPU h = x[input_nodes].to(device) # Compute output. Note that this computation is the same # but only for a single layer. h_dst = h[:block.number_of_dst_nodes()] h = F.relu(layer(block, (h, h_dst))) # Copy to output back to CPU. y[output_nodes] = h.cpu() x = y return y
- 注意为了在模型选择的验证集上计算评估指标, 我们通常不必计算确切的离线推断; 原因是我们需要计算每个单层上每个单个节点的表示形式, 这通常是成本巨大的, 尤其是在具有大量未标记数据的半监督方案中; 邻域采样将很好地用于模型选择和验证;
- 线下推断的官方案例:
Chapter 7: 分布式训练
- DGL采用完全分布式的方法, 可将数据和计算同时分布在一组计算资源中;
- 在本节的上下文中, 我们将假设一个群集设置(即一组计算机);
- DGL将图划分为子图, 并且群集中的每台计算机负责一个子图(分区);
- DGL在群集中的所有计算机上运行相同的训练脚本以并行化计算, 并在同一计算机上运行服务器以将分区数据提供给训练人员;
- 对于训练脚本, DGL提供了类似于微型批次训练的分布式API; 这使得分布式训练仅需要对单个机器上的小批量训练进行少量代码修改即可; 下面显示了以分布式方式训练GraphSage的示例; 唯一的代码修改位于4-7行:
- ① 初始化DGL的分布式模块;
- ② 创建一个分布式图形对象;
- ③ 拆分训练集并计算本地过程的节点;
- 其余代码(包括采样器创建, 模型定义, 训练循环)与minibatch训练(即上一章的内容)相同;
- 代码示例:
import dgl import torch as th dgl.distributed.initialize('ip_config.txt', num_servers, num_workers) th.distributed.init_process_group(backend='gloo') g = dgl.distributed.DistGraph('graph_name', 'part_config.json') pb = g.get_partition_book() train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True) # Create sampler sampler = NeighborSampler(g, [10,25], dgl.distributed.sample_neighbors, device) dataloader = DistDataLoader( dataset=train_nid.numpy(), batch_size=batch_size, collate_fn=sampler.sample_blocks, shuffle=True, drop_last=False) # Define model and optimizer model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout) model = th.nn.parallel.DistributedDataParallel(model) loss_fcn = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=args.lr) # training loop for epoch in range(args.num_epochs): for step, blocks in enumerate(dataloader): batch_inputs, batch_labels = load_subtensor(g, blocks[0].srcdata[dgl.NID], blocks[-1].dstdata[dgl.NID]) batch_pred = model(blocks, batch_inputs) loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() loss.backward() optimizer.step()
- 在计算机集群中运行训练脚本时, DGL提供了一些工具, 可将数据复制到集群的计算机上并在所有计算机上启动训练作业;
- 注意: 当前的分布式训练API仅支持Pytorch后端;
- 注意: 当前实现仅支持具有一种节点类型和一种边缘类型的图;
- DGL实现了一些分布式组件以支持分布式训练; 下图显示了组件及其相互作用:
- 具体来说, DGL的分布式训练具有三种类型的交互过程:
- ① 服务器:
- 服务器进程在存储图形分区(包括图形结构和节点/边缘功能)的每台计算机上运行;
- 这些服务器一起工作以将图形数据提供给训练人员;
- 注意一台机器可以同时运行多个服务器进程, 以并行化计算和网络通信;
- ② 采样器: 采样器进程与服务器以及采样节点和边缘进行交互, 以生成用于训练的minibatch;
- ③ 训练器: 模型训练人员包含多个与服务器交互的进程;
- 它使用
DistGraph
来访问分区图形数据, 并具有DistEmbedding
和DistTensor
来访问节点/边缘特征/嵌入; - 它具有
DistDataLoader
与采样器进行交互以获得minibatch;
- 它使用
本章内容可能对于大部分人来说都不太会用得到, 且主要是文字说明, 笔者主要做一些机翻, 如果有兴趣可以直接通过链接查看原文;
7.1 分布式训练的预处理
DGL官方文档 ;
- 概述:
-
DGL需要预处理图形数据以进行分布式训练, 包括两个步骤:
- ① 将图形划分为子图形;
- ② 为节点/边分配新的ID;
-
DGL提供了执行两个步骤的分区API; 该API支持随机分区和基于Metis的分区; Metis分区的好处在于, 它可以以最小的边沿切割生成分区, 从而减少了用于分布式训练和推理的网络通信;
-
DGL使用最新版本的Metis, 并具有针对具有幂律分布的真实图形进行优化的选项; 分区后, API以易于在训练期间加载的格式构造分区结果;
-
注意: 图形分区API当前在一台计算机上运行; 因此, 如果图形很大, 则用户将需要一台大型计算机来对图形进行分区; 将来, DGL将支持分布式图形分区;
-
默认情况下, 分区API将新ID分配给输入图中的节点和边, 以在分布式训练/推理期间帮助定位节点/边; 分配ID后, 分区API会相应地对所有节点数据和边缘数据进行混洗; 在培训期间, 用户只需使用新的节点/边缘ID; 但是, 仍然可以通过
g.ndata['orig_id']
和g.edata['orig_id']
访问原始ID, 其中g是DistGraph对象(请参见DistGraph部分); -
分区结果存储在输出目录中的多个文件中; 它始终包含一个名为
xxx.json
的JSON文件, 其中xxx
是提供给分区API的图形名称; JSON文件包含所有分区配置; 如果分区API没有为节点和边缘分配新的ID, 它将生成两个附加的Numpy文件:node_map.npy
和edge_map.npy
, 它们存储节点/边缘ID与分区ID之间的映射; 对于具有数十亿个节点和边的图, 两个文件中的Numpy数组很大, 因为它们在图中的每个节点和边都有一个条目; 在每个分区的文件夹内, 有三个文件以DGL格式存储分区数据;graph.dgl
存储分区的图结构以及节点和边缘上的一些元数据;node_feats.dgl
和edge_feats.dgl
存储属于该分区的节点和边的所有特征;data_root_dir/ |-- xxx.json # partition configuration file in JSON |-- node_map.npy # partition id of each node stored in a numpy array (optional) |-- edge_map.npy # partition id of each edge stored in a numpy array (optional) |-- part0/ # data for partition 0 |-- node_feats.dgl # node features stored in binary format |-- edge_feats.dgl # edge features stored in binary format |-- graph.dgl # graph structure of this partition stored in binary format |-- part1/ # data for partition 1 |-- node_feats.dgl |-- edge_feats.dgl |-- graph.dgl
- 负载均衡
- 在对图进行分区时, 默认情况下, Metis仅平衡每个分区中的节点数; 根据当前的任务, 这可能导致配置欠佳; 例如, 在半监督节点分类的情况下, 训练者对局部分区中标记节点的子集执行计算; 仅平衡图中节点(带标签和未带标签)的分区可能最终会导致计算负载不平衡; 为了在每个分区中获得平衡的工作负载, 分区API通过在
dgl.distributed.partition_graph()
中指定balance_ntypes
, 可以在每个节点类型中的节点数之间实现分区之间的平衡; 用户可以利用这一点, 并考虑训练集中, 验证集中和测试集中的节点属于不同的节点类型; - 以下示例认为训练集内和训练集外的节点是两种类型的节点:
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
- 除了平衡节点类型之外,
dgl.distributed.partition_graph()
还允许通过指定balance_edges
在不同节点类型的节点的入度之间进行平衡; 这平衡了入射到不同类型节点的边的数量;- 注意: 传递给
dgl.distributed.partition_graph()
的图形名称是一个重要的参数;dgl.distributed.DistGraph
将使用图名称来标识分布式图; 合法图形名称应仅包含字母字符和下划线;
- 注意: 传递给
7.2 分布式接口函数
-
章节内容详见: DGL官方文档 ;
-
主要的接口函数索引在:
dgl.distributed
; -
代码示例:
-
初始化DGL分布式模块:
dgl.distributed.initialize('ip_config.txt', num_workers=4) th.distributed.init_process_group(backend='gloo')
-
分布式图:
dgl.distributed.DistGraph(graph_name, gpb=None, part_config=None)
; -
分布式模式 v.s. 独立(standalone)模式;
-
分布式图创建:
import dgl g = dgl.distributed.DistGraph('graph_name')
import dgl g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
-
获取图结构:
print(g.number_of_nodes())
-
获取节点和边的数据:
g.ndata['train_mask'] <dgl.distributed.dist_graph.DistTensor at 0x7fec820937b8> g.ndata['train_mask'][0] tensor([1], dtype=torch.uint8)
-
分布式张量:
tensor = dgl.distributed.DistTensor((g.number_of_nodes(), 10), th.float32, name='test')
g.ndata['feat'] = tensor
data = g.ndata['feat'][[1, 2, 3]] print(data) g.ndata['feat'][[3, 4, 5]] = data
-
分布式嵌入:
def initializer(shape, dtype): arr = th.zeros(shape, dtype=dtype) arr.uniform_(-1, 1) return arr emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
sparse_optimizer = dgl.distributed.SparseAdagrad([emb], lr=lr1) optimizer = th.optim.Adam(model.parameters(), lr=lr2) feats = emb(nids) loss = model(feats) loss.backward() optimizer.step() sparse_optimizer.step()
-
分布式采样:
def sample_blocks(seeds): seeds = th.LongTensor(np.asarray(seeds)) blocks = [] for fanout in [10, 25]: frontier = dgl.distributed.sample_neighbors(g, seeds, fanout, replace=True) block = dgl.to_block(frontier, seeds) seeds = block.srcdata[dgl.NID] blocks.insert(0, block) return blocks dataloader = dgl.distributed.DistDataLoader(dataset=train_nid, batch_size=batch_size, collate_fn=sample_blocks, shuffle=True) for batch in dataloader: ...
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25]) dataloader = dgl.sampling.NodeDataLoader(g, train_nid, sampler, batch_size=batch_size, shuffle=True) for batch in dataloader: ...
-
负载分割:
train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
7.3 分布式训练的一些工具
DGL官方文档 ;
- DGL提供了两个脚本来协助进行分布式训练:
tools/copy_files.py
用于将图分区复制到图;tools/launch.py
用于在机器集群中启动分布式训练工作;copy_files.py
将机器(在其中对图形进行分区的机器)中的分区数据和相关文件(例如, 训练脚本)复制到机器集群(在其中进行分布式训练); 该脚本将分区复制到机器上, 在该计算机上, 分布式训练作业将需要该分区; 该脚本包含四个参数:--part_config
指定分区配置文件, 该文件包含本地计算机中分区数据的信息;--ip_config
指定集群的IP配置文件;--workspace
指定训练机中存储与分布式训练有关的所有数据的目录;--rel_data_path
指定工作空间目录下将存储分区数据的相对路径;--script_folder
指定工作空间目录下存储用户的训练脚本的相对路径;- 注意:
copy_files.py
根据IP配置文件找到合适的机器来存储分区; 因此,copy_files.py
和launch.py
应该使用相同的IP配置文件;
- DGL提供了用于启动群集中的分布式训练作业的
tools/launch.py
; 该脚本进行以下假设:
- 分区数据和训练脚本已复制到群集或群集中所有计算机均可访问的全局存储(例如NFS);
- 主计算机(在其中执行启动脚本的计算机)具有对所有其他计算机的无密码ssh访问权限;
- 注意: 必须在集群中的一台计算机上调用启动脚本;
- 下面显示了在集群中启动分布式训练作业的示例:
python3 tools/launch.py \ --workspace ~graphsage/ \ --num_trainers 2 \ --num_samplers 4 \ --num_servers 1 \ --part_config data/ogb-product.json \ --ip_config ip_config.txt \ "python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1 --num_workers 4"
- 配置文件
ip_config.txt
包含集群中计算机的IP地址;ip_config.txt
的典型示例如下:172.31.19.1 172.31.23.205 172.31.29.175 172.31.16.98
- 每行是计算机的IP地址; IP地址后面还可以有一个端口, 该端口指定训练人员之间的网络通信使用的端口; 如果未提供端口, 则默认值为30050;
- 启动脚本中指定的工作空间是计算机中的工作目录, 其中包含训练脚本, IP配置文件, 分区配置文件以及图形分区; 文件的所有路径都应指定为工作空间的相对路径;
- 启动脚本会在每台计算机上创建指定数量的训练作业(
--num_trainers
); - 另外, 用户需要为每个训练者指定采样器处理的数量(
--num_samplers
); 采样器进程的数量必须与initialize()
中指定的辅助进程的数量匹配;
杂记
在附录的链接页面上, 官方文档给出了大量的接口函数, 出于时间成本考虑笔者不再一一翻译记录, 笔者简单浏览了一遍后发现还是有不少有趣的方法的, 比如dgl.sampling
中提到了随机游走的采样方法, 因此本章将不定期更新笔者在实际使用中遇到的值得记录的接口函数用法, 详细的接口函数只能看源码以及附录中的各个模块的链接了;
1 dgl.DGLGraph.add_edges
DGLGraph.add_edges(u, v, data=None, etype=None)
:
- 参数:
u(int, tensor, numpy.ndarray, list)
: 源节点编号,u[i]
为第i
条边的源节点;v(int, tensor, numpy.ndarray, list)
: 目标节点编号,v[i]
为第i
条边的目标节点;data(dict[str, tensor])
: 键为特征名称(常见的如'h'
或'w'
), 值为特征值, 值的第i
行对应第i
条边的特征值.etype(str or tuple of str)
: 每条边的类型, 如果为同构图(只有一种边类型)则可以省略;
- 示例:
g.add_edges(torch.tensor([0, 0]), torch.tensor([2, 2]), {'h': torch.tensor([[1.], [2.]]), 'w': torch.ones(2, 1)})
2 关于dgl创建图的本质
- 从User Guide中可以发现, 同构图本质就是一种特殊的异构图, 所以使用
dgl.DGLGraph
或dgl.graph
创建的图都是dgl.heterograph.DGLHeteroGraph
类型的;
src
与dst
参数中最大的节点ID决定了总节点数, 而非是根据src
和dst
- 添加边时或创建图时导入的边是可以重复的, 并且会被重复计数到
num_edges
中; - 以一个例子来说明, 如果是用
src = [1, 2, 3, 66, 1]
和dst = [2, 3, 66, 1, 2]
创建图则会由67个节点与5条边;
专题一 消息传递函数
以官方文档中
dgl.function
一节为参考;
- 概述:
- DGL中消息传递通过两种接口表达:
send(edges, message_func)
: 根据给定的边计算消息;recv(nodes, reduce_func)
: 收集输入的消息, 进行消息聚合等其他操作;
- 以上两种阶段的接口可以覆盖所有在消息传递框架下定义出的模型, 但是这种方式是低效的, 因为它需要存储显式消息(explicit message), DGL Blog Post 中给出了一些性能评估的详细情况;
- 解决方案也在上述链接中给出了说明, 即将两阶段融合进同一个kernel中, 于是就无需生成并存储显式消息, 因此DGL推荐使用内置的消息传递函数, 它们已经进行了这种融合优化;
- 代码示例:
import dgl import dgl.function as fn import torch as th import numpy as np # 1 create a DGLGraph src = np.random.randint(0, 100, 500) dst = np.random.randint(0, 100, 500) g = dgl.graph((np.concatenate([src, dst]), np.concatenate([dst, src]))) # 2 set feature for nodes and edges g.ndata['h'] = th.randn((g.number_of_nodes(), 10)) # each node has feature size 10 g.edata['w'] = th.randn((g.number_of_edges(), 1)) # each edge has feature size 1 # 3 collect features from source nodes and aggregate them in destination nodes g.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'h_sum')) print(g.ndata['m']) # error print(g.edata['m']) # error # 4 multiply source node features with edge weights and aggregate them in destination nodes g.update_all(fn.u_mul_e('h', 'w', 'm'), fn.max('m', 'h_max')) # 5 compute edge embedding by multiplying source and destination node embeddings g.apply_edges(fn.u_mul_v('h', 'h', 'w_new'))
- ① 首先随机生成一个带有至多101个节点(因为随机值
randn
未必会取得到100), 500条边的随图, 将图设置为无向图(取逆向图再合并); - ② 接着为边和节点分别设置随即特征值;
- ③ 重点就是第三步:
update_all
函数可以参考本文 Chapter 2中的具体说明, 它接收一个消息生成函数, 消息聚合函数和更新函数(optional), 这里先复制了一份节点h
特征然后再将所有节点的h_sum
特征值更新为其近邻节点的m
特征值之和;'m'
显然是一个中间值, 可以发现update_all
函数执行结束后根本没有名为m
的特征值, 这就是上面所说的融合后而无需存储显示消息;- 为了看出具体的传播逻辑可以创建一个小一些的简单图, 特征值用常数, 这样输出更新后的特征值会更加清晰;
- ④ 与第③步大同小异, 仍是节点特征的更新, 不再赘述;
- ⑤ 最后通过将源节点和目标节点的
h
特征值相乘达到更新边特征的目的;’
- ① 首先随机生成一个带有至多101个节点(因为随机值
- 自定义消息传递函数的写法参考(不推荐自定义, 尽量使用内置的进行拼积木);
fn.u_mul_e('h', 'w', 'm')
等价于:
def udf_u_mul_e(edges): return {'m' : edges.src['h'] * edges.data['w']}
fn.max('m', 'h_max')
等价于:
def udf_max(nodes): return {'h_max' : th.max(nodes.mailbox['m'], 1)[0]}
- DGL内置的消息传递函数一览:
- 一元消息传递函数: 都是复制函数;
copy_u
,copy_src
: 两个函数功能用法完全相同, 参数为(src, out)
;copy_e
,copy_edge
: 两个函数功能用法完全相同, 参数为(edge, out)
;
- 二元函数: 浅显易懂, 参数为
(lhs_field, rhs_field, out)
;u_add_v
,u_sub_v
,u_mul_v
,u_div_v
,u_dot_v
;u_add_e
,u_sub_e
,u_mul_e
,u_div_e
,u_dot_e
;v_add_u
,v_sub_u
,v_mul_u
,v_div_u
,v_dot_u
;v_add_e
,v_sub_e
,v_mul_e
,v_div_e
,v_dot_e
;e_add_u
,e_sub_u
,e_mul_u
,e_div_u
,e_dot_u
;e_add_v
,e_sub_v
,e_mul_v
,e_div_v
,e_dot_v
;src_mul_edge
是u_mul_e
的另一种写法, 参数为(src, edge, out)
;
- 聚合函数: 参数为
(msg, out)
;max
,min
;sum
,mean
;
小节附录: 消息函数杂记
graph.apply_edge
函数:
graph.apply_edges(dgl.function.u_add_v('e', 'e', 'e'))
是将每条边的名为'e'
的特征值(不一定要存在, 即边可以没有名为'e'
的特征值)更新为该边源节点的'e'
特征值加上目标节点的'e'
特征值;graph.apply_edges(dgl.function.u_add_v('el', 'er', 'e'))
是将每条边的名为'e'
的特征值(不一定要存在, 即边可以没有名为'e'
的特征值)更新为该边源节点的'el'
特征值加上目标节点的'er'
特征值;- 后者还是有些令人费解的, 前者的更新方式似乎更符合常理;
附录: 接口索引
dgl
: https://docs.dgl.ai/api/python/dgl.html ;dgl.data
: https://docs.dgl.ai/api/python/dgl.data.html ;dgl.dataloading
: https://docs.dgl.ai/api/python/dgl.dataloading.html ;dgl.DGLGraph
: https://docs.dgl.ai/api/python/dgl.DGLGraph.html ;dgl.distributed
: https://docs.dgl.ai/api/python/dgl.distributed.html ;dgl.function
: https://docs.dgl.ai/api/python/dgl.function.html ;dgl.nn
: https://docs.dgl.ai/api/python/nn.html ;dgl.ops
: https://docs.dgl.ai/api/python/dgl.ops.html ;dgl.sampling
: https://docs.dgl.ai/api/python/dgl.sampling.html ;- User-defined Functions: https://docs.dgl.ai/api/python/udf.html ;
更多推荐
所有评论(0)