# transcode.py
  1. # Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from argparse import ArgumentParser
  15. import os
  16. from collections import defaultdict
  17. import numpy as np
  18. import pandas as pd
  19. from .feature_spec import FeatureSpec, get_categorical_feature_type
  20. from .defaults import CATEGORICAL_CHANNEL, NUMERICAL_CHANNEL, LABEL_CHANNEL, CARDINALITY_SELECTOR
  21. def parse_args():
  22. parser = ArgumentParser()
  23. parser.add_argument('--input', type=str, default='',
  24. help='Path to input data directory')
  25. parser.add_argument('--feature_spec_in', type=str, default='feature_spec.yaml',
  26. help='Name of the input feature specification file')
  27. parser.add_argument('--output', type=str, default='/data',
  28. help='Path to output data directory')
  29. parser.add_argument('--feature_spec_out', type=str, default='feature_spec.yaml',
  30. help='Name of the output feature specification file')
  31. parser.add_argument('--chunk_size', type=int, default=65536)
  32. return parser.parse_args()
  33. def main():
  34. args = parse_args()
  35. args_output = args.output
  36. args_input = args.input
  37. args_feature_spec_in = args.feature_spec_in
  38. args_feature_spec_out = args.feature_spec_out
  39. batch_size = args.chunk_size
  40. fspec_in_path = os.path.join(args_input, args_feature_spec_in)
  41. fspec_in = FeatureSpec.from_yaml(fspec_in_path)
  42. input_label_feature_name = fspec_in.channel_spec[LABEL_CHANNEL][0]
  43. input_numerical_features_list = fspec_in.channel_spec[NUMERICAL_CHANNEL]
  44. input_categorical_features_list = fspec_in.channel_spec[CATEGORICAL_CHANNEL]
  45. # Do a pass to establish the cardinalities: they influence the type we save the dataset as
  46. found_cardinalities = defaultdict(lambda: 0)
  47. for mapping_name, mapping in fspec_in.source_spec.items():
  48. df_iterators = []
  49. for chunk in mapping:
  50. assert chunk['type'] == 'csv', "Only csv files supported in this transcoder"
  51. assert len(chunk['files']) == 1, "Only one file per chunk supported in this transcoder"
  52. path_to_load = os.path.join(fspec_in.base_directory, chunk['files'][0])
  53. chunk_iterator = pd.read_csv(path_to_load, header=None, chunksize=batch_size, names=chunk['features'])
  54. df_iterators.append(chunk_iterator)
  55. zipped = zip(*df_iterators)
  56. for chunks in zipped:
  57. mapping_df = pd.concat(chunks, axis=1)
  58. for feature in input_categorical_features_list:
  59. mapping_cardinality = mapping_df[feature].max() + 1
  60. previous_cardinality = found_cardinalities[feature]
  61. found_cardinalities[feature] = max(previous_cardinality, mapping_cardinality)
  62. for feature in input_categorical_features_list:
  63. declared_cardinality = fspec_in.feature_spec[feature][CARDINALITY_SELECTOR]
  64. if declared_cardinality == 'auto':
  65. pass
  66. else:
  67. assert int(declared_cardinality) >= found_cardinalities[feature]
  68. found_cardinalities[feature] = int(declared_cardinality)
  69. categorical_cardinalities = [found_cardinalities[f] for f in input_categorical_features_list]
  70. number_of_numerical_features = fspec_in.get_number_of_numerical_features()
  71. fspec_out = FeatureSpec.get_default_feature_spec(number_of_numerical_features=number_of_numerical_features,
  72. categorical_feature_cardinalities=categorical_cardinalities)
  73. fspec_out.base_directory = args.output
  74. for mapping_name, mapping in fspec_in.source_spec.items():
  75. # open files for outputting
  76. label_path, numerical_path, categorical_paths = fspec_out.get_mapping_paths(mapping_name)
  77. for path in [label_path, numerical_path, *categorical_paths.values()]:
  78. os.makedirs(os.path.dirname(path), exist_ok=True)
  79. output_categorical_features_list = fspec_out.get_categorical_feature_names()
  80. numerical_f = open(numerical_path, "ab+")
  81. label_f = open(label_path, "ab+")
  82. categorical_fs = [open(categorical_paths[name], "ab+") for name in output_categorical_features_list]
  83. categorical_feature_types = [get_categorical_feature_type(card) for card in categorical_cardinalities]
  84. df_iterators = []
  85. for chunk in mapping:
  86. # We checked earlier it's a single file chunk
  87. path_to_load = os.path.join(fspec_in.base_directory, chunk['files'][0])
  88. chunk_iterator = pd.read_csv(path_to_load, header=None, chunksize=batch_size, names=chunk['features'])
  89. df_iterators.append(chunk_iterator)
  90. zipped = zip(*df_iterators)
  91. for chunks in zipped:
  92. mapping_df = pd.concat(chunks, axis=1) # This takes care of making sure feature names are unique
  93. # Choose the right columns
  94. numerical_df = mapping_df[input_numerical_features_list]
  95. categorical_df = mapping_df[input_categorical_features_list]
  96. label_df = mapping_df[[input_label_feature_name]]
  97. # Append them to the binary files
  98. numerical_f.write(numerical_df.values.astype(np.float16).tobytes())
  99. label_f.write(label_df.values.astype(bool).tobytes())
  100. categorical_arr = categorical_df.values
  101. for cat_idx, cat_feature_type in enumerate(categorical_feature_types):
  102. categorical_fs[cat_idx].write(
  103. categorical_arr[:, cat_idx].astype(cat_feature_type).tobytes())
  104. feature_spec_save_path = os.path.join(args_output, args_feature_spec_out)
  105. fspec_out.to_yaml(output_path=feature_spec_save_path)
  106. if __name__ == '__main__':
  107. main()