| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- # Copyright (c) 2017 Elad Hoffer
- # Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in all
- # copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- # SOFTWARE.
- import logging.config
- import os
- import random
- import sys
- import time
- from contextlib import contextmanager
- import dllogger
- import numpy as np
- import torch
- import torch.distributed as dist
- import torch.nn.init as init
- import torch.utils.collect_env
def init_lstm_(lstm, init_weight=0.1):
    """
    Initializes weights of LSTM layer.

    Weights and biases are initialized with uniform(-init_weight, init_weight)
    distribution, except for the hidden-hidden bias which is zeroed.

    :param lstm: instance of torch.nn.LSTM
    :param init_weight: range for the uniform initializer
    """
    def uniform_(param):
        init.uniform_(param.data, -init_weight, init_weight)

    # forward direction always; reverse direction only for bidirectional LSTM
    suffixes = ['']
    if lstm.bidirectional:
        suffixes.append('_reverse')

    for suffix in suffixes:
        # hidden-hidden and input-hidden weights: uniform
        uniform_(getattr(lstm, f'weight_hh_l0{suffix}'))
        uniform_(getattr(lstm, f'weight_ih_l0{suffix}'))
        # PyTorch LSTM has two biases, one for the input-hidden GEMM and one
        # for the hidden-hidden GEMM; the first is uniform, the second zeroed
        uniform_(getattr(lstm, f'bias_ih_l0{suffix}'))
        init.zeros_(getattr(lstm, f'bias_hh_l0{suffix}').data)
def generate_seeds(rng, size):
    """
    Generate a list of random seeds.

    :param rng: random number generator
    :param size: length of the returned list
    """
    max_seed = 2**32 - 1
    result = []
    for _ in range(size):
        result.append(rng.randint(0, max_seed))
    return result
def broadcast_seeds(seeds, device):
    """
    Broadcasts random seeds to all distributed workers.

    Returns the list of seeds broadcast from the rank-0 worker; a no-op
    returning the input unchanged when distributed is not in use.

    :param seeds: list of seeds (integers)
    :param device: torch.device
    """
    if not (torch.distributed.is_available() and torch.distributed.is_initialized()):
        return seeds
    seeds_tensor = torch.tensor(seeds, dtype=torch.int64, device=device)
    torch.distributed.broadcast(seeds_tensor, 0)
    return seeds_tensor.tolist()
def setup_seeds(master_seed, epochs, device):
    """
    Generates seeds from one master_seed.

    Returns (worker_seeds, shuffling_seeds): worker_seeds initialize
    per-worker random number generators (mostly for dropouts),
    shuffling_seeds drive dataset reshuffling before each epoch.

    Seeds are generated on the rank-0 worker and broadcast to all others.

    :param master_seed: master RNG seed used to initialize other generators
    :param epochs: number of epochs
    :param device: torch.device (used for distributed.broadcast)
    """
    if master_seed is None:
        # random master seed, random.SystemRandom() uses /dev/urandom on Unix
        master_seed = random.SystemRandom().randint(0, 2**32 - 1)
        if get_rank() == 0:
            # master seed is reported only from rank=0 worker, it's to avoid
            # confusion, seeds from rank=0 are later broadcasted to other
            # workers
            logging.info(f'Using random master seed: {master_seed}')
    else:
        # master seed was specified from command line
        logging.info(f'Using master seed from command line: {master_seed}')

    # initialize seeding RNG
    seeding_rng = random.Random(master_seed)

    # one seed for every distributed worker, broadcast from rank 0
    worker_seeds = broadcast_seeds(
        generate_seeds(seeding_rng, get_world_size()), device)
    # one seed for every epoch (data shuffling), broadcast from rank 0
    shuffling_seeds = broadcast_seeds(
        generate_seeds(seeding_rng, epochs), device)
    return worker_seeds, shuffling_seeds
def barrier():
    """
    Calls torch.distributed.barrier() if distributed is in use; otherwise
    does nothing.
    """
    dist_mod = torch.distributed
    if dist_mod.is_available() and dist_mod.is_initialized():
        dist_mod.barrier()
