# program.py
  1. # Copyright (c) 2022 NVIDIA Corporation. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import time
  15. import logging
  16. from profile import Profiler
  17. import numpy as np
  18. from optimizer import build_optimizer
  19. from lr_scheduler import build_lr_scheduler
  20. from utils.misc import AverageMeter
  21. from utils.mode import Mode, RunScope
  22. from utils.utility import get_num_trainers
  23. import models
  24. import dllogger
  25. import paddle
  26. import paddle.nn.functional as F
  27. from paddle.distributed import fleet
  28. from paddle.distributed.fleet import DistributedStrategy
  29. from paddle.static import sparsity
  30. from paddle.distributed.fleet.meta_optimizers.common import CollectiveHelper
  31. def create_feeds(image_shape):
  32. """
  33. Create feeds mapping for the inputs of Pragrm execution.
  34. Args:
  35. image_shape(list[int]): Model input shape, such as [4, 224, 224].
  36. Returns:
  37. feeds(dict): A dict to map variables'name to their values.
  38. key (string): Name of variable to feed.
  39. Value (tuple): paddle.static.data.
  40. """
  41. feeds = dict()
  42. feeds['data'] = paddle.static.data(
  43. name="data", shape=[None] + image_shape, dtype="float32")
  44. feeds['label'] = paddle.static.data(
  45. name="label", shape=[None, 1], dtype="int64")
  46. return feeds
  47. def create_fetchs(out, feeds, class_num, label_smoothing=0, mode=Mode.TRAIN):
  48. """
  49. Create fetchs to obtain specific outputs from Pragrm execution (included loss and measures).
  50. Args:
  51. out(variable): The model output variable.
  52. feeds(dict): A dict of mapping variables'name to their values
  53. (The input of Program execution).
  54. class_num(int): The number of classes.
  55. label_smoothing(float, optional): Epsilon of label smoothing. Default: 0.
  56. mode(utils.Mode, optional): Train or eval mode. Default: Mode.TRAIN
  57. Returns:
  58. fetchs(dict): A dict of outputs from Program execution (included loss and measures).
  59. key (string): Name of variable to fetch.
  60. Value (tuple): (variable, AverageMeter).
  61. """
  62. fetchs = dict()
  63. target = paddle.reshape(feeds['label'], [-1, 1])
  64. if mode == Mode.TRAIN:
  65. if label_smoothing == 0:
  66. loss = F.cross_entropy(out, target)
  67. else:
  68. label_one_hot = F.one_hot(target, class_num)
  69. soft_target = F.label_smooth(
  70. label_one_hot, epsilon=label_smoothing)
  71. soft_target = paddle.reshape(soft_target, shape=[-1, class_num])
  72. log_softmax = -F.log_softmax(out, axis=-1)
  73. loss = paddle.sum(log_softmax * soft_target, axis=-1)
  74. else:
  75. loss = F.cross_entropy(out, target)
  76. label = paddle.argmax(out, axis=-1, dtype='int32')
  77. fetchs['label'] = (label, None)
  78. loss = loss.mean()
  79. fetchs['loss'] = (loss, AverageMeter('loss', '7.4f', need_avg=True))
  80. acc_top1 = paddle.metric.accuracy(input=out, label=target, k=1)
  81. acc_top5 = paddle.metric.accuracy(input=out, label=target, k=5)
  82. metric_dict = dict()
  83. metric_dict["top1"] = acc_top1
  84. metric_dict["top5"] = acc_top5
  85. for key in metric_dict:
  86. if mode != Mode.TRAIN and paddle.distributed.get_world_size() > 1:
  87. paddle.distributed.all_reduce(
  88. metric_dict[key], op=paddle.distributed.ReduceOp.SUM)
  89. metric_dict[key] = metric_dict[
  90. key] / paddle.distributed.get_world_size()
  91. fetchs[key] = (metric_dict[key], AverageMeter(
  92. key, '7.4f', need_avg=True))
  93. return fetchs
  94. def create_strategy(args, is_train=True):
  95. """
  96. Create paddle.static.BuildStrategy and paddle.static.ExecutionStrategy with arguments.
  97. Args:
  98. args(Namespace): Arguments obtained from ArgumentParser.
  99. is_train(bool, optional): Indicate the prupose of strategy is for training
  100. of not. Default is True.
  101. Returns:
  102. build_strategy(paddle.static.BuildStrategy): A instance of BuildStrategy.
  103. exec_strategy(paddle.static.ExecutionStrategy): A instance of ExecutionStrategy.
  104. """
  105. build_strategy = paddle.static.BuildStrategy()
  106. exec_strategy = paddle.static.ExecutionStrategy()
  107. exec_strategy.num_threads = 1
  108. exec_strategy.num_iteration_per_drop_scope = (10000 if args.amp and
  109. args.use_pure_fp16 else 10)
  110. paddle.set_flags({
  111. 'FLAGS_cudnn_exhaustive_search': True,
  112. 'FLAGS_conv_workspace_size_limit': 4096
  113. })
  114. if not is_train:
  115. build_strategy.fix_op_run_order = True
  116. if args.amp:
  117. build_strategy.fuse_bn_act_ops = True
  118. build_strategy.fuse_elewise_add_act_ops = True
  119. build_strategy.fuse_bn_add_act_ops = True
  120. build_strategy.enable_addto = True
  121. return build_strategy, exec_strategy
  122. def dist_optimizer(args, optimizer):
  123. """
  124. Create a distributed optimizer based on a given optimizer.
  125. Args:
  126. args(Namespace): Arguments obtained from ArgumentParser.
  127. optimizer(paddle.optimizer): A normal optimizer.
  128. Returns:
  129. optimizer(fleet.distributed_optimizer): A distributed optimizer.
  130. """
  131. build_strategy, exec_strategy = create_strategy(args)
  132. dist_strategy = DistributedStrategy()
  133. dist_strategy.execution_strategy = exec_strategy
  134. dist_strategy.build_strategy = build_strategy
  135. dist_strategy.fuse_all_reduce_ops = True
  136. all_reduce_size = 16
  137. dist_strategy.fuse_grad_size_in_MB = all_reduce_size
  138. dist_strategy.nccl_comm_num = 1
  139. dist_strategy.sync_nccl_allreduce = True
  140. if args.amp:
  141. dist_strategy.cudnn_batchnorm_spatial_persistent = True
  142. dist_strategy.amp = True
  143. dist_strategy.amp_configs = {
  144. "init_loss_scaling": args.scale_loss,
  145. "use_dynamic_loss_scaling": args.use_dynamic_loss_scaling,
  146. "use_pure_fp16": args.use_pure_fp16
  147. }
  148. dist_strategy.asp = args.asp
  149. optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
  150. return optimizer
