图神经网络——节点分类与边预测

1.InMemoryDataset基类

在PyG中,可以通过继承InMemoryDataset类来自定义一个数据可全部存储到内存的数据集类。(继承Dataset是分次加载到内存,继承InMemoryDataset是一次性加载所有数据到内存)

1
class InMemoryDataset(root: Optional[str] = None, transform: Optional[Callable] = None, pre_transform: Optional[Callable] = None, pre_filter: Optional[Callable] = None)

参数说明:

transform:数据转换函数,用于转换Data对象,每一次数据获取过程中都会被执行。
pre_transform:数据转换函数,用于转换Data对象,在Data对象被保存到文件前调用。
pre_filter:检查数据是否要保留的函数,接收一个Data对象,返回此Data对象是否应该被包含在最终的数据集中,在Data对象被保存到文件前调用。

2.Sequential容器

nn.Sequential是nn.module的容器,用于按顺序包装一组网络层。参数说明:

args(str):模型的全局输入参数;
modules ([(str, Callable) or Callable]) :模块列表。

3. 节点分类

定义一个GAT图神经网络,通过hidden_channels_list参数来设置每一层GATConv的outchannel,所以hidden_channels_list长度即为GATConv的层数。

1
2
3
4
5
#载入数据集
import torch
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
dataset = Planetoid(root='dataset', ame='Cora',transform=NormalizeFeatures())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch
from torch.nn import Linear,ReLU
import torch.nn.functional as F
from torch_geometric.nn import GATConv, Sequential

