split_dataset.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # Copyright (c) 2021 NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import argparse
  15. import json
  16. import os
  17. import math
  18. from tqdm import tqdm
  19. import numpy as np
  20. from typing import Sequence
  21. # Workaround to avoid duplicating code from the main module, without building it outright.
  22. import sys
  23. sys.path.append('/workspace/dlrm')
  24. from feature_spec import FeatureSpec, get_categorical_feature_type
  25. def split_binary_file(
  26. binary_file_path: str,
  27. output_dir: str,
  28. categorical_feature_sizes: Sequence[int],
  29. num_numerical_features: int,
  30. batch_size: int,
  31. source_data_type: str = 'int32',
  32. ):
  33. record_width = 1 + num_numerical_features + len(categorical_feature_sizes) # label + numerical + categorical
  34. bytes_per_feature = np.__dict__[source_data_type]().nbytes
  35. bytes_per_entry = record_width * bytes_per_feature
  36. total_size = os.path.getsize(binary_file_path)
  37. batches_num = int(math.ceil((total_size // bytes_per_entry) / batch_size))
  38. cat_feature_types = [get_categorical_feature_type(cat_size) for cat_size in categorical_feature_sizes]
  39. file_streams = []
  40. try:
  41. input_data_f = open(binary_file_path, "rb")
  42. file_streams.append(input_data_f)
  43. numerical_f = open(os.path.join(output_dir, "numerical.bin"), "wb+")
  44. file_streams.append(numerical_f)
  45. label_f = open(os.path.join(output_dir, 'label.bin'), 'wb+')
  46. file_streams.append(label_f)
  47. categorical_fs = []
  48. for i in range(len(categorical_feature_sizes)):
  49. fs = open(os.path.join(output_dir, f'cat_{i}.bin'), 'wb+')
  50. categorical_fs.append(fs)
  51. file_streams.append(fs)
  52. for _ in tqdm(range(batches_num)):
  53. raw_data = np.frombuffer(input_data_f.read(bytes_per_entry * batch_size), dtype=np.int32)
  54. batch_data = raw_data.reshape(-1, record_width)
  55. numerical_features = batch_data[:, 1:1 + num_numerical_features].view(dtype=np.float32)
  56. numerical_f.write(numerical_features.astype(np.float16).tobytes())
  57. label = batch_data[:, 0]
  58. label_f.write(label.astype(bool).tobytes())
  59. cat_offset = num_numerical_features + 1
  60. for cat_idx, cat_feature_type in enumerate(cat_feature_types):
  61. cat_data = batch_data[:, (cat_idx + cat_offset):(cat_idx + cat_offset + 1)].astype(cat_feature_type)
  62. categorical_fs[cat_idx].write(cat_data.tobytes())
  63. finally:
  64. for stream in file_streams:
  65. stream.close()
  66. def split_dataset(dataset_dir: str, output_dir: str, batch_size: int, numerical_features: int):
  67. categorical_sizes_file = os.path.join(dataset_dir, "model_size.json")
  68. with open(categorical_sizes_file) as f:
  69. # model_size.json contains the max value of each feature instead of the cardinality.
  70. # For feature spec this is changed for consistency and clarity.
  71. categorical_cardinalities = [int(v)+1 for v in json.load(f).values()]
  72. train_file = os.path.join(dataset_dir, "train_data.bin")
  73. test_file = os.path.join(dataset_dir, "test_data.bin")
  74. val_file = os.path.join(dataset_dir, "validation_data.bin")
  75. target_train = os.path.join(output_dir, "train")
  76. target_test = os.path.join(output_dir, "test")
  77. target_val = os.path.join(output_dir, "validation")
  78. os.makedirs(output_dir, exist_ok=True)
  79. os.makedirs(target_train, exist_ok=True)
  80. os.makedirs(target_test, exist_ok=True)
  81. os.makedirs(target_val, exist_ok=True)
  82. # VALIDATION chunk is ignored in feature spec on purpose
  83. feature_spec = FeatureSpec.get_default_feature_spec(number_of_numerical_features=numerical_features,
  84. categorical_feature_cardinalities=categorical_cardinalities)
  85. feature_spec.to_yaml(os.path.join(output_dir, 'feature_spec.yaml'))
  86. split_binary_file(test_file, target_test, categorical_cardinalities, numerical_features, batch_size)
  87. split_binary_file(train_file, target_train, categorical_cardinalities, numerical_features, batch_size)
  88. split_binary_file(val_file, target_val, categorical_cardinalities, numerical_features, batch_size)
  89. if __name__ == '__main__':
  90. parser = argparse.ArgumentParser()
  91. parser.add_argument('--dataset', type=str, required=True)
  92. parser.add_argument('--output', type=str, required=True)
  93. parser.add_argument('--batch_size', type=int, default=32768)
  94. parser.add_argument('--numerical_features', type=int, default=13)
  95. args = parser.parse_args()
  96. split_dataset(
  97. dataset_dir=args.dataset,
  98. output_dir=args.output,
  99. batch_size=args.batch_size,
  100. numerical_features=args.numerical_features
  101. )