# transcode.py
  1. # Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from argparse import ArgumentParser
  15. import os
  16. import numpy as np
  17. import pandas as pd
  18. import pyarrow as pa
  19. import pyarrow.parquet as pq
  20. from data.feature_spec import FeatureSpec, FEATURES_SELECTOR, TYPE_SELECTOR, FILES_SELECTOR
  21. from data.outbrain.defaults import MULTIHOT_CHANNEL, PARQUET_TYPE
  22. def parse_args():
  23. parser = ArgumentParser()
  24. parser.add_argument('--input', type=str, default='',
  25. help='Path to input data directory')
  26. parser.add_argument('--feature_spec_in', type=str, default='feature_spec.yaml',
  27. help='Name of the input feature specification file')
  28. parser.add_argument('--output', type=str, default='/data',
  29. help='Path to output data directory')
  30. parser.add_argument('--feature_spec_out', type=str, default='feature_spec.yaml',
  31. help='Name of the output feature specification file')
  32. parser.add_argument('--chunk_size', type=int, default=65536,
  33. help='Number of rows to write out per partition')
  34. parser.add_argument('--minimum_partition_number', type=int, default=8,
  35. help='throw error if each mapping does not produce at least this many partitions')
  36. return parser.parse_args()
  37. def check_only_one_file_per_chunk(feature_spec):
  38. for mapping in feature_spec.source_spec.values():
  39. for chunk in mapping:
  40. chunk_files = chunk[FILES_SELECTOR]
  41. assert len(chunk_files) == 1
  42. assert chunk[TYPE_SELECTOR] == 'csv'
  43. def main():
  44. args = parse_args()
  45. args_output = args.output
  46. args_input = args.input
  47. args_feature_spec_in = args.feature_spec_in
  48. args_feature_spec_out = args.feature_spec_out
  49. batch_size = args.chunk_size
  50. fspec_in_path = os.path.join(args_input, args_feature_spec_in)
  51. fspec_in = FeatureSpec.from_yaml(fspec_in_path)
  52. os.makedirs(args.output, exist_ok=True)
  53. paths_per_mapping = dict()
  54. check_only_one_file_per_chunk(fspec_in)
  55. for mapping_name, mapping in fspec_in.source_spec.items():
  56. paths_per_mapping[mapping_name]=[]
  57. df_iterators = []
  58. for chunk in mapping:
  59. # We checked earlier it's a single file chunk
  60. path_to_load = os.path.join(fspec_in.base_directory, chunk[FILES_SELECTOR][0])
  61. chunk_iterator = pd.read_csv(path_to_load, header=None, chunksize=batch_size, names=chunk[FEATURES_SELECTOR])
  62. df_iterators.append(chunk_iterator)
  63. zipped = zip(*df_iterators)
  64. # writer = None
  65. for chunk_id, chunks in enumerate(zipped):
  66. # chunks is now a list of the chunk_id-th segment of each dataframe iterator and contains all columns
  67. mapping_df = pd.concat(chunks, axis=1) # This takes care of making sure feature names are unique
  68. #transform multihots from strings to objects # TODO: find a better way to do this
  69. multihot_features = fspec_in.get_names_by_channel(MULTIHOT_CHANNEL)
  70. for feature in multihot_features:
  71. mapping_df[feature] = mapping_df[feature].apply(lambda x: np.fromstring(x[1:-1], sep=' ,'))
  72. # prepare path
  73. partition_path = f"{mapping_name}_{chunk_id}.parquet"
  74. paths_per_mapping[mapping_name].append(partition_path)
  75. partition_path_abs = os.path.join(args.output, partition_path)
  76. #write to parquet
  77. mapping_table = pa.Table.from_pandas(mapping_df)
  78. pq.write_table(mapping_table, partition_path_abs)
  79. # Prepare the new feature spec
  80. new_source_spec = {}
  81. old_source_spec = fspec_in.source_spec
  82. for mapping_name in old_source_spec.keys():
  83. #check if we met the required partitions number
  84. min_partitions = args.minimum_partition_number
  85. got_partitions = len(paths_per_mapping[mapping_name])
  86. assert got_partitions>min_partitions, f"Not enough partitions generated for mapping:{mapping_name}. Expected at least {min_partitions}, got {got_partitions}"
  87. all_features = []
  88. for chunk in old_source_spec[mapping_name]:
  89. all_features = all_features + chunk[FEATURES_SELECTOR]
  90. new_source_spec[mapping_name] = []
  91. new_source_spec[mapping_name].append({TYPE_SELECTOR: PARQUET_TYPE,
  92. FEATURES_SELECTOR: all_features,
  93. FILES_SELECTOR: paths_per_mapping[mapping_name]})
  94. fspec_out = FeatureSpec(feature_spec=fspec_in.feature_spec, source_spec=new_source_spec,
  95. channel_spec=fspec_in.channel_spec, metadata=fspec_in.metadata)
  96. fspec_out.base_directory = args.output
  97. feature_spec_save_path = os.path.join(args_output, args_feature_spec_out)
  98. fspec_out.to_yaml(output_path=feature_spec_save_path)
  99. if __name__ == '__main__':
  100. main()