# transcode.py
  1. # Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from argparse import ArgumentParser
  15. import os
  16. import torch
  17. import pandas as pd
  18. from feature_spec import FeatureSpec
  19. from neumf_constants import USER_CHANNEL_NAME, ITEM_CHANNEL_NAME, LABEL_CHANNEL_NAME
  20. def parse_args():
  21. parser = ArgumentParser()
  22. parser.add_argument('--path', type=str, default='',
  23. help='Path to input data directory')
  24. parser.add_argument('--feature_spec_in', type=str, default='feature_spec.yaml',
  25. help='Name of the input feature specification file, or path relative to data directory.')
  26. parser.add_argument('--output', type=str, default='/data',
  27. help='Path to output data directory')
  28. parser.add_argument('--feature_spec_out', type=str, default='feature_spec.yaml',
  29. help='Name of the output feature specification file, or path relative to data directory.')
  30. return parser.parse_args()
  31. def main():
  32. args = parse_args()
  33. args_output = args.output
  34. args_path = args.path
  35. args_feature_spec_in = args.feature_spec_in
  36. args_feature_spec_out = args.feature_spec_out
  37. feature_spec_path = os.path.join(args_path, args_feature_spec_in)
  38. feature_spec = FeatureSpec.from_yaml(feature_spec_path)
  39. # Only three features are transcoded - this is NCF specific
  40. user_feature_name = feature_spec.channel_spec[USER_CHANNEL_NAME][0]
  41. item_feature_name = feature_spec.channel_spec[ITEM_CHANNEL_NAME][0]
  42. label_feature_name = feature_spec.channel_spec[LABEL_CHANNEL_NAME][0]
  43. categorical_features = [user_feature_name, item_feature_name]
  44. found_cardinalities = {f: 0 for f in categorical_features}
  45. new_source_spec = {}
  46. for mapping_name, mapping in feature_spec.source_spec.items():
  47. # Load all chunks and link into one df
  48. chunk_dfs = []
  49. for chunk in mapping:
  50. assert chunk['type'] == 'csv', "Only csv files supported in this transcoder"
  51. file_dfs = []
  52. for file in chunk['files']:
  53. path_to_load = os.path.join(feature_spec.base_directory, file)
  54. file_dfs.append(pd.read_csv(path_to_load, header=None))
  55. chunk_df = pd.concat(file_dfs, ignore_index=True)
  56. chunk_df.columns = chunk['features']
  57. chunk_df.reset_index(drop=True, inplace=True)
  58. chunk_dfs.append(chunk_df)
  59. mapping_df = pd.concat(chunk_dfs, axis=1) # This takes care of making sure feature names are unique
  60. for feature in categorical_features:
  61. mapping_cardinality = mapping_df[feature].max() + 1
  62. previous_cardinality = found_cardinalities[feature]
  63. found_cardinalities[feature] = max(previous_cardinality, mapping_cardinality)
  64. # We group together users and items, while separating labels. This is because of the target dtypes: ids are int,
  65. # while labels are float to compute loss.
  66. ints_tensor = torch.from_numpy(mapping_df[[user_feature_name, item_feature_name]].values).long()
  67. ints_file = f"{mapping_name}_data_0.pt"
  68. ints_chunk = {"type": "torch_tensor",
  69. "features": [user_feature_name, item_feature_name],
  70. "files": [ints_file]}
  71. torch.save(ints_tensor, os.path.join(args_output, ints_file))
  72. floats_tensor = torch.from_numpy(mapping_df[[label_feature_name]].values).float()
  73. floats_file = f"{mapping_name}_data_1.pt"
  74. floats_chunk = {"type": "torch_tensor",
  75. "features": [label_feature_name],
  76. "files": [floats_file]}
  77. torch.save(floats_tensor, os.path.join(args_output, floats_file))
  78. new_source_spec[mapping_name] = [ints_chunk, floats_chunk]
  79. for feature in categorical_features:
  80. found_cardinality = found_cardinalities[feature]
  81. declared_cardinality = feature_spec.feature_spec[feature].get('cardinality', 'auto')
  82. if declared_cardinality != "auto":
  83. declared = int(declared_cardinality)
  84. assert declared >= found_cardinality, "Specified cardinality conflicts data"
  85. found_cardinalities[feature] = declared
  86. new_inner_feature_spec = {
  87. user_feature_name: {
  88. "dtype": "torch.int64",
  89. "cardinality": int(found_cardinalities[user_feature_name])
  90. },
  91. item_feature_name: {
  92. "dtype": "torch.int64",
  93. "cardinality": int(found_cardinalities[item_feature_name])
  94. },
  95. label_feature_name: {
  96. "dtype": "torch.float32"
  97. }
  98. }
  99. new_feature_spec = FeatureSpec(feature_spec=new_inner_feature_spec,
  100. source_spec=new_source_spec,
  101. channel_spec=feature_spec.channel_spec,
  102. metadata=feature_spec.metadata,
  103. base_directory="")
  104. feature_spec_save_path = os.path.join(args_output, args_feature_spec_out)
  105. new_feature_spec.to_yaml(output_path=feature_spec_save_path)
# Script entry point: run the transcoder end to end.
if __name__ == '__main__':
    main()