# utils.py
# Copyright (c) 2020 NVIDIA CORPORATION. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import pickle
import sys
import time
import unicodedata

import dllogger
import horovod.tensorflow as hvd
import six
import tensorflow as tf
  17. def get_rank():
  18. try:
  19. return hvd.rank()
  20. except:
  21. return 0
  22. def get_world_size():
  23. try:
  24. return hvd.size()
  25. except:
  26. return 1
  27. def is_main_process():
  28. return get_rank() == 0
  29. def format_step(step):
  30. if isinstance(step, str):
  31. return step
  32. s = ""
  33. if len(step) == 1:
  34. s += "Training Iteration: {} ".format(step[0])
  35. return s
  36. if len(step) > 0:
  37. s += "Training Epoch: {} ".format(step[0])
  38. if len(step) > 1:
  39. s += "Training Iteration: {} ".format(step[1])
  40. return s
  41. def load_json(path):
  42. with tf.io.gfile.GFile(path, "r") as f:
  43. return json.load(f)
  44. def write_json(o, path):
  45. if "/" in path:
  46. tf.io.gfile.makedirs(path.rsplit("/", 1)[0])
  47. with tf.io.gfile.GFile(path, "w") as f:
  48. json.dump(o, f)
  49. def load_pickle(path):
  50. with tf.io.gfile.GFile(path, "rb") as f:
  51. return pickle.load(f)
  52. def write_pickle(o, path):
  53. if "/" in path:
  54. tf.io.gfile.makedirs(path.rsplit("/", 1)[0])
  55. with tf.io.gfile.GFile(path, "wb") as f:
  56. pickle.dump(o, f, -1)
  57. def mkdir(path):
  58. if not tf.io.gfile.exists(path):
  59. tf.io.gfile.makedirs(path)
  60. def rmrf(path):
  61. if tf.io.gfile.exists(path):
  62. tf.io.gfile.rmtree(path)
  63. def rmkdir(path):
  64. rmrf(path)
  65. mkdir(path)
  66. def log(*args, **kwargs):
  67. all_rank = kwargs.pop("all_rank", False)
  68. if not all_rank and not is_main_process():
  69. return
  70. msg = " ".join(map(str, args))
  71. sys.stdout.write(msg + "\n")
  72. sys.stdout.flush()
  73. def log_config(config):
  74. for key, value in sorted(config.__dict__.items()):
  75. log(key, value)
  76. log()
  77. def heading(*args):
  78. log(80 * "=")
  79. log(*args)
  80. log(80 * "=")
  81. def nest_dict(d, prefixes, delim="_"):
  82. """Go from {prefix_key: value} to {prefix: {key: value}}."""
  83. nested = {}
  84. for k, v in d.items():
  85. for prefix in prefixes:
  86. if k.startswith(prefix + delim):
  87. if prefix not in nested:
  88. nested[prefix] = {}
  89. nested[prefix][k.split(delim, 1)[1]] = v
  90. else:
  91. nested[k] = v
  92. return nested
  93. def flatten_dict(d, delim="_"):
  94. """Go from {prefix: {key: value}} to {prefix_key: value}."""
  95. flattened = {}
  96. for k, v in d.items():
  97. if isinstance(v, dict):
  98. for k2, v2 in v.items():
  99. flattened[k + delim + k2] = v2
  100. else:
  101. flattened[k] = v
  102. return flattened
  103. def printable_text(text):
  104. """Returns text encoded in a way suitable for print or `tf.logging`."""
  105. # These functions want `str` for both Python2 and Python3, but in one case
  106. # it's a Unicode string and in the other it's a byte string.
  107. if six.PY3:
  108. if isinstance(text, str):
  109. return text
  110. elif isinstance(text, bytes):
  111. return text.decode("utf-8", "ignore")
  112. else:
  113. raise ValueError("Unsupported string type: %s" % (type(text)))
  114. elif six.PY2:
  115. if isinstance(text, str):
  116. return text
  117. elif isinstance(text, unicode):
  118. return text.encode("utf-8")
  119. else:
  120. raise ValueError("Unsupported string type: %s" % (type(text)))
  121. else:
  122. raise ValueError("Not running on Python2 or Python 3?")
  123. def get_readable_time(elapsed):
  124. d, h, m, s = [int(x) for x in time.strftime("%d:%H:%M:%S", time.gmtime(elapsed)).split(':')]
  125. d -= 1
  126. return '{:2d}h{:2d}m{:2d}s'.format(24*d + h, m, s)
  127. def setup_logger(args):
  128. os.makedirs(args.log_dir, exist_ok=True)
  129. if not args.json_summary:
  130. log_path = os.path.join(args.log_dir, 'dllogger_rank{}.log'.format(get_rank()))
  131. else:
  132. log_path = "{}_rank{}".format(args.json_summary, get_rank())
  133. if is_main_process():
  134. dllogger.init(backends = [dllogger.JSONStreamBackend(verbosity=1, filename=log_path),
  135. dllogger.StdOutBackend(verbosity=dllogger.Verbosity.VERBOSE, step_format=format_step)])
  136. else:
  137. dllogger.init(backends = [dllogger.JSONStreamBackend(verbosity=1, filename=log_path)])
  138. for k,v in vars(args).items():
  139. dllogger.log(step='PARAMETER', data={k:v}, verbosity=0)
  140. container_setup_info = {
  141. 'NVIDIA_TENSORFLOW_VERSION': os.environ.get('NVIDIA_TENSORFLOW_VERSION'),
  142. 'TENSORFLOW_VERSION': os.environ.get('TENSORFLOW_VERSION'),
  143. 'CUBLAS_VERSION': os.environ.get('CUBLAS_VERSION'),
  144. 'NCCL_VERSION': os.environ.get('NCCL_VERSION'),
  145. 'CUDA_DRIVER_VERSION': os.environ.get('CUDA_DRIVER_VERSION'),
  146. 'CUDNN_VERSION': os.environ.get('CUDNN_VERSION'),
  147. 'CUDA_VERSION': os.environ.get('CUDA_VERSION'),
  148. 'NVIDIA_PIPELINE_ID': os.environ.get('NVIDIA_PIPELINE_ID'),
  149. 'NVIDIA_BUILD_ID': os.environ.get('NVIDIA_BUILD_ID'),
  150. 'NVIDIA_TF32_OVERRIDE': os.environ.get('NVIDIA_TF32_OVERRIDE'),
  151. }
  152. dllogger.log(step='PARAMETER', data=container_setup_info, verbosity=0)
