DGL分布式训练

由于目前图神经网络框架DGL的分布式模块仍在建设中,文档非常不齐全,故本文主要记录分布式训练GNN的整个流程。

目前分布式代码都被放在dgl.distributed文件夹中,核心部分是分布式的KVStore,基本上所有的上层封装的模块(包括DistGraphDistTensorDistEmbedding等)都会走KVServer和KVClient的接口。

注意目前DGL对于大规模分布式部署实际上是做得没有Euler好的,几个缺点包括:

  • 无法支持long-running graph store,且需要提前设置好client的数目,也就是没有办法动态地提交任务,这一点将造成重复的构图开销
  • server和client co-locate在一起,即一台机器上必须同时有server又有client,且server和client之间通过shared memory进行通信

因此如果要实现特定的深度学习分布式架构(比如parameter server),用原生的DGL是比较难实现的,需要做一定的适配和改动才有办法达成需求。

初始化

在所有分布式训练启动前都应先配置好各台机器的IP地址,通过initialize实现。

def initialize(ip_config, num_servers=1, num_workers=0)

其中ip_config是server的地址,num_server每台机器上的server进程数,num_worker是每台机器上sampler的进程数。

多个server在同一台机器上,除了主server,其他为backup server。send_requests_to_machine这一行通过随机选择server machine来发送对应RPC请求,从而实现负载均衡。

进来先会判断DGL_ROLE,包括serversamplerdefault三种角色,可以用os.environ['DGL_ROLE'] = 'server'进行配置。

  • server:需要配置相关环境变量,然后会创建一个DistGraphServer(从KVServer继承而来),然后start服务
    • backup server只会load_partition_book而不会加载整个graph,而master server则会通过load_partition把整个graph读进来,然后再读partition book
  • worker:会跳转到下半部分的逻辑,通过connect_to_server连接到上述的server,接着用init_kvstore创建KVClient,后续通过get_kvstore可获取该worker。

下面以1 server + 1 sampler + 1 worker来说明怎么初始化DGL的相关数据结构。这里server和sampler放在同一台机器上,专门用来存储graph structure & features;worker则放在另一台机器,专门用来做计算

# kv_ip_config.txt
127.0.0.1 9000 # server's ip address & port

# kv_ip_config_multi.txt
# 在文件中只需要写主server的IP地址和端口即可
# bakcup server的端口从主端口开始会依次增加1
# rpc.read_ip_config(ip_config, num_servers)
127.0.0.1 9000
127.0.0.2 9001

# 如上述的配置
# 通过rpc.read_ip_config(ip_config, num_servers)可读出server namebook
# key: [machine_id, ip, port, num_servers]
# {0: [0, "127.0.0.1", 9000, 2],
#  1: [0, "127.0.0.1", 9001, 2],
#  2: [1, "127.0.0.2", 9001, 2],
#  3: [1, "127.0.0.2", 9002, 2]}

# server
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'server'
os.environ['DGL_SERVER_ID'] = "0"
os.environ['DGL_IP_CONFIG'] = 'kv_ip_config.txt'
os.environ['DGL_NUM_SERVER'] = "1"
os.environ['DGL_NUM_CLIENT'] = "2" # worker + sampler
os.environ['DGL_CONF_PATH'] = 'part_graph.json'
dist.initialize(ip_config='kv_ip_config.txt',
                num_servers=1,
                num_workers=0) # 如果设置了环境变量,则传入的这几个参数没有作用

# sampler/client
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'sampler'
dist.initialize(ip_config='kv_ip_config.txt',
                num_servers=self.num_servers,
                num_workers=self.num_clients)

# worker
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_ROLE'] = 'default'
# Note: connect to server first!
dist.initialize(ip_config='kv_ip_config.txt',
                num_servers=self.num_servers,
                num_workers=0)

构图

先用Metis做图划分,然后读入分布式图。

import dgl.distributed as dist
dist.partition_graph(G, name, num_parts=1, out_path='dataset/'+name+"_tmp")
g = dist.DistGraph(name, part_config="dataset/{0}_tmp/{0}.json".format(name))

