图神经网络——节点分类与边预测
1.InMemoryDataset基类
在PyG中,可以通过继承InMemoryDataset类来自定义一个数据可全部存储到内存的数据集类。(继承Dataset是分次加载到内存,继承InMemoryDataset是一次性加载所有数据到内存)
1 |
class InMemoryDataset(root: Optional[str] = None, transform: Optional[Callable] = None, pre_transform: Optional[Callable] = None, pre_filter: Optional[Callable] = None) |
参数说明:
transform:数据转换函数,用于转换Data对象,每一次数据获取过程中都会被执行。
pre_transform:数据转换函数,用于转换Data对象,在Data对象被保存到文件前调用。
pre_filter:检查数据是否要保留的函数,接收一个Data对象,返回此Data对象是否应该被包含在最终的数据集中,在Data对象被保存到文件前调用。
2.Sequential容器
torch_geometric.nn.Sequential是对torch.nn.Sequential容器的扩展,用于按顺序包装一组网络层;由于图神经网络层通常需要多个输入(如x和edge_index),需要声明模型的全局输入参数以及每一层的输入输出。参数说明:
args(str):模型的全局输入参数;
modules ([(str, Callable) or Callable]) :模块列表。
3. 节点分类
定义一个GAT图神经网络,通过hidden_channels_list参数来设置每一层GATConv的out_channels,所以hidden_channels_list的长度即为GATConv的层数。
1 2 3 4 5 |
# Load the dataset.
import torch
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures

# The keyword is `name` (the original `ame=` was a typo and made the call fail).
# The transform is applied each time a Data object is fetched from the dataset.
dataset = Planetoid(root='dataset', name='Cora', transform=NormalizeFeatures())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import torch
import torch.nn.functional as F
from torch.nn import Linear, ReLU
from torch_geometric.nn import GATConv, Sequential


# Define a GAT network using the PyG Sequential container.
class GAT(torch.nn.Module):
    """Stack of GATConv + ReLU layers followed by a linear output layer.

    Args:
        num_features: Input size of the first GATConv layer.
        hidden_channels_list: Output size of each GATConv layer, in order;
            its length is the number of GATConv layers.
        num_classes: Output size of the final linear layer.
    """

    def __init__(self, num_features, hidden_channels_list, num_classes):
        super().__init__()
        # Seed before any layer is created so parameter initialisation is repeatable.
        torch.manual_seed(2021)

        dims = [num_features] + hidden_channels_list
        layers = []
        # Consecutive (input, output) sizes give one GATConv per hidden entry.
        for in_dim, out_dim in zip(dims[:-1], dims[1:]):
            layers.append((GATConv(in_dim, out_dim), 'x, edge_index -> x'))
            layers.append(ReLU(inplace=True))

        self.convseq = Sequential('x, edge_index', layers)
        self.linear = Linear(hidden_channels_list[-1], num_classes)

    def forward(self, x, edge_index):
        """Run the conv stack, dropout (p=0.5, training mode only), then the linear layer."""
        hidden = self.convseq(x, edge_index)
        hidden = F.dropout(hidden, p=0.5, training=self.training)
        return self.linear(hidden)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# Training and testing.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")

