|
|
@@ -44,8 +44,8 @@ from mpi4py import MPI
|
|
|
from neumf import ncf_model_ops
|
|
|
from input_pipeline import DataGenerator
|
|
|
|
|
|
-from logger.logger import LOGGER
|
|
|
-from logger.autologging import log_args
|
|
|
+import dllogger
|
|
|
+
|
|
|
|
|
|
def parse_args():
|
|
|
"""
|
|
|
@@ -96,8 +96,7 @@ def parse_args():
|
|
|
parser.add_argument('--loss-scale', default=8192, type=int,
|
|
|
help='Loss scale value to use when manually enabling mixed precision')
|
|
|
parser.add_argument('--checkpoint-dir', default='/data/checkpoints/', type=str,
|
|
|
- help='Path to the store the result checkpoint file for training, \
|
|
|
- or to read from for evaluation')
|
|
|
+                        help='Path to store the result checkpoint file for training')
|
|
|
parser.add_argument('--load-checkpoint-path', default=None, type=str,
|
|
|
help='Path to the checkpoint for initialization. If None will initialize with random weights')
|
|
|
parser.add_argument('--mode', choices=['train', 'test'], default='train', type=str,
|
|
|
@@ -105,11 +104,12 @@ def parse_args():
|
|
|
otherwise full training will be performed')
|
|
|
parser.add_argument('--eval-after', type=int, default=8,
|
|
|
help='Perform evaluations only after this many epochs')
|
|
|
- parser.add_argument('--verbose', action='store_true',
|
|
|
- help='Log the performance and accuracy after every epoch')
|
|
|
+ parser.add_argument('--log-path', default='log.json', type=str,
|
|
|
+ help='Path for the JSON training log')
|
|
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
+
|
|
|
def hvd_init():
|
|
|
"""
|
|
|
Initialize Horovod
|
|
|
@@ -124,6 +124,7 @@ def hvd_init():
|
|
|
print('PY', sys.version)
|
|
|
print('TF', tf.__version__)
|
|
|
|
|
|
+
|
|
|
def get_local_train_data(pos_train_users, pos_train_items, negative_samples):
|
|
|
"""
|
|
|
For distributed, split up the train data and only keep the local portion
|
|
|
@@ -148,6 +149,7 @@ def get_local_train_data(pos_train_users, pos_train_items, negative_samples):
|
|
|
|
|
|
return local_train_users, local_train_items, local_train_labels
|
|
|
|
|
|
+
|
|
|
def get_local_test_data(pos_test_users, pos_test_items):
|
|
|
"""
|
|
|
For distributed, split up the test data and only keep the local portion
|
|
|
@@ -162,17 +164,21 @@ def get_local_test_data(pos_test_users, pos_test_items):
|
|
|
|
|
|
return local_test_users, local_test_items
|
|
|
|
|
|
+
|
|
|
def main():
|
|
|
- """
|
|
|
- Run training/evaluation
|
|
|
- """
|
|
|
script_start = time.time()
|
|
|
hvd_init()
|
|
|
mpi_comm = MPI.COMM_WORLD
|
|
|
args = parse_args()
|
|
|
|
|
|
if hvd.rank() == 0:
|
|
|
- log_args(args)
|
|
|
+ dllogger.init(backends=[dllogger.JSONStreamBackend(verbosity=dllogger.Verbosity.VERBOSE,
|
|
|
+ filename=args.log_path),
|
|
|
+ dllogger.StdOutBackend(verbosity=dllogger.Verbosity.VERBOSE)])
|
|
|
+ else:
|
|
|
+ dllogger.init(backends=[])
|
|
|
+
|
|
|
+ dllogger.log(data=vars(args), step='PARAMETER')
|
|
|
|
|
|
if args.seed is not None:
|
|
|
tf.random.set_random_seed(args.seed)
|
|
|
@@ -185,11 +191,6 @@ def main():
|
|
|
and os.environ["TF_ENABLE_AUTO_MIXED_PRECISION"] == "1":
|
|
|
args.fp16 = False
|
|
|
|
|
|
- # directory to store/read final checkpoint
|
|
|
- if args.mode == 'train' and hvd.rank() == 0:
|
|
|
- print("Saving best checkpoint to {}".format(args.checkpoint_dir))
|
|
|
- elif hvd.rank() == 0:
|
|
|
- print("Reading checkpoint: {}".format(args.checkpoint_dir))
|
|
|
if not os.path.exists(args.checkpoint_dir) and args.checkpoint_dir != '':
|
|
|
os.makedirs(args.checkpoint_dir, exist_ok=True)
|
|
|
final_checkpoint_path = os.path.join(args.checkpoint_dir, 'model.ckpt')
|
|
|
@@ -317,11 +318,11 @@ def main():
|
|
|
ndcg = ndcg_sum / ndcg_cnt
|
|
|
|
|
|
if hvd.rank() == 0:
|
|
|
- LOGGER.log("Eval Time: {:.4f}, HR: {:.4f}, NDCG: {:.4f}"
|
|
|
- .format(eval_duration, hit_rate, ndcg))
|
|
|
-
|
|
|
eval_throughput = pos_test_users.shape[0] * (args.valid_negative + 1) / eval_duration
|
|
|
- LOGGER.log('Average Eval Throughput: {:.4f}'.format(eval_throughput))
|
|
|
+ dllogger.log(step=tuple(), data={'eval_throughput': eval_throughput,
|
|
|
+ 'eval_time': eval_duration,
|
|
|
+ 'hr@10': hit_rate,
|
|
|
+ 'ndcg': ndcg})
|
|
|
return
|
|
|
|
|
|
# Performance Metrics
|
|
|
@@ -345,8 +346,6 @@ def main():
|
|
|
|
|
|
# Begin training
|
|
|
begin_train = time.time()
|
|
|
- if hvd.rank() == 0:
|
|
|
- LOGGER.log("Begin Training. Setup Time: {}".format(begin_train - script_start))
|
|
|
for epoch in range(args.epochs):
|
|
|
# Train for one epoch
|
|
|
train_start = time.time()
|
|
|
@@ -364,7 +363,7 @@ def main():
|
|
|
}
|
|
|
)
|
|
|
train_duration = time.time() - train_start
|
|
|
- ## Only log "warm" epochs
|
|
|
+ # Only log "warm" epochs
|
|
|
if epoch >= 1:
|
|
|
train_times.append(train_duration)
|
|
|
# Evaluate
|
|
|
@@ -399,22 +398,16 @@ def main():
|
|
|
ndcg = global_ndcg_sum[0] / global_ndcg_count[0]
|
|
|
|
|
|
eval_duration = time.time() - eval_start
|
|
|
- ## Only log "warm" epochs
|
|
|
+ # Only log "warm" epochs
|
|
|
if epoch >= 1:
|
|
|
eval_times.append(eval_duration)
|
|
|
|
|
|
if hvd.rank() == 0:
|
|
|
- if args.verbose:
|
|
|
- log_string = "Epoch: {:02d}, Train Time: {:.4f}, Eval Time: {:.4f}, HR: {:.4f}, NDCG: {:.4f}"
|
|
|
- LOGGER.log(
|
|
|
- log_string.format(
|
|
|
- epoch,
|
|
|
- train_duration,
|
|
|
- eval_duration,
|
|
|
- hit_rate,
|
|
|
- ndcg
|
|
|
- )
|
|
|
- )
|
|
|
+ dllogger.log(step=(epoch,), data={
|
|
|
+ 'train_time': train_duration,
|
|
|
+ 'eval_time': eval_duration,
|
|
|
+ 'hr@10': hit_rate,
|
|
|
+ 'ndcg': ndcg})
|
|
|
|
|
|
# Update summary metrics
|
|
|
if hit_rate > args.target and first_to_target is None:
|
|
|
@@ -424,18 +417,6 @@ def main():
|
|
|
best_hr = hit_rate
|
|
|
best_epoch = epoch
|
|
|
time_to_best = time.time() - begin_train
|
|
|
- if not args.verbose:
|
|
|
- log_string = "New Best Epoch: {:02d}, Train Time: {:.4f}, Eval Time: {:.4f}, HR: {:.4f}, NDCG: {:.4f}"
|
|
|
- LOGGER.log(
|
|
|
- log_string.format(
|
|
|
- epoch,
|
|
|
- train_duration,
|
|
|
- eval_duration,
|
|
|
- hit_rate,
|
|
|
- ndcg
|
|
|
- )
|
|
|
- )
|
|
|
- # Save, if meets target
|
|
|
if hit_rate > args.target:
|
|
|
saver.save(sess, final_checkpoint_path)
|
|
|
|
|
|
@@ -445,26 +426,22 @@ def main():
|
|
|
train_throughputs = pos_train_users.shape[0]*(args.negative_samples+1) / train_times
|
|
|
eval_times = np.array(eval_times)
|
|
|
eval_throughputs = pos_test_users.shape[0]*(args.valid_negative+1) / eval_times
|
|
|
- LOGGER.log(' ')
|
|
|
-
|
|
|
- LOGGER.log('batch_size: {}'.format(args.batch_size))
|
|
|
- LOGGER.log('num_gpus: {}'.format(hvd.size()))
|
|
|
- LOGGER.log('AMP: {}'.format(1 if args.amp else 0))
|
|
|
- LOGGER.log('seed: {}'.format(args.seed))
|
|
|
- LOGGER.log('Minimum Train Time per Epoch: {:.4f}'.format(np.min(train_times)))
|
|
|
- LOGGER.log('Average Train Time per Epoch: {:.4f}'.format(np.mean(train_times)))
|
|
|
- LOGGER.log('Average Train Throughput: {:.4f}'.format(np.mean(train_throughputs)))
|
|
|
- LOGGER.log('Minimum Eval Time per Epoch: {:.4f}'.format(np.min(eval_times)))
|
|
|
- LOGGER.log('Average Eval Time per Epoch: {:.4f}'.format(np.mean(eval_times)))
|
|
|
- LOGGER.log('Average Eval Throughput: {:.4f}'.format(np.mean(eval_throughputs)))
|
|
|
- LOGGER.log('First Epoch to hit: {}'.format(first_to_target))
|
|
|
- LOGGER.log('Time to Train: {:.4f}'.format(time_to_train))
|
|
|
- LOGGER.log('Time to Best: {:.4f}'.format(time_to_best))
|
|
|
- LOGGER.log('Best HR: {:.4f}'.format(best_hr))
|
|
|
- LOGGER.log('Best Epoch: {}'.format(best_epoch))
|
|
|
+
|
|
|
+ dllogger.log(step=tuple(), data={
|
|
|
+ 'average_train_time_per_epoch': np.mean(train_times),
|
|
|
+ 'average_train_throughput': np.mean(train_throughputs),
|
|
|
+ 'average_eval_time_per_epoch': np.mean(eval_times),
|
|
|
+ 'average_eval_throughput': np.mean(eval_throughputs),
|
|
|
+ 'first_epoch_to_hit': first_to_target,
|
|
|
+ 'time_to_train': time_to_train,
|
|
|
+ 'time_to_best': time_to_best,
|
|
|
+ 'best_hr': best_hr,
|
|
|
+ 'best_epoch': best_epoch})
|
|
|
+ dllogger.flush()
|
|
|
|
|
|
sess.close()
|
|
|
return
|
|
|
|
|
|
+
|
|
|
if __name__ == '__main__':
|
|
|
main()
|