def get_rank():
    """
    Gets distributed rank or returns zero if distributed is not initialized.
    """
    if not (torch.distributed.is_available() and torch.distributed.is_initialized()):
        return 0
    return torch.distributed.get_rank()
def get_world_size():
    """
    Gets total number of distributed workers or returns one if distributed is
    not initialized.
    """
    if not (torch.distributed.is_available() and torch.distributed.is_initialized()):
        return 1
    return torch.distributed.get_world_size()
@contextmanager
def sync_workers():
    """
    Yields distributed rank and synchronizes all workers on exit.
    """
    # NOTE: no try/finally on purpose — if the body raises, the barrier is
    # skipped, matching the original control flow
    yield get_rank()
    barrier()
@contextmanager
def timer(name, ndigits=2, sync_gpu=True):
    """
    Context manager logging wall-clock time of the wrapped code.

    :param name: label used in the 'TIMER' log entry
    :param ndigits: number of digits the elapsed time is rounded to
    :param sync_gpu: if True, torch.cuda.synchronize() before each reading
    """
    if sync_gpu:
        torch.cuda.synchronize()
    begin = time.time()
    yield
    if sync_gpu:
        torch.cuda.synchronize()
    elapsed = round(time.time() - begin, ndigits)
    logging.info(f'TIMER {name} {elapsed}')
def setup_logging(log_all_ranks=True, log_file=os.devnull):
    """
    Configures logging.

    By default logs from all workers are printed to the console, entries are
    prefixed with "N: " where N is the rank of the worker. Logs printed to the
    console don't include timestamps.

    Full logs with timestamps are saved to the log_file file.

    :param log_all_ranks: if False, console log entries are emitted only by
        the rank=0 worker
    :param log_file: destination file for the full, timestamped log
    """
    class RankFilter(logging.Filter):
        def __init__(self, rank, log_all_ranks):
            # initialize base logging.Filter state (name/nlen); the original
            # code skipped this, leaving the base class uninitialized
            super().__init__()
            self.rank = rank
            self.log_all_ranks = log_all_ranks

        def filter(self, record):
            # attach rank to every record so formatters can reference %(rank)s
            record.rank = self.rank
            if self.log_all_ranks:
                return True
            else:
                return (self.rank == 0)

    rank = get_rank()
    rank_filter = RankFilter(rank, log_all_ranks)

    # remove (and close) any pre-existing handlers so basicConfig takes effect
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
        handler.close()

    logging_format = "%(asctime)s - %(levelname)s - %(rank)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG,
                        format=logging_format,
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename=log_file,
                        filemode='w')
    # console handler: INFO level, rank-prefixed, no timestamps
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(rank)s: %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)
    logging.getLogger('').addFilter(rank_filter)
def setup_dllogger(enabled=True, filename=os.devnull):
    """
    Initializes dllogger: rank 0 writes a JSON stream to `filename`, all other
    ranks (or disabled mode) get a backend-less, no-op logger.

    :param enabled: if False, dllogger is initialized without backends
    :param filename: output file for the JSON stream backend
    """
    if enabled and get_rank() == 0:
        backends = [
            dllogger.JSONStreamBackend(
                dllogger.Verbosity.VERBOSE,
                filename,
            ),
        ]
    else:
        backends = []
    dllogger.init(backends)

    # units attached to metrics reported during training/evaluation
    for metric, unit in (
        ("test_bleu", None),
        ("eval_90%_latency", "ms"),
        ("eval_avg_latency", "ms"),
        ("train_elapsed", "s"),
        ("eval_throughput", "tokens/s"),
        ("train_throughput", "tokens/s"),
    ):
        dllogger.metadata(metric, {"unit": unit})
def set_device(cuda, local_rank):
    """
    Sets the active device based on local_rank and returns the corresponding
    torch.device instance.

    :param cuda: if True: use cuda
    :param local_rank: local rank of the worker
    """
    if not cuda:
        return torch.device('cpu')
    torch.cuda.set_device(local_rank)
    return torch.device('cuda')