如果要实现多机,并且server和client分开,则需要分别起。

# server
DistGraphServer(server_id, ip_config, num_servers,
                num_clients, part_config, disable_shared_mem=False)
# sampler/client
DistGraph(graph_name, gpb=None, part_config=None)

如果disable_shared_mem没有禁掉,则server和client会在同一台机器上共享一个partition。

KVClient里的init_data会调用empty_shared_mem,进而强制server和client得在同台机器上通过shared mem通信,这一点是DGL设计的一个非常大的弊端。

shared_data = empty_shared_mem(name+'-kvdata-', True, data_tensor.shape, data_type)

注意server端如果调用了dist.initialize,则不需要自己再起图;而在sampler/client这边则需要通过dist.initializeDistGraph两个接口与graph store建立联系。

DistTensor

DistEmbedding底层实现是DistTensor

DistTensor会获取KVClient,然后通过init_data初始化数据。

DistTensor(shape, dtype, name=None, init_func=None, part_policy=None, persistent=False)

DistEmbedding目前只支持SparseAdaGrad一个优化类。

下面给出每个模块应该在哪一部分声明:

  • Server:DistGraphServer
  • Client:DistGraphDistEmbeddingDistDataloader
  • Worker:通过获取KVClient来pull embedding,采样结果则需要另外走socket传送

KVStore

由于DGL原生实现的KVStore一定会先从shared memory里找数据,这对server和client分离的情况并不友好,因此下面对KVClient的pull和push的模块做了一定修改,以强制其从远端server获取数据。(原始代码可在dist.kvstore里找到)

# 这里需要人为传入node_map
def pull(kvclient, name, id_tensor, node_map):
    assert len(name) > 0, 'name cannot be empty.'
    id_tensor = utils.toindex(id_tensor)
    id_tensor = id_tensor.tousertensor()
    assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
    # partition data
    machine_id = nid2partid(node_map,id_tensor)
    # sort index by machine id
    sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
    back_sorted_id = F.tensor(np.argsort(F.asnumpy(sorted_id)))
    id_tensor = id_tensor[sorted_id]
    machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
    # pull data from server by order
    start = 0
    pull_count = 0
    local_id = None
    for idx, machine_idx in enumerate(machine):
        end = start + count[idx]
        if start == end: # No data for target machine
            continue
        partial_id = id_tensor[start:end]
        request = PullRequest(name, partial_id)
        rpc.send_request_to_machine(machine_idx, request)
        pull_count += 1
        start += count[idx]
    # recv response
    response_list = []
    # wait response from remote server nodes
    for i in range(pull_count):
        remote_response = rpc.recv_response()
        response_list.append(remote_response)
    # sort response by server_id and concat tensor
    response_list.sort(key=kvclient._take_id)
    data_tensor = F.cat(seq=[response.data_tensor for response in response_list], dim=0)
    return data_tensor[back_sorted_id] # return data with original index order

def push(kvclient, name, id_tensor, data_tensor, node_map):
    assert len(name) > 0, 'name cannot be empty.'
    id_tensor = utils.toindex(id_tensor)
    id_tensor = id_tensor.tousertensor()
    assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
    assert F.shape(id_tensor)[0] == F.shape(data_tensor)[0], \
    'The data must has the same row size with ID.'
    # partition data
    machine_id = nid2partid(node_map,id_tensor)
    # sort index by machine id
    sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
    id_tensor = id_tensor[sorted_id]
    data_tensor = data_tensor[sorted_id]
    machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
    # push data to server by order
    start = 0
    local_id = None
    local_data = None
    for idx, machine_idx in enumerate(machine):
        end = start + count[idx]
        if start == end: # No data for target machine
            continue
        partial_id = id_tensor[start:end]
        partial_data = data_tensor[start:end]
        request = PushRequest(name, partial_id, partial_data)
        rpc.send_request_to_machine(machine_idx, request)
        start += count[idx]

可以通过下面语句在client端获取kvstore。

kvclient = dist.kvstore.get_kvstore()