def postprocess_dllog(args):
    """Merge the per-rank dllogger files into one combined log.

    Reads 'dllogger_rank{i}.log' (or '{json_summary}_rank{i}') for every
    rank, walks them line-by-line in lockstep, and writes a single merged
    file: PARAMETER lines are copied from rank 0 only; for all other
    steps, each value under 'data' is averaged across ranks.

    NOTE(review): assumes every rank wrote the same number of lines, with
    matching 'step' fields (asserted) and the same numeric 'data' keys.
    """
    if not args.json_summary:
        log_path = os.path.join(args.log_dir, 'dllogger_rank{}.log')
    else:
        log_path = str(args.json_summary) + "_rank{}"
    logfiles = [open(log_path.format(i), 'r') for i in range(get_world_size())]
    # Destination path: same naming scheme without the rank suffix.
    if not args.json_summary:
        log_path = os.path.join(args.log_dir, 'dllogger.log')
    else:
        log_path = str(args.json_summary)
    with open(log_path, 'w') as dest_file:
        for lines in zip(*[f.readlines() for f in logfiles]):
            # Each dllogger line is the 5-char prefix 'DLLL ' followed by JSON.
            json_lines = [json.loads(l[5:]) for l in lines]
            assert all(x['step'] == json_lines[0]['step'] for x in json_lines)
            if json_lines[0]['step'] == 'PARAMETER':
                # Parameters are identical across ranks; keep rank 0's line verbatim.
                dest_file.write(lines[0])
            else:
                # Average each metric across ranks.
                d = dict.fromkeys(json_lines[0]['data'])
                for k in d.keys():
                    vs = [line['data'][k] for line in json_lines]
                    d[k] = sum(vs)/len(vs)
                json_lines[0]['data'] = d
                dest_file.write('DLLL ')
                dest_file.write(json.dumps(json_lines[0]))
                dest_file.write('\n')
    for l in logfiles:
        l.close()