def init_distributed(cuda):
    """
    Initializes the distributed backend when WORLD_SIZE (env var) > 1.

    :param cuda: (bool) if True initializes nccl backend, if False initializes
        gloo backend
    :return: True if the distributed backend was initialized, False otherwise
    """
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    distributed = world_size > 1
    if distributed:
        chosen_backend = 'nccl' if cuda else 'gloo'
        dist.init_process_group(backend=chosen_backend,
                                init_method='env://')
        assert dist.is_initialized()
    return distributed
def log_env_info():
    """
    Prints information about execution environment to the log.
    """
    logging.info('Collecting environment information...')
    # torch's built-in collector gathers OS/CUDA/library version details
    env_info = torch.utils.collect_env.get_pretty_env_info()
    logging.info(f'{env_info}')
def pad_vocabulary(math):
    """
    Returns the vocabulary-size padding factor for a given math mode:
    reduced-precision modes pad to a multiple of 8, fp32 needs no padding.

    :param math: one of 'tf32', 'fp16', 'manual_fp16', 'fp32'
    :return: padding factor (int)
    :raises ValueError: if math is not a recognized mode (the original code
        raised an accidental UnboundLocalError here)
    """
    if math in ('tf32', 'fp16', 'manual_fp16'):
        return 8
    if math == 'fp32':
        return 1
    raise ValueError(f'Unknown math mode: {math}')
def benchmark(test_acc, target_acc, test_perf, target_perf):
    """
    Checks achieved accuracy and performance against their targets, logging
    the outcome of each check.

    A check is skipped (and counts as passed) when either its target or its
    achieved value is None.

    :return: True if both checks passed, False otherwise
    """
    def check(achieved, target, name):
        # skip the check when either side is unknown
        if target is None or achieved is None:
            return True
        logging.info(f'{name} achieved: {achieved:.2f} '
                     f'target: {target:.2f}')
        if achieved >= target:
            logging.info(f'{name} test passed')
            return True
        logging.info(f'{name} test failed')
        return False

    acc_ok = check(test_acc, target_acc, 'Accuracy')
    perf_ok = check(test_perf, target_perf, 'Performance')
    return acc_ok and perf_ok
def debug_tensor(tensor, name):
    """
    Simple utility which helps with debugging.

    Takes a tensor and logs: min, max, avg, std, number of NaNs, number of
    INFs.

    :param tensor: torch tensor
    :param name: name of the tensor (only for logging)
    """
    logging.info(name)
    arr = tensor.detach().float().cpu().numpy()
    stats = (f'MIN: {arr.min()} MAX: {arr.max()} '
             f'AVG: {arr.mean()} STD: {arr.std()} '
             f'NAN: {np.isnan(arr).sum()} INF: {np.isinf(arr).sum()}')
    logging.info(stats)
class AverageMeter:
    """
    Computes and stores the average and current value.

    :param warmup: number of initial update() calls excluded from statistics
    :param keep: if True, every post-warmup value is stored in self.vals
    """
    def __init__(self, warmup=0, keep=False):
        self.reset()
        self.warmup = warmup
        self.keep = keep

    def reset(self):
        """Clears all accumulated statistics."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0
        self.iters = 0
        self.vals = []

    def update(self, val, n=1):
        """Records a new value (weighted by n) once warmup has elapsed."""
        self.iters += 1
        self.val = val
        if self.iters <= self.warmup:
            return
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
        if self.keep:
            self.vals.append(val)

    def reduce(self, op):
        """
        Reduces average value over all workers.

        :param op: 'sum' or 'mean', reduction operator
        """
        if op not in ('sum', 'mean'):
            raise NotImplementedError
        if get_world_size() <= 1:
            # nothing to reduce in single-worker mode
            return
        # NCCL requires CUDA tensors; other backends reduce on CPU
        backend = dist.get_backend()
        device = 'cuda' if backend == dist.Backend.NCCL else 'cpu'
        avg_t = torch.tensor([self.avg], dtype=torch.float32, device=device)
        sum_t = torch.tensor([self.sum], dtype=torch.float32, device=device)
        dist.all_reduce(avg_t)
        dist.all_reduce(sum_t)
        self.avg = avg_t.item()
        self.sum = sum_t.item()
        if op == 'mean':
            self.avg /= get_world_size()
            self.sum /= get_world_size()
|