# deployer.py
#!/usr/bin/python
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import os
import sys

import numpy as np
import torch

from dlrm.data.datasets import SyntheticDataset
from dlrm.model.distributed import DistributedDlrm
from dlrm.utils.checkpointing.distributed import make_distributed_checkpoint_loader
from dlrm.utils.distributed import get_gpu_batch_sizes, get_device_mapping, is_main_process

# Extend the path BEFORE importing from the sibling `triton` package,
# otherwise the append has no effect on the import below.
sys.path.append('../')
from triton import deployer_lib
  27. def get_model_args(model_args):
  28. parser = argparse.ArgumentParser()
  29. parser.add_argument("--batch_size", default=1, type=int)
  30. parser.add_argument("--fp16", action="store_true", default=False)
  31. parser.add_argument("--dump_perf_data", type=str, default=None)
  32. parser.add_argument("--model_checkpoint", type=str, default=None)
  33. parser.add_argument("--num_numerical_features", type=int, default=13)
  34. parser.add_argument("--embedding_dim", type=int, default=128)
  35. parser.add_argument("--embedding_type", type=str, default="joint", choices=["joint", "multi_table"])
  36. parser.add_argument("--top_mlp_sizes", type=int, nargs="+",
  37. default=[1024, 1024, 512, 256, 1])
  38. parser.add_argument("--bottom_mlp_sizes", type=int, nargs="+",
  39. default=[512, 256, 128])
  40. parser.add_argument("--interaction_op", type=str, default="dot",
  41. choices=["dot", "cat"])
  42. parser.add_argument("--cpu", default=False, action="store_true")
  43. parser.add_argument("--dataset", type=str, required=True)
  44. return parser.parse_args(model_args)
  45. def initialize_model(args, categorical_sizes, device_mapping):
  46. ''' return model, ready to trace '''
  47. device = "cuda:0" if not args.cpu else "cpu"
  48. model_config = {
  49. 'top_mlp_sizes': args.top_mlp_sizes,
  50. 'bottom_mlp_sizes': args.bottom_mlp_sizes,
  51. 'embedding_dim': args.embedding_dim,
  52. 'interaction_op': args.interaction_op,
  53. 'categorical_feature_sizes': categorical_sizes,
  54. 'num_numerical_features': args.num_numerical_features,
  55. 'embedding_type': args.embedding_type,
  56. 'hash_indices': False,
  57. 'use_cpp_mlp': False,
  58. 'fp16': args.fp16,
  59. 'device': device,
  60. }
  61. model = DistributedDlrm.from_dict(model_config)
  62. model.to(device)
  63. if args.model_checkpoint:
  64. checkpoint_loader = make_distributed_checkpoint_loader(device_mapping=device_mapping, rank=0)
  65. checkpoint_loader.load_checkpoint(model, args.model_checkpoint)
  66. model.to(device)
  67. if args.fp16:
  68. model = model.half()
  69. return model
  70. def get_dataloader(args, categorical_sizes):
  71. dataset_test = SyntheticDataset(num_entries=2000,
  72. batch_size=args.batch_size,
  73. numerical_features=args.num_numerical_features,
  74. categorical_feature_sizes=categorical_sizes,
  75. device="cpu" if args.cpu else "cuda:0")
  76. class RemoveOutput:
  77. def __init__(self, dataset):
  78. self.dataset = dataset
  79. def __getitem__(self, idx):
  80. value = self.dataset[idx]
  81. if args.fp16:
  82. value = (value[0].half(), value[1].long(), value[2])
  83. else:
  84. value = (value[0], value[1].long(), value[2])
  85. return value[:-1]
  86. def __len__(self):
  87. return len(self.dataset)
  88. test_loader = torch.utils.data.DataLoader(RemoveOutput(dataset_test),
  89. batch_size=None,
  90. num_workers=0,
  91. pin_memory=False)
  92. return test_loader
  93. def main():
  94. # deploys and returns removed deployer arguments
  95. deployer, model_args = deployer_lib.create_deployer(sys.argv[1:],
  96. get_model_args)
  97. with open(os.path.join(model_args.dataset, "model_size.json")) as f:
  98. categorical_sizes = list(json.load(f).values())
  99. categorical_sizes = [s + 1 for s in categorical_sizes]
  100. categorical_sizes = np.array(categorical_sizes)
  101. device_mapping = get_device_mapping(categorical_sizes, num_gpus=1)
  102. categorical_sizes = categorical_sizes[device_mapping['embedding'][0]].tolist()
  103. model = initialize_model(model_args, categorical_sizes, device_mapping)
  104. dataloader = get_dataloader(model_args, categorical_sizes)
  105. if model_args.dump_perf_data:
  106. input_0, input_1 = next(iter(dataloader))
  107. if model_args.fp16:
  108. input_0 = input_0.half()
  109. os.makedirs(model_args.dump_perf_data, exist_ok=True)
  110. input_0.detach().cpu().numpy()[0].tofile(os.path.join(model_args.dump_perf_data, "input__0"))
  111. input_1.detach().cpu().numpy()[0].tofile(os.path.join(model_args.dump_perf_data, "input__1"))
  112. deployer.deploy(dataloader, model)
  113. if __name__=='__main__':
  114. main()