由于目前图神经网络框架DGL的分布式模块仍在建设中,文档非常不齐全,故本文主要记录分布式训练GNN的整个流程。
目前分布式代码都被放在dgl.distributed文件夹中,核心部分是分布式的KVStore,基本上所有的上层封装的模块(包括DistGraph
、DistTensor
、DistEmbedding
等)都会走KVServer和KVClient的接口。
注意目前DGL对于大规模分布式部署实际上是做得没有Euler好的,几个缺点包括:
因此如果要实现特定的深度学习分布式架构(比如parameter server),用原生的DGL是比较难实现的,需要做一定的适配和改动才有办法达成需求。
在所有分布式训练启动前都应先配置好各台机器的IP地址,通过initialize实现。
def initialize(ip_config, num_servers=1, num_workers=0)
其中ip_config
是server的地址,num_server
是每台机器上的server进程数,num_worker
是每台机器上sampler的进程数。
多个server在同一台机器上,除了主server,其他为backup server。send_requests_to_machine
这一行通过随机选择server machine来发送对应RPC请求,从而实现负载均衡。
进来先会判断DGL_ROLE
,包括server
、sampler
、default
三种角色,可以用os.environ['DGL_ROLE'] = 'server'
进行配置。
server
:需要配置相关环境变量,然后会创建一个DistGraphServer
(从KVServer
继承而来),然后start
服务
load_partition_book
而不会加载整个graph,而master server则会通过load_partition
把整个graph读进来,然后再读partition bookworker
:会跳转到下半部分的逻辑,通过connect_to_server
连接到上述的server
,接着用init_kvstore
创建KVClient
,后续通过get_kvstore
可获取该worker。下面以1 server + 1 sampler + 1 worker来说明怎么初始化DGL的相关数据结构。这里server和sampler放在同一台机器上,专门用来存储graph structure & features;worker则放在另一台机器,专门用来做计算。
# kv_ip_config.txt
127.0.0.1 9000 # server's ip address & port
# kv_ip_config_multi.txt
# 在文件中只需要写主server的IP地址和端口即可
# bakcup server的端口从主端口开始会依次增加1
# rpc.read_ip_config(ip_config, num_servers)
127.0.0.1 9000
127.0.0.2 9001
# 如上述的配置
# 通过rpc.read_ip_config(ip_config, num_servers)可读出server namebook
# key: [machine_id, ip, port, num_servers]
# {0: [0, "127.0.0.1", 9000, 2],
# 1: [0, "127.0.0.1", 9001, 2],
# 2: [1, "127.0.0.2", 9001, 2],
# 3: [1, "127.0.0.2", 9002, 2]}
# server
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'server'
os.environ['DGL_SERVER_ID'] = "0"
os.environ['DGL_IP_CONFIG'] = 'kv_ip_config.txt'
os.environ['DGL_NUM_SERVER'] = "1"
os.environ['DGL_NUM_CLIENT'] = "2" # worker + sampler
os.environ['DGL_CONF_PATH'] = 'part_graph.json'
dist.initialize(ip_config='kv_ip_config.txt',
num_servers=1,
num_workers=0) # 如果设置了环境变量,则传入的这几个参数没有作用
# sampler/client
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'sampler'
dist.initialize(ip_config='kv_ip_config.txt',
num_servers=self.num_servers,
num_workers=self.num_clients)
# worker
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'default'
# Note: connect to server first!
dist.initialize(ip_config='kv_ip_config.txt',
num_servers=self.num_servers,
num_workers=0)
先用Metis做图划分,然后读入分布式图。
import dgl.distributed as dist
dist.partition_graph(G, name, num_parts=1, out_path='dataset/'+name+"_tmp")
g = dist.DistGraph(name, part_config="dataset/{0}_tmp/{0}.json".format(name))
如果要实现多机,并且server和client分开,则需要分别起。
# server
DistGraphServer(server_id, ip_config, num_servers,
num_clients, part_config, disable_shared_mem=False)
# sampler/client
DistGraph(graph_name, gpb=None, part_config=None)
如果disable_shared_mem
没有禁掉,则server和client会在同一台机器上共享一个partition。
KVClient里的init_data
会调用empty_shared_mem,进而强制server和client得在同台机器上通过shared mem通信,这一点是DGL设计的一个非常大的弊端。
shared_data = empty_shared_mem(name+'-kvdata-', True, data_tensor.shape, data_type)
注意server端如果调用了dist.initialize
,则不需要自己再起图;而在sampler/client这边则需要通过dist.initialize
和DistGraph
两个接口与graph store建立联系。
DistEmbedding底层实现是DistTensor。
DistTensor
会获取KVClient
,然后通过init_data
初始化数据。
DistTensor(shape, dtype, name=None, init_func=None, part_policy=None, persistent=False)
DistEmbedding
目前只支持SparseAdaGrad一个优化类。
下面给出每个模块应该在哪一部分声明:
DistGraphServer
DistGraph
、DistEmbedding
、DistDataloader
KVClient
来pull embedding,采样结果则需要另外走socket传送由于DGL原生实现的KVStore一定会先从shared memory里找数据,这对server和client分离的情况并不友好,因此下面对KVClient的pull和push的模块做了一定修改,以强制其从远端server获取数据。(原始代码可在dist.kvstore里找到)
# 这里需要人为传入node_map
def pull(kvclient, name, id_tensor, node_map):
assert len(name) > 0, 'name cannot be empty.'
id_tensor = utils.toindex(id_tensor)
id_tensor = id_tensor.tousertensor()
assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
# partition data
machine_id = nid2partid(node_map,id_tensor)
# sort index by machine id
sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
back_sorted_id = F.tensor(np.argsort(F.asnumpy(sorted_id)))
id_tensor = id_tensor[sorted_id]
machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
# pull data from server by order
start = 0
pull_count = 0
local_id = None
for idx, machine_idx in enumerate(machine):
end = start + count[idx]
if start == end: # No data for target machine
continue
partial_id = id_tensor[start:end]
request = PullRequest(name, partial_id)
rpc.send_request_to_machine(machine_idx, request)
pull_count += 1
start += count[idx]
# recv response
response_list = []
# wait response from remote server nodes
for i in range(pull_count):
remote_response = rpc.recv_response()
response_list.append(remote_response)
# sort response by server_id and concat tensor
response_list.sort(key=kvclient._take_id)
data_tensor = F.cat(seq=[response.data_tensor for response in response_list], dim=0)
return data_tensor[back_sorted_id] # return data with original index order
def push(kvclient, name, id_tensor, data_tensor, node_map):
assert len(name) > 0, 'name cannot be empty.'
id_tensor = utils.toindex(id_tensor)
id_tensor = id_tensor.tousertensor()
assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
assert F.shape(id_tensor)[0] == F.shape(data_tensor)[0], \
'The data must has the same row size with ID.'
# partition data
machine_id = nid2partid(node_map,id_tensor)
# sort index by machine id
sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
id_tensor = id_tensor[sorted_id]
data_tensor = data_tensor[sorted_id]
machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
# push data to server by order
start = 0
local_id = None
local_data = None
for idx, machine_idx in enumerate(machine):
end = start + count[idx]
if start == end: # No data for target machine
continue
partial_id = id_tensor[start:end]
partial_data = data_tensor[start:end]
request = PushRequest(name, partial_id, partial_data)
rpc.send_request_to_machine(machine_idx, request)
start += count[idx]
可以通过下面语句在client端获取kvstore。
kvclient = dist.kvstore.get_kvstore()