#使用Sequential容器定义一个GAT网络
from torch_geometric.nn import GATConv
class GAT(torch.nn.Module):
def __init__(self, num_features, hidden_channels_list, num_classes):
super(GAT, self).__init__()
torch.manual_seed(2021)
hns = [num_features] + hidden_channels_list
conv_list = []
for idx in range(len(hidden_channels_list)):
conv_list.append((GATConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
conv_list.append(ReLU(inplace=True),)

self.convseq = Sequential('x, edge_index', conv_list)
self.linear = Linear(hidden_channels_list[-1], num_classes)

def forward(self, x, edge_index):
x = self.convseq(x, edge_index)
x = F.dropout(x, p=0.5, training=self.training)
x = self.linear(x)
return x
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#训练和测试
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = GAT(num_features=dataset.num_features, hidden_channels_list=[200, 100], num_classes=dataset.num_classes).to(device)
print(model)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()
#train()、test()省略,与上章一致
for epoch in range(1, 201):
loss = train()
if epoch % 50 == 0:
print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test()
print(f'Test Accuracy: {test_acc:.4f}')

构建2层GAT,Accuracy为0.7640;构建2层GCN,Accuracy为0.6490。
构造3层GAT(将hidden_channels_list的值改为[200,100,50]),Accuracy为0.7680;构建3层GCN,Accuracy为0.5190。

4 边预测

边预测任务的目标是预测两个节点间是否有边。做边预测任务首先需要获取正负样本数量平衡的数据集(edge_index存储的是正样本,需要采样一些不存在边的节点对作为负样本边),PyG中可以通过train_test_split_edges(data, val_ratio=0.05, test_ratio=0.1)采样负样本边。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#构造边预测神经网络
import os.path as osp
from torch_geometric.utils import negative_sampling
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
from torch_geometric.utils import train_test_split_edges

import torch
from torch_geometric.nn import GCNConv

class Net(torch.nn.Module):

def __init__(self, in_channels, out_channels):
super(Net, self).__init__()
self.conv1 = GCNConv(in_channels, 128)
self.conv2 = GCNConv(128, out_channels)

def encode(self, x, edge_index):
x = self.conv1(x, edge_index)
x = x.relu()
return self.conv2(x, edge_index)

def decode(self, z, pos_edge_index, neg_edge_index):
edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
return (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)

def decode_all(self, z):
#对所有的节点对预测存在边的几率
prob_adj = z @ z.t() # @ 表示矩阵乘法
return (prob_adj > 0).nonzero(as_tuple=False).t()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 定义单个epoch的训练过程
def get_link_labels(pos_edge_index, neg_edge_index):
num_links = pos_edge_index.size(1) + neg_edge_index.size(1)
link_labels = torch.zeros(num_links, dtype=torch.float)
link_labels[:pos_edge_index.size(1)] = 1
return link_labels

def train(data, model, optimizer):
model.train()

neg_edge_index = negative_sampling(
edge_index = data.train_pos_edge_index,
num_nodes = data.num_nodes,
num_neg_samples = data.train_pos_edge_index.size(1)
)

train_neg_edge_set = set(map(tuple, neg_edge_index.T.tolist()))
val_pos_edge_set = set(map(tuple, data.val_pos_edge_index.T.tolist()))
test_pos_edge_set = set(map(tuple, data.test_pos_edge_index.T.tolist()))
if (len(train_neg_edge_set & val_pos_edge_set) > 0) or (len(train_neg_edge_set & test_pos_edge_set) > 0):
print('wrong!')

optimizer.zero_grad()
z = model.encode(data.x, data.train_pos_edge_index)
link_logits = model.decode(z, data.train_pos_edge_index, neg_edge_index)
link_labels = get_link_labels(data.train_pos_edge_index, neg_edge_index).to(data.x.device)
loss = F.binary_cross_entropy_with_logits(link_logits, link_labels)
loss.backward()
optimizer.step()

return loss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#定义单个epoch验证与测试过程
@torch.no_grad()
def test(data, model):
model.eval()

z = model.encode(data.x, data.train_pos_edge_index)

results = []
for prefix in ['val', 'test']:
pos_edge_index = data[f'{prefix}_pos_edge_index']
neg_edge_index = data[f'{prefix}_neg_edge_index']
link_logits = model.decode(z, pos_edge_index, neg_edge_index)
link_probs = link_logits.sigmoid()
link_labels = get_link_labels(pos_edge_index, neg_edge_index)
results.append(roc_auc_score(link_labels.cpu(), link_probs.cpu()))
return results
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#完整的训练、验证与测试
from sklearn.metrics import roc_auc_score
def main():
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
dataset = Planetoid('/Dataset/Planetoid/Cora', 'Cora', transform=T.NormalizeFeatures())
data = dataset[0]
ground_truth_edge_index = data.edge_index.to(device)
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = train_test_split_edges(data)
data = data.to(device)

model = Net(dataset.num_features, 64).to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)

best_val_auc = test_auc = 0
for epoch in range(1, 101):
loss = train(data, model, optimizer)
val_auc, tmp_test_auc = test(data, model)
if val_auc > best_val_auc:
best_val_auc = val_auc
test_auc = tmp_test_auc
print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Val: {val_auc:.4f}, '
f'Test: {test_auc:.4f}')

z = model.encode(data.x, data.train_pos_edge_index)
final_edge_index = model.decode_all(z)


if __name__ == "__main__":
main()

结果:Epoch: 100, Loss: 0.4414, Val: 0.9330, Test: 0.8943

将Sequential容器用于边预测,需要在Net类定义中将__init_函数和main()做部分修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#替换上面的对应代码
class Net(torch.nn.Module):
def __init__(self, in_channels, hidden_channels_list, out_channels):
super(Net, self).__init__()
torch.manual_seed(2021)
hns = [in_channels] + hidden_channels_list
conv_list = []

for idx in range(len(hidden_channels_list)-1):
conv_list.append((GCNConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
conv_list.append(ReLU(inplace=True), )
conv_list.append((GCNConv(hns[-2], hns[-1]), 'x, edge_index -> x'))

self.convseq = Sequential('x, edge_index', conv_list)

def main():
model = Net(dataset.num_features,[200,100],dataset.num_classes).to(device)

在边预测任务用Sequential容器的结果:
Epoch: 100, Loss: 0.4226, Val: 0.9123, Test: 0.8958

参考资料

1.datawhale-GNN开源学习资料
2.GNN官方文档
3.Sequential官网文档

欢迎关注我们的公众号
0%