@@ -30,11 +30,10 @@
 import torch.jit
 from apex.optimizers import FusedAdam
-import logging
 import os
-import sys
 import math
 import time
+import numpy as np
 from argparse import ArgumentParser

 import torch
@@ -44,15 +43,11 @@ import utils
 import dataloading
 from neumf import NeuMF

-from logger.logger import LOGGER, timed_block, timed_function
-from logger import tags
-from logger.autologging import log_hardware, log_args
+import dllogger

 from apex.parallel import DistributedDataParallel as DDP
 from apex import amp

-LOGGER.model = 'ncf'
-

 def parse_args():
     parser = ArgumentParser(description="Train a Neural Collaborative"
                             " Filtering model")
@@ -98,36 +93,29 @@ def parse_args():
     parser.add_argument('--opt_level', default='O2', type=str,
                         help='Optimization level for Automatic Mixed Precision',
                         choices=['O0', 'O2'])
-    parser.add_argument('--local_rank', default=0, type=int, help='Necessary for multi-GPU training')
+    parser.add_argument('--log_path', default='log.json', type=str,
+                        help='Path for the JSON training log')
     return parser.parse_args()


-def init_distributed(local_rank=0):
-    distributed = int(os.environ['WORLD_SIZE']) > 1
+def init_distributed(args):
+    args.world_size = int(os.environ['WORLD_SIZE'])
+    args.distributed = args.world_size > 1
+
+    if args.distributed:
+        args.local_rank = int(os.environ['LOCAL_RANK'])
-    if distributed:
         '''
         Set cuda device so everything is done on the right GPU.
         THIS MUST BE DONE AS SOON AS POSSIBLE.
         '''
-        torch.cuda.set_device(local_rank)
+        torch.cuda.set_device(args.local_rank)

         '''Initialize distributed communication'''
         torch.distributed.init_process_group(backend='nccl',
                                              init_method='env://')
-        logging_logger = logging.getLogger('mlperf_compliance')
-        if local_rank > 0:
-            sys.stdout = open('/dev/null', 'w')
-            sys.stderr = open('/dev/null', 'w')
-            logging_logger.setLevel(logging.ERROR)
-
-        logging_nvlogger = logging.getLogger('nv_dl_logger')
-        if local_rank > 0:
-            sys.stdout = open('/dev/null', 'w')
-            sys.stderr = open('/dev/null', 'w')
-            logging_nvlogger.setLevel(logging.ERROR)
-
-    return distributed, int(os.environ['WORLD_SIZE'])
+    else:
+        args.local_rank = 0


 def val_epoch(model, x, y, dup_mask, real_indices, K, samples_per_user, num_user,
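Note on this hunk: init_distributed() now reads WORLD_SIZE unconditionally and LOCAL_RANK only in the distributed branch, and the --local_rank flag is gone from the parser. The script therefore assumes a launcher that exports both variables, e.g. `torchrun --nproc_per_node=8 ncf.py ...` (script name assumed; the older torch.distributed.launch needs --use_env to export LOCAL_RANK). A single-process run still needs WORLD_SIZE=1 in the environment, or the os.environ lookup raises KeyError. A more defensive variant, sketched here as a hypothetical alternative rather than as part of this change:

    # Hypothetical fallback: default to a single-process setup when no
    # launcher exported the variables.
    args.world_size = int(os.environ.get('WORLD_SIZE', 1))
    args.distributed = args.world_size > 1
    args.local_rank = int(os.environ.get('LOCAL_RANK', 0)) if args.distributed else 0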
@@ -152,10 +140,6 @@ def val_epoch(model, x, y, dup_mask, real_indices, K, samples_per_user, num_user
     hits = ifzero.sum()
     ndcg = (math.log(2) / (torch.nonzero(ifzero)[:,1].view(-1).to(torch.float)+2).log_()).sum()

-    LOGGER.log(key=tags.EVAL_SIZE, value={"epoch": epoch, "value": num_user * samples_per_user})
-    LOGGER.log(key=tags.EVAL_HP_NUM_USERS, value=num_user)
-    LOGGER.log(key=tags.EVAL_HP_NUM_NEG, value=samples_per_user - 1)
-
     if distributed:
         torch.distributed.all_reduce(hits, op=torch.distributed.reduce_op.SUM)
         torch.distributed.all_reduce(ndcg, op=torch.distributed.reduce_op.SUM)
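Note on this hunk: the untouched context lines still pass op=torch.distributed.reduce_op.SUM to all_reduce; newer PyTorch releases deprecate reduce_op in favour of torch.distributed.ReduceOp.SUM, so that spelling may warrant a follow-up if the base PyTorch version moves forward.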
@@ -168,10 +152,17 @@ def val_epoch(model, x, y, dup_mask, real_indices, K, samples_per_user, num_user


 def main():
-    log_hardware()
     args = parse_args()
-    args.distributed, args.world_size = init_distributed(args.local_rank)
-    log_args(args)
+    init_distributed(args)
+
+    if args.local_rank == 0:
+        dllogger.init(backends=[dllogger.JSONStreamBackend(verbosity=dllogger.Verbosity.VERBOSE,
+                                                           filename=args.log_path),
+                                dllogger.StdOutBackend(verbosity=dllogger.Verbosity.VERBOSE)])
+    else:
+        dllogger.init(backends=[])
+
+    dllogger.log(data=vars(args), step='PARAMETER')

     if args.seed is not None:
         torch.manual_seed(args.seed)
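Note on this hunk: dllogger is initialised with backends only on rank 0, so a multi-GPU run writes exactly one JSON log and one stdout stream; dllogger.init(backends=[]) on the remaining ranks turns their dllogger.log() calls into no-ops. For reading the resulting file back, a minimal sketch, assuming each line holds one entry as '<marker> <json>' (current dllogger versions prefix the JSON payload with a short marker):

    import json

    def load_log(path='log.json'):
        # Split off the marker token, then parse the JSON payload of each line.
        entries = []
        with open(path) as f:
            for line in f:
                _, _, payload = line.partition(' ')
                entries.append(json.loads(payload))
        return entries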
@@ -180,31 +171,22 @@ def main():
     if not os.path.exists(args.checkpoint_dir) and args.checkpoint_dir != '':
         os.makedirs(args.checkpoint_dir, exist_ok=True)

-    # The default of np.random.choice is replace=True, so does pytorch random_()
-    LOGGER.log(key=tags.PREPROC_HP_SAMPLE_EVAL_REPLACEMENT, value=True)
-    LOGGER.log(key=tags.INPUT_HP_SAMPLE_TRAIN_REPLACEMENT, value=True)
-    LOGGER.log(key=tags.INPUT_STEP_EVAL_NEG_GEN)
-
     # sync workers before timing
     if args.distributed:
         torch.distributed.broadcast(torch.tensor([1], device="cuda"), 0)
     torch.cuda.synchronize()

     main_start_time = time.time()
-    LOGGER.log(key=tags.RUN_START)

     train_ratings = torch.load(args.data+'/train_ratings.pt', map_location=torch.device('cuda:{}'.format(args.local_rank)))
     test_ratings = torch.load(args.data+'/test_ratings.pt', map_location=torch.device('cuda:{}'.format(args.local_rank)))
     test_negs = torch.load(args.data+'/test_negatives.pt', map_location=torch.device('cuda:{}'.format(args.local_rank)))

     valid_negative = test_negs.shape[1]
-    LOGGER.log(key=tags.PREPROC_HP_NUM_EVAL, value=valid_negative)
-

     nb_maxs = torch.max(train_ratings, 0)[0]
     nb_users = nb_maxs[0].item() + 1
     nb_items = nb_maxs[1].item() + 1
-    LOGGER.log(key=tags.INPUT_SIZE, value=len(train_ratings))

     all_test_users = test_ratings.shape[0]
@@ -213,9 +195,6 @@ def main():
     # make pytorch memory behavior more consistent later
     torch.cuda.empty_cache()

-    LOGGER.log(key=tags.INPUT_BATCH_SIZE, value=args.batch_size)
-    LOGGER.log(key=tags.INPUT_ORDER)  # we shuffled later with randperm
-
     # Create model
     model = NeuMF(nb_users, nb_items,
                   mf_dim=args.factors,
@@ -243,12 +222,6 @@ def main():

     print(model)
     print("{} parameters".format(utils.count_parameters(model)))
-    LOGGER.log(key=tags.OPT_LR, value=args.learning_rate)
-    LOGGER.log(key=tags.OPT_NAME, value="Adam")
-    LOGGER.log(key=tags.OPT_HP_ADAM_BETA1, value=args.beta1)
-    LOGGER.log(key=tags.OPT_HP_ADAM_BETA2, value=args.beta2)
-    LOGGER.log(key=tags.OPT_HP_ADAM_EPSILON, value=args.eps)
-    LOGGER.log(key=tags.MODEL_HP_LOSS_FN, value=tags.VALUE_BCE)

     if args.load_checkpoint_path:
         state_dict = torch.load(args.load_checkpoint_path)
@@ -256,33 +229,24 @@ def main():
         model.load_state_dict(state_dict)

     if args.mode == 'test':
-        LOGGER.log(key=tags.EVAL_START, value=0)
         start = time.time()
         hr, ndcg = val_epoch(model, test_users, test_items, dup_mask, real_indices, args.topk,
                              samples_per_user=valid_negative + 1,
                              num_user=all_test_users, distributed=args.distributed)
-        print('HR@{K} = {hit_rate:.4f}, NDCG@{K} = {ndcg:.4f}'
-              .format(K=args.topk, hit_rate=hr, ndcg=ndcg))
         val_time = time.time() - start
         eval_size = all_test_users * (valid_negative + 1)
         eval_throughput = eval_size / val_time

-        LOGGER.log(key=tags.EVAL_ACCURACY, value={"epoch": 0, "value": hr})
-        LOGGER.log(key=tags.EVAL_STOP, value=0)
-        LOGGER.log(key='best_eval_throughput', value=eval_throughput)
+        dllogger.log(step=tuple(), data={'best_eval_throughput' : eval_throughput,
+                                         'hr@10' : hr})
         return

-    success = False
     max_hr = 0
+    best_epoch = 0
     train_throughputs, eval_throughputs = [], []

-    LOGGER.log(key=tags.TRAIN_LOOP)
     for epoch in range(args.epochs):

-        LOGGER.log(key=tags.TRAIN_EPOCH_START, value=epoch)
-        LOGGER.log(key=tags.INPUT_HP_NUM_NEG, value=args.negative_samples)
-        LOGGER.log(key=tags.INPUT_STEP_TRAIN_NEG_GEN)
-
         begin = time.time()

         epoch_users, epoch_items, epoch_label = dataloading.prepare_epoch_train_data(train_ratings, nb_items, args)
@@ -315,32 +279,28 @@ def main():
         epoch_samples = len(train_ratings) * (args.negative_samples + 1)
         train_throughput = epoch_samples / train_time
         train_throughputs.append(train_throughput)
-        LOGGER.log(key='train_throughput', value=train_throughput)
-        LOGGER.log(key=tags.TRAIN_EPOCH_STOP, value=epoch)
-        LOGGER.log(key=tags.EVAL_START, value=epoch)

         hr, ndcg = val_epoch(model, test_users, test_items, dup_mask, real_indices, args.topk,
                              samples_per_user=valid_negative + 1,
                              num_user=all_test_users, epoch=epoch, distributed=args.distributed)

         val_time = time.time() - begin
-        print('Epoch {epoch}: HR@{K} = {hit_rate:.4f}, NDCG@{K} = {ndcg:.4f},'
-              ' train_time = {train_time:.2f}, val_time = {val_time:.2f}'
-              .format(epoch=epoch, K=args.topk, hit_rate=hr,
-                      ndcg=ndcg, train_time=train_time,
-                      val_time=val_time))

-        LOGGER.log(key=tags.EVAL_ACCURACY, value={"epoch": epoch, "value": hr})
-        LOGGER.log(key=tags.EVAL_TARGET, value=args.threshold)
-        LOGGER.log(key=tags.EVAL_STOP, value=epoch)

         eval_size = all_test_users * (valid_negative + 1)
         eval_throughput = eval_size / val_time
         eval_throughputs.append(eval_throughput)
-        LOGGER.log(key='eval_throughput', value=eval_throughput)
+
+        dllogger.log(step=(epoch,),
+                     data = {'train_throughput': train_throughput,
+                             'hr@10': hr,
+                             'train_epoch_time': train_time,
+                             'validation_epoch_time': val_time,
+                             'eval_throughput': eval_throughput})

         if hr > max_hr and args.local_rank == 0:
             max_hr = hr
+            best_epoch = epoch
             save_checkpoint_path = os.path.join(args.checkpoint_dir, 'model.pth')
             print("New best hr! Saving the model to: ", save_checkpoint_path)
             torch.save(model.state_dict(), save_checkpoint_path)
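Note on this hunk: the step argument distinguishes the three kinds of entries this change emits: step='PARAMETER' for the run configuration, step=(epoch,) for per-epoch metrics, and step=tuple() for run-level summaries. The tuples are serialised as JSON lists, so a reader can split the log accordingly; a hypothetical helper building on the load_log() sketch above:

    def split_log(entries):
        # Per-epoch metrics carry a one-element step list; run-level
        # summaries carry an empty one ('PARAMETER' entries keep the string).
        per_epoch = [e for e in entries
                     if isinstance(e.get('step'), list) and len(e['step']) == 1]
        summary = [e for e in entries if e.get('step') == []]
        return per_epoch, summary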
@@ -349,18 +309,19 @@ def main():
         if args.threshold is not None:
             if hr >= args.threshold:
                 print("Hit threshold of {}".format(args.threshold))
-                success = True
                 break

     if args.local_rank == 0:
-        LOGGER.log(key='best_train_throughput', value=max(train_throughputs))
-        LOGGER.log(key='best_eval_throughput', value=max(eval_throughputs))
-        LOGGER.log(key='best_accuracy', value=max_hr)
-        LOGGER.log(key='time_to_target', value=time.time() - main_start_time)
-        LOGGER.log(key='time_to_best_model', value=best_model_timestamp - main_start_time)
-
-        LOGGER.log(key=tags.RUN_STOP, value={"success": success})
-        LOGGER.log(key=tags.RUN_FINAL)
+        dllogger.log(data={'best_train_throughput': max(train_throughputs),
+                           'best_eval_throughput': max(eval_throughputs),
+                           'mean_train_throughput': np.mean(train_throughputs),
+                           'mean_eval_throughput': np.mean(eval_throughputs),
+                           'best_accuracy': max_hr,
+                           'best_epoch': best_epoch,
+                           'time_to_target': time.time() - main_start_time,
+                           'time_to_best_model': best_model_timestamp - main_start_time},
+                   step=tuple())
+

 if __name__ == '__main__':
     main()