model = GAT(
    num_features=dataset.num_features,
    hidden_channels_list=[200, 100],
    num_classes=dataset.num_classes,
)
model = model.to(device)
print(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

# train() and test() are omitted here; they are the same as in the previous chapter.
for epoch in range(1, 201):
    loss = train()
    # Only report the loss every 50th epoch.
    if epoch % 50 != 0:
        continue
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test()
print(f'Test Accuracy: {test_acc:.4f}')
构建2层GAT,Accuracy为0.7640;构建2层GCN,Accuracy为0.6490。
构造3层GAT(将hidden_channels_list的值改为[200,100,50]),Accuracy为0.7680;构建3层GCN,Accuracy为0.5190。
4. 边预测
边预测任务的目标是预测两个节点间是否有边。做边预测任务首先需要获取正负样本数量平衡的数据集(edge_index存储的是正样本,需要采样一些不存在边的节点对作为负样本边)。PyG中可以通过train_test_split_edges(data, val_ratio=0.05, test_ratio=0.1)将边划分为训练集、验证集和测试集,并得到验证集与测试集的负样本边;训练集的负样本边则在每个epoch中通过negative_sampling采样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Build the link-prediction network.
import os.path as osp

import torch
import torch_geometric.transforms as T
from torch_geometric.datasets import Planetoid
from torch_geometric.nn import GCNConv
from torch_geometric.utils import negative_sampling
from torch_geometric.utils import train_test_split_edges


class Net(torch.nn.Module):
    """Two GCNConv layers as encoder, with an inner-product decoder over node pairs.

    Args:
        in_channels: Input size of the first GCNConv layer.
        out_channels: Output size of the second GCNConv layer.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, 128)
        self.conv2 = GCNConv(128, out_channels)

    def encode(self, x, edge_index):
        """Apply conv1, ReLU, then conv2 and return the result."""
        hidden = self.conv1(x, edge_index).relu()
        return self.conv2(hidden, edge_index)

    def decode(self, z, pos_edge_index, neg_edge_index):
        """Return one score per edge: the dot product of the two endpoint rows of z.

        Positive edges come first, followed by negative edges.
        """
        candidates = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
        src, dst = candidates[0], candidates[1]
        return (z[src] * z[dst]).sum(dim=-1)

    def decode_all(self, z):
        """Score every node pair and return the index pairs whose score is positive."""
        scores = z @ z.t()  # `@` is matrix multiplication
        return (scores > 0).nonzero(as_tuple=False).t()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# Training procedure for a single epoch.
# `F` is needed by train(); import it here so the link-prediction code does not
# depend on the earlier node-classification snippet having been run.
import torch.nn.functional as F


def get_link_labels(pos_edge_index, neg_edge_index):
    """Return float labels for a batch of edges: 1 for positives, then 0 for negatives.

    The number of edges is read from dimension 1 of each index tensor. The
    labels are created on the same device as ``pos_edge_index`` so no extra
    host-to-device copy is required by callers.
    """
    num_pos = pos_edge_index.size(1)
    num_links = num_pos + neg_edge_index.size(1)
    link_labels = torch.zeros(
        num_links, dtype=torch.float, device=pos_edge_index.device
    )
    link_labels[:num_pos] = 1
    return link_labels


def train(data, model, optimizer):
    """Run one optimisation step on the training edges and return the loss tensor.

    Args:
        data: Object providing ``x``, ``num_nodes`` and the
            ``train_pos_edge_index`` / ``val_pos_edge_index`` /
            ``test_pos_edge_index`` tensors.
        model: Module exposing ``encode(x, edge_index)`` and
            ``decode(z, pos_edge_index, neg_edge_index)``.
        optimizer: Optimizer over the model's parameters.
    """
    model.train()

    # Request as many negative edges as there are positive training edges.
    neg_edge_index = negative_sampling(
        edge_index=data.train_pos_edge_index,
        num_nodes=data.num_nodes,
        num_neg_samples=data.train_pos_edge_index.size(1),
    )

    # Sanity check: a sampled negative edge should not be a held-out positive edge.
    train_neg_edge_set = set(map(tuple, neg_edge_index.T.tolist()))
    val_pos_edge_set = set(map(tuple, data.val_pos_edge_index.T.tolist()))
    test_pos_edge_set = set(map(tuple, data.test_pos_edge_index.T.tolist()))
    if (train_neg_edge_set & val_pos_edge_set) or (train_neg_edge_set & test_pos_edge_set):
        print('wrong!')

    optimizer.zero_grad()
    z = model.encode(data.x, data.train_pos_edge_index)
    link_logits = model.decode(z, data.train_pos_edge_index, neg_edge_index)
    link_labels = get_link_labels(data.train_pos_edge_index, neg_edge_index).to(data.x.device)
    loss = F.binary_cross_entropy_with_logits(link_logits, link_labels)
    loss.backward()
    optimizer.step()

    return loss
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# Validation and testing procedure for a single epoch.
@torch.no_grad()
def test(data, model):
    """Return ``[val_auc, test_auc]``: ROC-AUC on the validation and test edges.

    The encoder is run on the training positive edges only; each split's
    positive and negative edges are then scored with ``model.decode``.
    """
    model.eval()
    z = model.encode(data.x, data.train_pos_edge_index)

    def split_auc(split):
        # Score one split's positive and negative edges and compare with labels.
        pos = data[f'{split}_pos_edge_index']
        neg = data[f'{split}_neg_edge_index']
        probs = model.decode(z, pos, neg).sigmoid()
        labels = get_link_labels(pos, neg)
        return roc_auc_score(labels.cpu(), probs.cpu())

    return [split_auc(split) for split in ('val', 'test')]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Full training, validation and testing run.
from sklearn.metrics import roc_auc_score


def main():
    """Train the link-prediction model on Cora for 100 epochs, printing progress.

    Returns:
        The edge index predicted by ``model.decode_all`` after training.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    dataset = Planetoid('/Dataset/Planetoid/Cora', 'Cora', transform=T.NormalizeFeatures())
    data = dataset[0]
    # The node-classification masks and labels are not used for link prediction.
    data.train_mask = data.val_mask = data.test_mask = data.y = None
    data = train_test_split_edges(data)
    data = data.to(device)

    model = Net(dataset.num_features, 64).to(device)
    optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)

    best_val_auc = test_auc = 0
    for epoch in range(1, 101):
        loss = train(data, model, optimizer)
        val_auc, tmp_test_auc = test(data, model)
        # Keep the test AUC from the epoch with the best validation AUC so far.
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            test_auc = tmp_test_auc
        print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Val: {val_auc:.4f}, '
              f'Test: {test_auc:.4f}')

    # Final inference only: no gradients are needed, so skip autograd tracking.
    with torch.no_grad():
        z = model.encode(data.x, data.train_pos_edge_index)
        final_edge_index = model.decode_all(z)
    return final_edge_index


