# input_pipeline.py
  1. # -----------------------------------------------------------------------
  2. #
  3. # Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import numpy as np
  17. import cupy as cp
  18. def generate_negatives(neg_users, true_mat, item_range, sort=False, use_trick=False):
  19. """
  20. Generate negative samples for data augmentation
  21. """
  22. neg_u = []
  23. neg_i = []
  24. # If using the shortcut, generate negative items without checking if the associated
  25. # user has interacted with it. Speeds up training significantly with very low impact
  26. # on accuracy.
  27. if use_trick:
  28. neg_items = cp.random.randint(0, high=item_range, size=neg_users.shape[0])
  29. return neg_users, neg_items
  30. # Otherwise, generate negative items, check if associated user has interacted with it,
  31. # then generate a new one if true
  32. while len(neg_users) > 0:
  33. neg_items = cp.random.randint(0, high=item_range, size=neg_users.shape[0])
  34. neg_mask = true_mat[neg_users, neg_items]
  35. neg_u.append(neg_users[neg_mask])
  36. neg_i.append(neg_items[neg_mask])
  37. neg_users = neg_users[cp.logical_not(neg_mask)]
  38. neg_users = cp.concatenate(neg_u)
  39. neg_items = cp.concatenate(neg_i)
  40. if not sort:
  41. return neg_users, neg_items
  42. sorted_users = cp.sort(neg_users)
  43. sort_indices = cp.argsort(neg_users)
  44. return sorted_users, neg_items[sort_indices]
