先說明一下背景,目前正在魔改以下這篇論文的代碼:
https://github.com/QipengGuo/GraphWriter-DGLgithub.com
由于每次完成實驗需要5個小時(baseline),自己的模型需要更久(2倍),非常不利于調參和發現問題,所以開始嘗試使用多卡加速。
torch.nn.DataParallel ==> 簡稱 DP
torch.nn.parallel.DistributedDataParallel ==> 簡稱DDP
一開始采用dp試圖加速,結果因為dgl的實現(每個batch的點都會打包進一個batch,從而不可分割),而torch.nn.DataParallel的實現是把一個batch切分成更小,再加上他的加速性能也不如ddp,所以我開始嘗試魔改成ddp。
另外,作者在實現Sampler的時候是繼承了torch.utils.data.Sampler這個類的,目的在于agenda數據集的文本長度嚴重不均衡,如下:
為了讓模型更快train完,把長度相近的文本打包成一個batch(溫馨提醒,torchtext也有相關的類 bucketiterator[1],大概形式如下:
class BucketSampler(torch.utils.data.Sampler):
def __init__(self, data_source, batch_size=32):
self.data_source = data_source
self.batch_size = batch_size
def __iter__(self):
idxs, lens, batch, middle_batch_size, long_batch_size = basesampler(self.data_source , self.batch_size)
for idx in idxs:
batch.append(idx)
mlen = max([0]+[lens[x] for x in batch])
#if (mlen<100 and len(batch) == 32) or (mlen>100 and mlen<220 and len(batch) >= 24) or (mlen>220 and len(batch)>=8) or len(batch)==32:
if (mlen<100 and len(batch) == self.batch_size) or (mlen>100 and mlen<220 and len(batch) >= middle_batch_size) or (mlen>220 and len(batch)>=long_batch_size) or len(batch)==self.batch_size:
yield batch
batch = []
if len(batch) > 0:
yield batch
def __len__(self):
return (len(self.data_source)+self.batch_size-1)//self.batch_size
這是背景。
寫bug第一步:繼承DistributedSampler的漏洞百出
我一開始理想當然的把作者的sampler源碼crtl-cv下來,唯獨只改動了這里:
class DDPBaseBucketSampler(torch.utils.data.distributed.DistributedSampler):
隨后就發現了幾個問題:
然后我就開始看起了源碼[2],很快啊:
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore
else:
indices = list(range(len(self.dataset))) # type: ignore
if not self.drop_last:
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size <= len(indices):
indices += indices[:padding_size]
else:
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
# remove tail of data to make it evenly divisible.
indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
indices = indices[self.rankself.num_replicas] # 這一步保證每個進程拿到的數據不同
assert len(indices) == self.num_samples
return iter(indices)
這里最關鍵的問題是是什么呢?首先在torch.utils.data.distributed.DistributedSampler里面,數據集的變量叫self.dataset而不是data_source;其次和torch.utils.data.Sampler要求你_重寫__iter__函數不同:
def __iter__(self) -> Iterator[T_co]:
raise NotImplementedError
DistributedSampler這個父類里有部分實現,如果你沒有考慮到這部分,就自然會出現每個進程拿到的數據都是all的情況。
于是我重寫了我的DDPBaseBucketSampler類:
def basesampler(lens, indices, batch_size):
# the magic number comes from the author's code
t1 = []
t2 = []
t3 = []
for i, l in enumerate(lens):
if (l<100):
t1.append(indices[i])
elif (l>100 and l<220):
t2.append(indices[i])
else:
t3.append(indices[i])
datas = [t1,t2,t3]
random.shuffle(datas)
idxs = sum(datas, [])
batch = []
#為了保證不爆卡,我們給不同長度的數據上保護鎖
middle_batch_size = min(int(batch_size * 0.75) , 32)
long_batch_size = min(int(batch_size * 0.5) , 24)
return idxs, batch, middle_batch_size, long_batch_size
class DDPBaseBucketSampler(torch.utils.data.distributed.DistributedSampler):
'''
這里要注意和單GPU的sampler類同步
'''
def __init__(self, dataset, num_replicas, rank, shuffle=True, batch_size=32):
super(DDPBaseBucketSampler, self).__init__(dataset, num_replicas, rank, shuffle)
self.batch_size = batch_size
def __iter__(self):
# deterministically shuffle based on epoch
g = torch.Generator()
g.manual_seed(self.epoch)
#print('here is pytorch code and you can delete it in the /home/lzk/anaconda3/lib/python3.7/site-packages/torch/utils/data')
if self.shuffle:
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
# add extra samples to make it evenly divisible
indices += indices[:(self.total_size - len(indices))]
assert len(indices) == self.total_size
indices = indices[self.rankself.num_replicas]
assert len(indices) == self.num_samples
# 然后我也要拿到每個數據的長度 (每個rank不同)
lens = torch.Tensor([len(x) for x in self.dataset])
idxs, batch, middle_batch_size, long_batch_size = basesampler(lens[indices], indices, self.batch_size)
for idx in idxs:
batch.append(idx)
mlen = max([0]+[lens[x] for x in batch])
#if (mlen<100 and len(batch) == 32) or (mlen>100 and mlen<220 and len(batch) >= 24) or (mlen>220 and len(batch)>=8) or len(batch)==32:
if (mlen<100 and len(batch) == self.batch_size) or (mlen>100 and mlen<220 and len(batch) >= middle_batch_size) or (mlen>220 and len(batch)>=long_batch_size) or len(batch)==self.batch_size:
yield batch
batch = []
# print('應該出現2次如果是2個進程的話')
if len(batch) > 0:
yield batch
def __len__(self):
return (len(self.dataset)+self.batch_size-1)//self.batch_size
后面每個進程終于可以跑屬于自己的數據了(1/n,n=進程數量=GPU數量,單機)
緊接著問題又來了,我發現訓練過程正常結束后,主進程無法退出mp.spawn()函數。
寫bug第二步,master進程無法正常結束
number workers ddp pytorch下無法正常結束。具體表現為,mp.spawn傳遞的函數參數可以順利運行完,但是master進程一直占著卡,不退出。一開始我懷疑是sampler函數的分發batch的機制導致的,什么意思呢?就是由于每個進程拿到的數據不一樣,各自進程執行sampler類的時候,由于我規定了長度接近的文本打包在一起,所以可能master進程有一百個iter,slave只有80個,然后我馬上試了一下,很快啊:
發現只有細微的差別,并且,程序最后都越過了這些print,應該不會是batch數量不一致導致的問題。(順便指的一提的是,sampler在很早的時候就把batch打包好了)
加了摧毀進程,也于事無補
if args.is_ddp:
dist.destroy_process_group()
print('rank destroy_process_group: ' , rank)
然后只能點擊強制退出
File "train.py", line 322, in
main(args.gpu, args)
File "/home/lzk/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 171, in spawn
while not spawn_context.join():
File "/home/lzk/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 77, in join
timeout=timeout,
File "/home/lzk/anaconda3/lib/python3.7/multiprocessing/connection.py", line 920, in wait
ready = selector.select(timeout)
File "/home/lzk/anaconda3/lib/python3.7/selectors.py", line 415, in select
fd_event_list = self._selector.poll(timeout)
TypeError: keyboard_interrupt_handler() takes 1 positional argument but 2 were given
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/home/lzk/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
pid, sts = os.waitpid(self.pid, flag)
TypeError: keyboard_interrupt_handler() takes 1 positional argument but 2 were given
代碼參考:基于Python初探Linux下的僵尸進程和孤兒進程(三)[3]、Multiprocessing in python blocked[4]
很顯然是pytorch master進程產生死鎖了,變成了僵尸進程。
再探究,發現當我把dataloader的number workers設為0的時候,程序可以正常結束。經過我的注釋大法后我發現,哪怕我把for _i , batch in enumerate(dataloader)內的代碼全部注釋改為pass,程序還是會出現master無法正常結束的情況。所以問題鎖定在dataloader身上。參考:nero:PyTorch DataLoader初探[5]
另外一種想法是,mp.spawn出現了問題。使用此方式啟動的進程,只會執行和 target 參數或者 run() 方法相關的代碼。Windows 平臺只能使用此方法,事實上該平臺默認使用的也是該啟動方式。相比其他兩種方式,此方式啟動進程的效率最低。參考:Python設置進程啟動的3種方式[6]
現在試一下,繞開mp.spawn函數,用shell腳本實現ddp,能不能不報錯:
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 我的文件.py
參數解釋:
- nnodes:因為是單機多卡,所以設為1,顯然node_rank 只能是0了
- local_rank:進程在運行的時候,會利用args插入local_rank這個參數標識進程序號
一番改動后,發現問題有所好轉,最直觀的感受是速度快了非常多!!現在我沒有父進程的問題了,但還是在運行完所有的程序后,無法正常結束:
此時我的代碼運行到:
上面的代碼是main函數,2個進程(master,salve)都可以越過barrier,其中slave順利結束,但是master卻遲遲不見蹤影:
這個時候ctrl+c終止,發現:
順著報錯路徑去torch/distributed/launch.py, line 239找代碼:
def main():
args = parse_args()
# world size in terms of number of processes
dist_world_size = args.nproc_per_node * args.nnodes
# set PyTorch distributed related environmental variables
current_env = os.environ.copy()
current_env["MASTER_ADDR"] = args.master_addr
current_env["MASTER_PORT"] = str(args.master_port)
current_env["WORLD_SIZE"] = str(dist_world_size)
processes = []
if 'OMP_NUM_THREADS' not in os.environ and args.nproc_per_node > 1:
current_env["OMP_NUM_THREADS"] = str(1)
print("*****************************************
"
"Setting OMP_NUM_THREADS environment variable for each process "
"to be {} in default, to avoid your system being overloaded, "
"please further tune the variable for optimal performance in "
"your application as needed.
"
"*****************************************".format(current_env["OMP_NUM_THREADS"]))
for local_rank in range(0, args.nproc_per_node):
# each process's rank
dist_rank = args.nproc_per_node * args.node_rank + local_rank
current_env["RANK"] = str(dist_rank)
current_env["LOCAL_RANK"] = str(local_rank)
# spawn the processes
if args.use_env:
cmd = [sys.executable, "-u",
args.training_script] + args.training_script_args
else:
cmd = [sys.executable,
"-u",
args.training_script,
"--local_rank={}".format(local_rank)] + args.training_script_args
process = subprocess.Popen(cmd, env=current_env)
processes.append(process)
for process in processes:
process.wait() # 等待運行結束
if process.returncode != 0:
raise subprocess.CalledProcessError(returncode=process.returncode,
cmd=cmd)
可惡,master和dataloader到底有什么關系哇。。
這個問題終于在昨天(2020/12/22)被解決了,說來也好笑,左手是graphwriter的ddp實現,無法正常退出,右手是minst的ddp最小例程,可以正常退出,于是我開始了刪減大法。替換了數據集,model,然后讓dataloader空轉,都沒有發現問題,最后一步步逼近,知道我把自己的代碼這一行注釋掉以后,終于可以正常結束了:
def main(args):
############################################################
print('local_rank : ' , args.local_rank )
if args.is_ddp:
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=args.local_rank
)
############################################################
# torch.multiprocessing.set_sharing_strategy('file_system') 萬惡之源
os.environ["CUDA_VISIBLE_DEVICES"] = os.environ["CUDA_VISIBLE_DEVICES"].split(',')[args.local_rank]
args.device = torch.device(0)
...
為什么我當時會加上這句話呢?因為當時在調試number worker的時候(當時年輕,以為越大越好,所以設置成了number workers = cpu.count()),發現系統報錯,說超出了打開文件的最大數量限制。在torch.multiprocessing的設定里,共享策略(參考pytorch中文文檔[7])默認是File descriptor,此策略將使用文件描述符作為共享內存句柄。當存儲被移動到共享內存中,一個由shm_open
獲得的文件描述符被緩存。當時,文檔還提到:
如果你的系統對打開的文件描述符數量有限制,并且無法提高,你應該使用
file_system
策略。
所以我換成了torch.multiprocessing.set_sharing_strategy('file_system'),但是卻忽略文檔里的共享內存泄露警告。顯然,或許這不是嚴重的問題,文檔里提到:
也有可能我所說的master進程就是這個torch_shm_manager,因為destory進程組始終無法結束0號進程:
這個BUG結束了,真開心,期待下一個BUG快快到來。
責任編輯:xj
原文標題:Pytorch翻車記錄:單卡改多卡踩坑記!
文章出處:【微信公眾號:深度學習自然語言處理】歡迎添加關注!文章轉載請注明出處。
-
機器學習
+關注
關注
66文章
8428瀏覽量
132837 -
深度學習
+關注
關注
73文章
5510瀏覽量
121338 -
pytorch
+關注
關注
2文章
808瀏覽量
13283
原文標題:Pytorch翻車記錄:單卡改多卡踩坑記!
文章出處:【微信號:zenRRan,微信公眾號:深度學習自然語言處理】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論