-
Notifications
You must be signed in to change notification settings - Fork 558
Euler 2.0 在大规模图上的应用
本章的章节安排如下:
在本章,我们将介绍euler2.0是如何在大规模图上应用。
在现实应用,尤其是工业级应用中,所面临的图是十分巨大的,比如在淘宝的应用场景中,往往有数亿个节点和数十亿条边。在这样一个规模的图下,单机的训练图神经网络是不可行的。如何处理大规模图数据是图神经网络能够真正落地的关键之一。
Euler-2.0支持图分割和高效稳定的分布式训练,可以轻松支撑如此规模的图数据。
本章将价绍如何使用Euler 2.0来处理图数据,进行分布式训练。
Note:
与其他场景应用的联系和区别的对比见这里。
大规模图数据生成在步骤与一般图生成类似,也是分为两步:
一般而言,在大规模图下,由于图数据量很大,需要对对应的JSON文件进行切分,形成多个JSON文件,切分的原则是:边和其源节点(src节点)在同一个JSON文件中。假设JSON分片数目为partition_num=p,则需生成p个JSON文件,形如:
graph.json_0
graph.json_1
...
graph.json_(p-1)
对于meta文件而言,所有的JSON文件共用一个meta文件。
为了生成相应的图数据二进制文件,Euler2.0提供了相应的脚本工具在项目根目录下的euler/tools/gen_partitioned_data.sh 可以将上述的多个JSON文件转化为Euler加载所需要的二进制文件。
假设JSON分片数目为partition_num=p,euler server分片数目为shard_num=s,那么依次执行
sh gen_partitioned_data.sh graph.json_0 index_meta output_dir s 0
sh gen_partitioned_data.sh graph.json_1 index_meta output_dir s 1
...
sh gen_partitioned_data.sh graph.json_(p-1) index_meta output_dir s (p-1)
其中:
- graph.json_[index]:为各个切分过的euler json文件名
- index_meta:全局公用的meta索引文件名,每次执行时包保持索引文件相同
- output_dir:Euler二进制文件输出目录,每次执行后,转换工具会把转换的结果分别保存在同一个目录中,所以每次执行的时候要保持这个参数相同。
- shard_num(s):euler 二进制转换完成后的分片个数,每次执行的时候保持一致。
- partition_num(p):euler JSON文件的分片个数,每次执行的时候要根据实际的分片修改这个参数。
对于大规模分布式图数据加载而言一般需要以下三步:
- 在不同机器上,运行euler启动python脚本
- 初始化euler引擎
- 调用Euler op对节点或者边进行采样获邻居采样
假设数据被分割成了N个shards,那么在N台机器上运行下面的Euler启动python脚本。假设机器 0N-1 分别对应 0N-1个shard,那么第k个shard运行脚本为
euler.start(
directory='euler_graph_data_dir', # 图数据路径
shard_idx=k, # 当前启动的进程为k号shard
shard_num=N, # 一共有N个shard
zk_addr=zk_addr, # Zookeeper address, ip:port
zk_path=zk_path, # Zookeeper path
module=euler.Module.DEFAULT_MODULE)
while True:
time.sleep(1)
tf_euler.initialize_graph({
'mode': 'remote',
'zk_server': zk_addr, # Zookeeper address, ip:port
'zk_path': zk_path, # Zookeeper path
'shard_num': shard_num, # 一共有N个shard
'num_retries': 1
})
启动并初始化euler引擎之后,后台图数据的分布式调度会自动启用,用户就可以和之前单机的方式一样去使用Euler相关的op,比如生成batch样本数据的时候,仍然可以使用sample_node()来实现。
tf_euler.sample_node(inputs, params['train_node_type'])
在分布式的场景下,模型的实现部分仍然和单机的模式一样,不需要有任何的改动,具体实现可以参考进阶教程下的各个应用中的图模型的实现,在这里就不再展开介绍。
比如:
start_euler.py
euler.start(
directory='euler_graph_data_dir', # 图数据路径
shard_idx=k, # 当前启动的进程为k号shard
shard_num=N, # 一共有N个shard
zk_addr=zk_addr, # Zookeeper address, ip:port
zk_path=zk_path, # Zookeeper path
module=euler.Module.DEFAULT_MODULE)
while True:
time.sleep(1)
#初始化euler引擎
if not task_type == 'ps':
tf_euler.initialize_graph({
'mode': 'remote',
'zk_server': zk_addr,
'zk_path': zk_path,
'shard_num': shard_num,
'num_retries': 1
})
以上两步之后便可使用euler进行图数据操作
# 分布式参数配置
# 参考https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig进行配置
'''
cluster = {'chief': ['host0:2222'],
'ps': ['host1:2222', 'host2:2222'],
'worker': ['host3:2222', 'host4:2222']}
task_type = 'worker'
task_id = 0
os.environ['TF_CONFIG'] = json.dumps(
{'cluster': cluster,
'task': {'type': task_type, 'index': task_id}})
})
'''
#初始化euler引擎
if not task_type == 'ps':
tf_euler.initialize_graph({
'mode': 'remote',
'zk_server': zk_addr,
'zk_path': zk_path,
'shard_num': shard_num,
'num_retries': 1
})
tf.logging.set_verbosity(tf.logging.INFO)
# 算法模型的构建,可以使用example中的算法模型,自行配置各个参数
model_cls = your_model(model_params)
# 训练参数配置,提供必要的训练参数
params = estimator_training_params
# Estimator的创建与训练/验证/预测
config = tf.estimator.RunConfig(log_step_count_steps=None)
base_estimator = NodeEstimator(model_cls, params, config)
base_estimator.train_and_evaluate()