class DataGenerator:
    """
    Class to handle data augmentation.

    Holds the local (per-horovod-rank) training and evaluation data and
    augments both with randomly generated negative samples via
    ``generate_negatives``. Heavy array work runs on the GPU through CuPy;
    prepared batches are exposed through the ``eval_*``/``train_*_batches``
    attributes.
    """
    def __init__(self,
                 seed,
                 hvd_rank,
                 num_users,  # type: int
                 num_items,  # type: int
                 neg_mat,  # type: np.ndarray
                 train_users,  # type: np.ndarray
                 train_items,  # type: np.ndarray
                 train_labels,  # type: np.ndarray
                 train_batch_size,  # type: int
                 train_negative_samples,  # type: int
                 pos_eval_users,  # type: np.ndarray
                 pos_eval_items,  # type: np.ndarray
                 eval_users_per_batch,  # type: int
                 eval_negative_samples,  # type: int
                 ):
        """
        Validate inputs, seed both RNGs, select the GPU for this rank and
        stage the training arrays on the device.

        :param seed: seed applied to both the NumPy and CuPy RNGs so that
            negative sampling and shuffling are reproducible.
        :param hvd_rank: horovod rank; also used as the CUDA device index.
        :param num_users: total number of users in the dataset.
        :param num_items: total number of items; upper bound (exclusive)
            for randomly drawn negative item ids.
        :param neg_mat: (user, item) lookup passed to generate_negatives as
            ``true_mat``; presumably truthy entries mark valid negatives —
            TODO confirm against how the matrix is built by the caller.
        :param train_users: user ids of the local training examples.
        :param train_items: item ids of the local training examples.
        :param train_labels: labels for the training examples; falsy entries
            are treated as placeholders to be overwritten with negatives in
            prepare_train_data.
        :param train_batch_size: number of examples per training batch.
        :param train_negative_samples: negatives per positive for training
            (stored but not read in the code visible here).
        :param pos_eval_users: user ids of the positive eval examples.
        :param pos_eval_items: item ids of the positive eval examples.
        :param eval_users_per_batch: users per evaluation batch.
        :param eval_negative_samples: negatives generated per eval positive.
        :raises ValueError: if the train or eval user/item arrays disagree
            in shape.
        """
        # Check input data
        if train_users.shape != train_items.shape:
            raise ValueError(
                "Train shapes mismatch! {} Users vs {} Items!".format(
                    train_users.shape, train_items.shape))
        if pos_eval_users.shape != pos_eval_items.shape:
            raise ValueError(
                "Eval shapes mismatch! {} Users vs {} Items!".format(
                    pos_eval_users.shape, pos_eval_items.shape))
        # Seed both host (NumPy) and device (CuPy) RNGs for reproducibility.
        np.random.seed(seed)
        cp.random.seed(seed)
        # Use GPU assigned to the horovod rank
        self.hvd_rank = hvd_rank
        cp.cuda.Device(self.hvd_rank).use()
        self.num_users = num_users
        self.num_items = num_items
        # Kept on the host; copied to the device on demand in prepare_eval_data.
        self._neg_mat = neg_mat
        # Training data lives on the GPU for the whole lifetime of the object.
        self._train_users = cp.array(train_users)
        self._train_items = cp.array(train_items)
        self._train_labels = cp.array(train_labels)
        self.train_batch_size = train_batch_size
        self._train_negative_samples = train_negative_samples
        self._pos_eval_users = pos_eval_users
        self._pos_eval_items = pos_eval_items
        self.eval_users_per_batch = eval_users_per_batch
        self._eval_negative_samples = eval_negative_samples
        # Eval data — populated by prepare_eval_data().
        self.eval_users = None
        self.eval_items = None
        self.dup_mask = None
        # Training data — populated by prepare_train_data().
        self.train_users_batches = None
        self.train_items_batches = None
        self.train_labels_batches = None

    # Augment test data with negative samples
    def prepare_eval_data(self):
        """
        Build the evaluation batches: for every positive eval pair, generate
        ``self._eval_negative_samples`` verified negatives, append the
        positive as the last column, compute a per-row duplicate mask, then
        flatten and split everything into ``self.eval_users``,
        ``self.eval_items`` and ``self.dup_mask`` (lists of 1-D NumPy
        arrays, one per batch).
        """
        pos_eval_users = cp.array(self._pos_eval_users)
        pos_eval_items = cp.array(self._pos_eval_items)
        neg_mat = cp.array(self._neg_mat)
        # One row of negatives per positive: repeat each eval user once per
        # negative sample to be drawn for it.
        neg_eval_users_base = cp.repeat(pos_eval_users, self._eval_negative_samples)
        # Generate negative samples (sort=True keeps each user's negatives
        # contiguous so the reshape below groups them per user).
        test_u_neg, test_i_neg = generate_negatives(neg_users=neg_eval_users_base, true_mat=neg_mat,
                                                    item_range=self.num_items, sort=True, use_trick=False)
        # .get() copies the results back to the host; the rest of this
        # method works in NumPy.
        test_u_neg = test_u_neg.reshape((-1, self._eval_negative_samples)).get()
        test_i_neg = test_i_neg.reshape((-1, self._eval_negative_samples)).get()
        test_users = self._pos_eval_users.reshape((-1, 1))
        test_items = self._pos_eval_items.reshape((-1, 1))
        # Combine positive and negative samples: the positive item is the
        # last column of each row.
        test_users = np.concatenate((test_u_neg, test_users), axis=1)
        test_items = np.concatenate((test_i_neg, test_items), axis=1)
        # Generate duplicate mask
        ## Stable sort indices by incrementing all values with fractional position:
        ## adding column_index / row_width (< 1) breaks ties by original
        ## position without reordering distinct integer item ids, emulating a
        ## stable argsort.
        indices = np.arange(test_users.shape[1]).reshape((1, -1)).repeat(test_users.shape[0], axis=0)
        summed_items = np.add(test_items, indices/test_users.shape[1])
        sorted_indices = np.argsort(summed_items, axis=1)
        # argsort of the argsort yields each element's rank in sorted order,
        # used below to scatter the mask back to the original column layout.
        sorted_order = np.argsort(sorted_indices, axis=1)
        sorted_items = np.sort(test_items, axis=1)
        ## Generate duplicate mask: in sorted order, an element equal to its
        ## right neighbour is a duplicate; the last column can never be one.
        dup_mask = np.equal(sorted_items[:,0:-1], sorted_items[:,1:])
        dup_mask = np.concatenate((dup_mask, np.zeros((test_users.shape[0], 1))), axis=1)
        r_indices = np.arange(test_users.shape[0]).reshape((-1, 1)).repeat(test_users.shape[1], axis=1)
        # Un-sort the mask so it lines up with test_items' original columns.
        dup_mask = dup_mask[r_indices, sorted_order].astype(np.float32)
        # Reshape all to (-1) and split into chunks of
        # eval_users_per_batch * (negatives + 1) elements each.
        batch_size = self.eval_users_per_batch * test_users.shape[1]
        split_indices = np.arange(batch_size, test_users.shape[0]*test_users.shape[1], batch_size)
        self.eval_users = np.split(test_users.reshape(-1), split_indices)
        self.eval_items = np.split(test_items.reshape(-1), split_indices)
        self.dup_mask = np.split(dup_mask.reshape(-1), split_indices)
        # Free GPU memory to make space for Tensorflow
        cp.get_default_memory_pool().free_all_blocks()

    # Augment training data with negative samples
    def prepare_train_data(self):
        """
        Build the training batches: overwrite the item ids of all negative
        (falsy-label) examples with freshly drawn random items (unverified —
        use_trick=True), shuffle the whole local dataset, and split it into
        ``self.train_users_batches`` / ``self.train_items_batches`` /
        ``self.train_labels_batches``.

        Intended to be called once per epoch so each epoch sees fresh
        negatives and a fresh shuffle.
        """
        batch_size = self.train_batch_size
        is_neg = cp.logical_not(self._train_labels)
        # Do not store verification matrix if using the negatives generation shortcut
        neg_mat = None
        # If there are no negative samples in the local portion of the training data, do nothing
        any_neg = cp.any(is_neg)
        if any_neg:
            # use_trick=True: items are drawn without checking against the
            # interaction matrix (neg_mat is ignored and passed as None).
            self._train_users[is_neg], self._train_items[is_neg] = generate_negatives(
                self._train_users[is_neg], neg_mat, self.num_items, use_trick=True
            )
        # Shuffle users, items and labels with the same permutation to keep
        # the triples aligned.
        shuffled_order = cp.random.permutation(self._train_users.shape[0])
        self._train_users = self._train_users[shuffled_order]
        self._train_items = self._train_items[shuffled_order]
        self._train_labels = self._train_labels[shuffled_order]
        # Manually create batches
        split_indices = np.arange(batch_size, self._train_users.shape[0], batch_size)
        self.train_users_batches = np.split(self._train_users, split_indices)
        self.train_items_batches = np.split(self._train_items, split_indices)
        self.train_labels_batches = np.split(self._train_labels, split_indices)