def build(args, main_prog, startup_prog, step_each_epoch, is_train=True):
    """
    Build an executable paddle.static.Program via following four steps:
        1. Create feeds.
        2. Create a model.
        3. Create fetchs.
        4. Create an optimizer if is_train==True.
    Args:
        args(Namespace): Arguments obtained from ArgumentParser.
        main_prog(paddle.static.Program): The main program.
        startup_prog(paddle.static.Program): The startup program.
        step_each_epoch(int): The number of steps in each epoch.
        is_train(bool, optional): Whether the main program created is for training. Default: True.
    Returns:
        fetchs(dict): A dict of outputs from Program execution (included loss and measures).
        lr_scheduler(paddle.optimizer.lr.LRScheduler): A learning rate scheduler.
        feeds(dict): A dict to map variables' names to their values.
        optimizer(Optimizer): An optimizer with distributed/AMP/ASP strategy.
    """
    with paddle.static.program_guard(main_prog, startup_prog):
        with paddle.utils.unique_name.guard():
            mode = Mode.TRAIN if is_train else Mode.EVAL
            feeds = create_feeds(args.image_shape)

            model_name = args.model_arch_name
            class_num = args.num_of_class
            input_image_channel = args.image_channel
            data_format = args.data_layout
            use_pure_fp16 = args.use_pure_fp16
            bn_weight_decay = args.bn_weight_decay
            # Look up the model class by name in the models package.
            model = models.__dict__[model_name](
                class_num=class_num,
                input_image_channel=input_image_channel,
                data_format=data_format,
                use_pure_fp16=use_pure_fp16,
                bn_weight_decay=bn_weight_decay)
            out = model(feeds["data"])

            fetchs = create_fetchs(
                out, feeds, class_num, args.label_smoothing, mode=mode)

            if args.asp:
                # Keep the final FC layer dense when applying 2:4 sparsity.
                sparsity.set_excluded_layers(main_prog, [model.fc.weight.name])

            lr_scheduler = None
            optimizer = None
            if is_train:
                lr_scheduler = build_lr_scheduler(args, step_each_epoch)
                optimizer = build_optimizer(args, lr_scheduler)
                optimizer = dist_optimizer(args, optimizer)
                optimizer.minimize(fetchs['loss'][0], startup_prog)

    # This is a workaround to "Communicator of ring id 0 has not been initialized.".
    # Since Paddle's design, the initialization would be done inside train program,
    # eval_only need to manually call initialization.
    if args.run_scope == RunScope.EVAL_ONLY and \
       paddle.distributed.get_world_size() > 1:
        collective_helper = CollectiveHelper(
            role_maker=fleet.PaddleCloudRoleMaker(is_collective=True))
        collective_helper.update_startup_program(startup_prog)

    return fetchs, lr_scheduler, feeds, optimizer
  207. def compile_prog(args, program, loss_name=None, is_train=True):
  208. """
  209. Compile the given program, which would fuse computing ops or optimize memory footprint
  210. based building strategy in config.
  211. Args:
  212. args(Namespace): Arguments obtained from ArgumentParser.
  213. program(paddle.static.Program): The main program to be compiled.
  214. loss_name(str, optional): The name of loss variable. Default: None.
  215. is_train(bool, optional): Indicate the prupose of strategy is for
  216. training of not. Default is True.
  217. Returns:
  218. compiled_program(paddle.static.CompiledProgram): A compiled program.
  219. """
  220. build_strategy, exec_strategy = create_strategy(args, is_train)
  221. compiled_program = paddle.static.CompiledProgram(
  222. program).with_data_parallel(
  223. loss_name=loss_name,
  224. build_strategy=build_strategy,
  225. exec_strategy=exec_strategy)
  226. return compiled_program
  227. def run(args,
  228. dataloader,
  229. exe,
  230. program,
  231. fetchs,
  232. epoch,
  233. mode=Mode.TRAIN,
  234. lr_scheduler=None):
  235. """
  236. Execute program.
  237. Args:
  238. args(Namespace): Arguments obtained from ArgumentParser.
  239. dataloader(nvidia.dali.plugin.paddle.DALIGenericIterator):
  240. Iteratable output of NVIDIA DALI pipeline,
  241. please refer to dali_dataloader in dali.py for details.
  242. exe(paddle.static.Executor): A executor to run program.
  243. program(paddle.static.Program): The program to be executed.
  244. fetchs(dict): A dict of outputs from Program execution (included loss and measures).
  245. epoch(int): Current epoch id to run.
  246. mode(utils.Mode, optional): Train or eval mode. Default: Mode.TRAIN.
  247. lr_scheduler(paddle.optimizer.lr.LRScheduler, optional): A learning rate scheduler.
  248. Default: None.
  249. Returns:
  250. metrics (dict): A dictionary to collect values of metrics.
  251. """
  252. num_trainers = get_num_trainers()
  253. fetch_list = [f[0] for f in fetchs.values()]
  254. metric_dict = {"lr": AverageMeter('lr', 'f', postfix=",", need_avg=False)}
  255. for k in fetchs:
  256. if fetchs[k][1] is not None:
  257. metric_dict[k] = fetchs[k][1]
  258. metric_dict["batch_time"] = AverageMeter(
  259. 'batch_time', '.5f', postfix=" s,")
  260. metric_dict["data_time"] = AverageMeter('data_time', '.5f', postfix=" s,")
  261. metric_dict["compute_time"] = AverageMeter(
  262. 'compute_time', '.5f', postfix=" s,")
  263. for m in metric_dict.values():
  264. m.reset()
  265. profiler = Profiler()
  266. tic = time.perf_counter()
  267. idx = 0
  268. batch_size = None
  269. latency = []
  270. total_benchmark_steps = \
  271. args.benchmark_steps + args.benchmark_warmup_steps
  272. dataloader.reset()
  273. while True:
  274. # profiler.profile_setup return True only when
  275. # profile is enable and idx == stop steps
  276. if profiler.profile_setup(idx):
  277. break
  278. idx += 1
  279. try:
  280. batch = next(dataloader)
  281. except StopIteration:
  282. # Reset dataloader when run benchmark to fill required steps.
  283. if args.benchmark and (idx < total_benchmark_steps):
  284. dataloader.reset()
  285. # Reset tic timestamp to ignore exception handling time.
  286. tic = time.perf_counter()
  287. continue
  288. break
  289. except RuntimeError:
  290. logging.warning(
  291. "Except RuntimeError when reading data from dataloader, try to read once again..."
  292. )
  293. continue
  294. reader_toc = time.perf_counter()
  295. metric_dict['data_time'].update(reader_toc - tic)
  296. batch_size = batch[0]["data"].shape()[0]
  297. feed_dict = batch[0]
  298. with profiler.profile_tag(idx, "Training"
  299. if mode == Mode.TRAIN else "Evaluation"):
  300. results = exe.run(program=program,
  301. feed=feed_dict,
  302. fetch_list=fetch_list)
  303. for name, m in zip(fetchs.keys(), results):
  304. if name in metric_dict:
  305. metric_dict[name].update(np.mean(m), batch_size)
  306. metric_dict["compute_time"].update(time.perf_counter() - reader_toc)
  307. metric_dict["batch_time"].update(time.perf_counter() - tic)
  308. if mode == Mode.TRAIN:
  309. metric_dict['lr'].update(lr_scheduler.get_lr())
  310. if lr_scheduler is not None:
  311. with profiler.profile_tag(idx, "LR Step"):
  312. lr_scheduler.step()
  313. tic = time.perf_counter()
  314. if idx % args.print_interval == 0:
  315. log_msg = dict()
  316. log_msg['loss'] = metric_dict['loss'].val.item()
  317. log_msg['top1'] = metric_dict['top1'].val.item()
  318. log_msg['top5'] = metric_dict['top5'].val.item()
  319. log_msg['data_time'] = metric_dict['data_time'].val
  320. log_msg['compute_time'] = metric_dict['compute_time'].val
  321. log_msg['batch_time'] = metric_dict['batch_time'].val
  322. log_msg['ips'] = \
  323. batch_size * num_trainers / metric_dict['batch_time'].val
  324. if mode == Mode.TRAIN:
  325. log_msg['lr'] = metric_dict['lr'].val
  326. log_info((epoch, idx), log_msg, mode)
  327. if args.benchmark:
  328. latency.append(metric_dict['batch_time'].val)
  329. # Ignore the warmup iters
  330. if idx == args.benchmark_warmup_steps:
  331. metric_dict["compute_time"].reset()
  332. metric_dict["data_time"].reset()
  333. metric_dict["batch_time"].reset()
  334. latency.clear()
  335. logging.info("Begin benchmark at step %d", idx + 1)
  336. if idx == total_benchmark_steps:
  337. benchmark_data = dict()
  338. benchmark_data[
  339. 'ips'] = batch_size * num_trainers / metric_dict[
  340. 'batch_time'].avg
  341. if mode == mode.EVAL:
  342. latency = np.array(latency) * 1000
  343. quantile = np.quantile(latency, [0.9, 0.95, 0.99])
  344. benchmark_data['latency_avg'] = np.mean(latency)
  345. benchmark_data['latency_p90'] = quantile[0]
  346. benchmark_data['latency_p95'] = quantile[1]
  347. benchmark_data['latency_p99'] = quantile[2]
  348. logging.info("End benchmark at epoch step %d", idx)
  349. return benchmark_data
  350. epoch_data = dict()
  351. epoch_data['loss'] = metric_dict['loss'].avg.item()
  352. epoch_data['epoch_time'] = metric_dict['batch_time'].total
  353. epoch_data['ips'] = batch_size * num_trainers * \
  354. metric_dict["batch_time"].count / metric_dict["batch_time"].sum
  355. if mode == Mode.EVAL:
  356. epoch_data['top1'] = metric_dict['top1'].avg.item()
  357. epoch_data['top5'] = metric_dict['top5'].avg.item()
  358. log_info((epoch, ), epoch_data, mode)
  359. return epoch_data
  360. def log_info(step, metrics, mode):
  361. """
  362. Log metrics with step and mode information.
  363. Args:
  364. step(tuple): Step, coulbe (epoch-id, iter-id). Use tuple() for summary.
  365. metrics(dict): A dictionary collected values of metrics.
  366. mode(utils.Mode): Train or eval mode.
  367. """
  368. prefix = 'train' if mode == Mode.TRAIN else 'val'
  369. dllogger_iter_data = dict()
  370. for key in metrics:
  371. dllogger_iter_data[f"{prefix}.{key}"] = metrics[key]
  372. dllogger.log(step=step, data=dllogger_iter_data)