if __name__ == "__main__":
    main()
结果:Epoch: 100, Loss: 0.4414, Val: 0.9330, Test: 0.8943
将Sequential容器用于边预测,需要对Net类定义中的__init__函数和main()做部分修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# Replaces the corresponding code above.
class Net(torch.nn.Module):
    """Link-prediction encoder built from GCNConv layers in a PyG Sequential container.

    Args:
        in_channels: Input size of the first GCNConv layer.
        hidden_channels_list: Output size of each GCNConv layer, in order.
        out_channels: Accepted for interface compatibility.
    """

    def __init__(self, in_channels, hidden_channels_list, out_channels):
        super().__init__()
        torch.manual_seed(2021)

        # NOTE(review): out_channels is never used below; the last layer's output
        # size is hidden_channels_list[-1]. Confirm whether that is intended.
        dims = [in_channels] + hidden_channels_list
        layers = []
        # Every layer except the last is followed by a ReLU.
        for in_dim, out_dim in zip(dims[:-2], dims[1:-1]):
            layers.append((GCNConv(in_dim, out_dim), 'x, edge_index -> x'))
            layers.append(ReLU(inplace=True))
        layers.append((GCNConv(dims[-2], dims[-1]), 'x, edge_index -> x'))

        self.convseq = Sequential('x, edge_index', layers)


def main():
    # Excerpt: only the model construction line of main() is shown here.
    model = Net(dataset.num_features, [200, 100], dataset.num_classes).to(device)
在边预测任务用Sequential容器的结果:
Epoch: 100, Loss: 0.4226, Val: 0.9123, Test: 0.8958