# Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preprocessing script for SIM models."""

import logging
import multiprocessing
import os

import click
import cudf
import cupy
import dask.dataframe
import dask_cudf
import rmm

from preprocessing.io import load_metadata, load_review_data, save_metadata
from preprocessing.ops import ExplodeSequence, add_negative_sequence, list_slice, slice_and_pad_left

DASK_TRAIN_DATASET_CHUNKSIZE = 15_000
TRAIN_DATA_DIR = "train"
TEST_DATA_DIR = "test"
TEST_DATA_FILE = "part.0.parquet"
CATEGORIZED_METADATA_FILE = "metadata.json"
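
# OUTPUT_META serves both as the dask `meta` argument for map_partitions and as
# the column order of the saved parquet files (see create_train_dataset below).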
OUTPUT_META = {
    "label": "int8",
    "uid": "int64",
    "item": "int32",
    "cat": "int32",
    "item_sequence": "list",
    "cat_sequence": "list",
    "neg_item_sequence": "list",
    "neg_cat_sequence": "list",
}

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s: %(message)s",
)


def add_categorified_column(df, col_name, id_col_name):
    """Map every unique value of `col_name` to a dense int32 id in `id_col_name`.

    Ids start at 1, so that 0 stays free as the padding value."""
    unique_values = df[col_name].unique().to_frame()
    unique_values[id_col_name] = cupy.arange(len(unique_values), dtype="int32") + 1
    df = df.merge(unique_values, how="left", on=col_name)
    return df
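
# Illustrative example (actual ids depend on the order unique() returns values):
#   df = cudf.DataFrame({"item": ["B001", "B002", "B001"]})
#   add_categorified_column(df, "item", "item_id")
#   # -> both "B001" rows get the same id, e.g. item_id == [1, 2, 1]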


def categorify_items(all_items_unique: cudf.DataFrame, metadata: cudf.DataFrame) -> cudf.DataFrame:
    """Attach category metadata to the unique items (items without metadata get
    "no_category") and categorify both the item and the category columns."""
    unique_item_with_category = all_items_unique.merge(metadata, how="left", on="item")
    unique_item_with_category = unique_item_with_category.fillna("no_category")
    df = add_categorified_column(unique_item_with_category, "item", "item_id")
    df = add_categorified_column(df, "cat", "cat_id")
    return df


def filter_too_short_sequences(reviews: cudf.DataFrame, min_seq_length: int) -> cudf.DataFrame:
    user_counts = reviews["user"].value_counts()
    user_counts_filtered = user_counts[user_counts >= min_seq_length]
    valid_users = user_counts_filtered.index
    reviews = reviews[reviews["user"].isin(valid_users)]
    reviews.reset_index(drop=True, inplace=True)
    return reviews


def add_items_and_categories_indices(
    reviews: cudf.DataFrame,
    item_and_cat_with_ids: cudf.DataFrame,
) -> cudf.DataFrame:
    return reviews.merge(item_and_cat_with_ids, how="left", on="item")


def categorify_users(reviews: cudf.DataFrame) -> cudf.DataFrame:
    return add_categorified_column(reviews, "user", "uid")


def create_sampling_df(
    all_items: cudf.DataFrame,
    item_and_cat_with_ids: cudf.DataFrame,
) -> cudf.DataFrame:
    """Build an item_id/cat_id lookup with one row per interaction, so that
    drawing random rows samples items proportionally to their popularity."""
    sampling_df = all_items.merge(item_and_cat_with_ids, how="left", on="item")
    sampling_df = sampling_df[["item_id", "cat_id"]]
    sampling_df = sampling_df.sort_values(by="item_id")
    sampling_df.reset_index(drop=True, inplace=True)
    return sampling_df


def aggregate_per_user(df):
    """Collapse reviews into one row per user, ordered by review time: the last
    interaction becomes the target (`item`/`cat`), the rest the history sequences."""
    df = df.sort_values(by=["unixReviewTime", "item"])
    df = df.groupby("uid").agg({
        "item_id": list,
        "cat_id": list,
    })
    df.reset_index(inplace=True)
    df = df.rename(columns={
        "item_id": "item_sequence",
        "cat_id": "cat_sequence",
    })
    # Split the chronologically last element off as the prediction target.
    df["item"] = df["item_sequence"].list.get(-1)
    df["cat"] = df["cat_sequence"].list.get(-1)
    df["item_sequence"] = list_slice(df["item_sequence"], 0, -1)
    df["cat_sequence"] = list_slice(df["cat_sequence"], 0, -1)
    return df


def explode_sequence(df: cudf.DataFrame, min_elements: int, max_elements: int) -> cudf.DataFrame:
    """Expand each user history into multiple training rows with ExplodeSequence
    (one extra element is kept so it can be split off as the target below), then
    drop rows whose remaining history is shorter than `min_elements`."""
    df = ExplodeSequence(
        col_names=["item_sequence", "cat_sequence"],
        keep_cols=["uid"],
        max_elements=max_elements + 1,
    ).transform(df)
    df["item"] = df["item_sequence"].list.get(-1)
    df["cat"] = df["cat_sequence"].list.get(-1)
    df["item_sequence"] = list_slice(df["item_sequence"], 0, -1)
    df["cat_sequence"] = list_slice(df["cat_sequence"], 0, -1)
    df = df[df.item_sequence.list.len() >= min_elements]
    return df
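
# Assuming ExplodeSequence emits the growing prefixes of each sequence (which the
# `max_elements + 1` bound and the target split above suggest), a history
# [a, b, c, d] would yield (history, target) rows like ([a], b), ([a, b], c),
# ([a, b, c], d).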


def add_negative_label(pos_df: cudf.DataFrame, sampling_df: cudf.DataFrame) -> cudf.DataFrame:
    """Duplicate every positive row as a negative one with a randomly drawn
    item/cat target, then concatenate both, labeled 1 and 0 respectively."""
    neg_df = pos_df.copy()
    pos_df["label"] = cupy.int8(1)
    neg_df["label"] = cupy.int8(0)
    neg = cupy.random.randint(
        low=0,
        high=len(sampling_df),
        size=len(neg_df),
        dtype=int,
    )
    neg_item_ids = sampling_df["item_id"].iloc[neg].values
    neg_df["item"] = neg_item_ids
    neg_cat_ids = sampling_df["cat_id"].iloc[neg].values
    neg_df["cat"] = neg_cat_ids
    df = cudf.concat([pos_df, neg_df])
    return df
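
# Note: negatives are drawn uniformly over sampling_df rows (one row per
# interaction), i.e. proportionally to item popularity, and are not checked
# against the user's history, so occasional accidental positives are possible.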


def add_negative_sampling(df: cudf.DataFrame, sampling_df: cudf.DataFrame) -> cudf.DataFrame:
    """Add the label column and, for every position of each history, a negative
    item/cat sample, yielding negative sequences of matching lengths."""
    df = add_negative_label(df, sampling_df)
    # Draw one sample per element of every history sequence.
    neg = cupy.random.randint(
        low=0,
        high=len(sampling_df),
        size=int(df.item_sequence.list.len().sum()),
        dtype=int,
    )
    item_samples = sampling_df["item_id"].iloc[neg]
    cat_samples = sampling_df["cat_id"].iloc[neg]
    df["neg_item_sequence"] = add_negative_sequence(df["item_sequence"], item_samples)
    df["neg_cat_sequence"] = add_negative_sequence(df["cat_sequence"], cat_samples)
    return df


def pad_with_zeros(df: cudf.DataFrame, max_elements: int) -> cudf.DataFrame:
    df["item_sequence"] = slice_and_pad_left(df["item_sequence"], max_elements)
    df["cat_sequence"] = slice_and_pad_left(df["cat_sequence"], max_elements)
    df["neg_item_sequence"] = slice_and_pad_left(df["neg_item_sequence"], max_elements)
    df["neg_cat_sequence"] = slice_and_pad_left(df["neg_cat_sequence"], max_elements)
    return df


def create_train_dataset(
    df: cudf.DataFrame,
    sampling_df: cudf.DataFrame,
    min_elements: int,
    max_elements: int,
    output_path: str,
    seed: int,
    dask_scheduler: str = "processes",
) -> None:
    def transform(df, sampling_df, partition_info):
        # Derive a distinct seed per partition so that negative sampling is
        # reproducible yet not identical across partitions.
        part_seed = seed + partition_info["number"] + 1
        cupy.random.seed(part_seed)
        df = explode_sequence(df, min_elements, max_elements)
        df = add_negative_sampling(df, sampling_df)
        df = pad_with_zeros(df, max_elements)
        df = df.sort_values(by=["uid"])
        df.reset_index(drop=True, inplace=True)
        df = df[list(OUTPUT_META)]
        return df

    ddf = dask_cudf.from_cudf(df, chunksize=DASK_TRAIN_DATASET_CHUNKSIZE)
    ddf = ddf.map_partitions(transform, meta=OUTPUT_META, sampling_df=sampling_df)
    ddf = ddf.clear_divisions()
    with dask.config.set(scheduler=dask_scheduler):
        ddf.to_parquet(output_path, write_index=False, overwrite=True)


def create_test_dataset(
    df: cudf.DataFrame,
    sampling_df: cudf.DataFrame,
    max_elements: int,
    output_path: str,
) -> None:
    df = add_negative_sampling(df, sampling_df)
    df = pad_with_zeros(df, max_elements)
    df = df.sort_values(by=["uid"])
    df.reset_index(drop=True, inplace=True)
    df = df[list(OUTPUT_META)]
    os.makedirs(output_path, exist_ok=True)
    output_file = os.path.join(output_path, TEST_DATA_FILE)
    df.to_parquet(output_file, index=False)
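
# Unlike the train split, the test split keeps a single row per user: the full
# (truncated to max_elements) history with the chronologically last item as target.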


@click.command()
@click.option(
    "--amazon_dataset_path",
    required=True,
    help="Path to the dataset directory. Must contain both the reviews and the metadata json files.",
    type=str,
)
@click.option(
    "--output_path",
    required=True,
    help="Path where the preprocessed dataset is saved.",
    type=str,
)
@click.option(
    "--metadata_file_name",
    default="meta_Books.json",
    help="Name of the metadata json file inside the dataset directory.",
    type=str,
)
@click.option(
    "--reviews_file_name",
    default="reviews_Books.json",
    help="Name of the reviews json file inside the dataset directory.",
    type=str,
)
@click.option(
    "--max_sequence_length",
    default=100,
    help="Take only the `max_sequence_length` last elements of a sequence.",
)
@click.option(
    "--shortest_sequence_for_user",
    default=20,
    help="Minimal length of a user sequence. "
    "Every user with a sequence shorter than this value is discarded.",
)
@click.option(
    "--shortest_sequence_for_training",
    default=1,
    help="Minimal length of a sequence in the training set.",
)
@click.option(
    "--metadata_loader_n_proc",
    default=multiprocessing.cpu_count(),
    help="Number of processes used to parse metadata.",
)
@click.option(
    "--review_loader_num_workers",
    default=20,
    help="Number of dask workers used to read the reviews data. "
    "Note that, as each worker is a new process, a value that is too high might cause GPU OOM errors.",
)
@click.option(
    "--seed",
    default=12345,
    help="Seed for reproducibility. "
    "Note that the results can still differ between machines because of dask/cudf non-determinism.",
    type=int,
)
def main(
    amazon_dataset_path: str,
    output_path: str,
    metadata_file_name: str,
    reviews_file_name: str,
    max_sequence_length: int,
    shortest_sequence_for_user: int,
    shortest_sequence_for_training: int,
    metadata_loader_n_proc: int,
    review_loader_num_workers: int,
    seed: int,
):
    cupy.random.seed(seed)
    rmm.reinitialize(managed_memory=True)

    metadata_path = os.path.join(amazon_dataset_path, metadata_file_name)
    reviews_path = os.path.join(amazon_dataset_path, reviews_file_name)

    logging.info("Loading metadata")
    metadata = load_metadata(metadata_path, metadata_loader_n_proc)
    assert len(metadata) == metadata["item"].nunique(), "metadata should contain unique items"

    logging.info("Loading review data")
    reviews = load_review_data(reviews_path, review_loader_num_workers)

    logging.info("Removing short user sequences")
    reviews = filter_too_short_sequences(reviews, shortest_sequence_for_user)

    logging.info("Categorifying users, items, categories")
    all_items_unique = reviews["item"].unique().to_frame()
    item_and_cat_with_ids = categorify_items(all_items_unique, metadata)
    reviews = add_items_and_categories_indices(reviews, item_and_cat_with_ids)
    reviews = categorify_users(reviews)

    logging.info("Aggregating data per user")
    df = aggregate_per_user(reviews)

    logging.info("Preparing dataframe for negative sampling")
    all_items = reviews["item"].to_frame()
    sampling_df = create_sampling_df(all_items, item_and_cat_with_ids)

    os.makedirs(output_path, exist_ok=True)

    logging.info("Creating train dataset")
    create_train_dataset(
        df,
        sampling_df,
        min_elements=shortest_sequence_for_training,
        max_elements=max_sequence_length,
        output_path=os.path.join(output_path, TRAIN_DATA_DIR),
        seed=seed,
    )

    logging.info("Creating test dataset")
    create_test_dataset(
        df,
        sampling_df,
        max_elements=max_sequence_length,
        output_path=os.path.join(output_path, TEST_DATA_DIR),
    )

    logging.info("Saving metadata")
    save_metadata(
        number_of_items=len(item_and_cat_with_ids),
        number_of_categories=item_and_cat_with_ids["cat_id"].nunique(),
        number_of_users=len(df),
        output_path=os.path.join(output_path, CATEGORIZED_METADATA_FILE),
    )


if __name__ == "__main__":
    main()