# datasets.py
# Copyright (c) 2020 NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import concurrent
import concurrent.futures
import math
import os
import queue
from typing import Optional, Sequence, Tuple, Any, Dict

import numpy as np
import torch
from torch.utils.data import Dataset

from dlrm.data.utils import get_categorical_feature_type
from dlrm.utils.distributed import get_rank
  24. class SyntheticDataset(Dataset):
  25. """Synthetic dataset version of criteo dataset."""
  26. def __init__(
  27. self,
  28. num_entries: int,
  29. device: str = 'cuda',
  30. batch_size: int = 1,
  31. numerical_features: Optional[int] = None,
  32. categorical_feature_sizes: Optional[Sequence[int]] = None,
  33. device_mapping: Optional[Dict[str, Any]] = None
  34. ):
  35. if device_mapping:
  36. # distributed setting
  37. rank = get_rank()
  38. numerical_features = numerical_features if device_mapping["bottom_mlp"] == rank else None
  39. categorical_feature_sizes = device_mapping["embedding"][rank]
  40. self.cat_features_count = len(categorical_feature_sizes) if categorical_feature_sizes is not None else 0
  41. self.num_features_count = numerical_features if numerical_features is not None else 0
  42. self.tot_fea = 1 + self.num_features_count + self.cat_features_count
  43. self.batch_size = batch_size
  44. self.batches_per_epoch = math.ceil(num_entries / batch_size)
  45. self.categorical_feature_sizes = categorical_feature_sizes
  46. self.device = device
  47. self.tensor = torch.randint(low=0, high=2, size=(self.batch_size, self.tot_fea), device=self.device)
  48. self.tensor = self.tensor.float()
  49. def __len__(self):
  50. return self.batches_per_epoch
  51. def __getitem__(self, idx: int):
  52. if idx >= self.batches_per_epoch:
  53. raise IndexError()
  54. numerical_features = (self.tensor[:, 1: 1 + self.num_features_count].to(torch.float32)
  55. if self.num_features_count > 0 else None)
  56. categorical_features = (self.tensor[:, 1 + self.num_features_count:].to(torch.long)
  57. if self.cat_features_count > 0 else None)
  58. target = self.tensor[:, 0].to(torch.float32)
  59. return numerical_features, categorical_features, target
  60. class CriteoBinDataset(Dataset):
  61. """Simple dataloader for a recommender system. Designed to work with a single binary file."""
  62. def __init__(
  63. self,
  64. data_file: str,
  65. batch_size: int = 1,
  66. subset: float = None,
  67. numerical_features: int = 13,
  68. categorical_features: int = 26,
  69. data_type: str = 'int32'
  70. ):
  71. self.data_type = np.__dict__[data_type]
  72. bytes_per_feature = self.data_type().nbytes
  73. self.tad_fea = 1 + numerical_features
  74. self.tot_fea = 1 + numerical_features + categorical_features
  75. self.batch_size = batch_size
  76. self.bytes_per_entry = (bytes_per_feature * self.tot_fea * batch_size)
  77. self.num_entries = math.ceil(os.path.getsize(data_file) / self.bytes_per_entry)
  78. if subset is not None:
  79. if subset <= 0 or subset > 1:
  80. raise ValueError('Subset parameter must be in (0,1) range')
  81. self.num_entries = math.ceil(self.num_entries * subset)
  82. self.file = open(data_file, 'rb')
  83. self._last_read_idx = -1
  84. def __len__(self):
  85. return self.num_entries
  86. def __getitem__(self, idx):
  87. if idx >= self.num_entries:
  88. raise IndexError()
  89. if idx == 0:
  90. self.file.seek(0, 0)
  91. elif self._last_read_idx != (idx - 1):
  92. self.file.seek(idx * self.bytes_per_entry, 0)
  93. raw_data = self.file.read(self.bytes_per_entry)
  94. self._last_read_idx = idx
  95. array = np.frombuffer(raw_data, dtype=self.data_type).reshape(-1, self.tot_fea)
  96. return array
  97. def __del__(self):
  98. self.file.close()
  99. class SplitCriteoDataset(Dataset):
  100. """Split version of Criteo dataset
  101. Args:
  102. data_path (str): Full path to split binary file of dataset. It must contain numerical.bin, label.bin and
  103. cat_0 ~ cat_25.bin
  104. batch_size (int):
  105. numerical_features(boolean): If True, load numerical features for bottom_mlp. Default False
  106. categorical_features (list or None): categorical features used by the rank
  107. prefetch_depth (int): How many samples to prefetch. Default 10.
  108. """
  109. def __init__(
  110. self,
  111. data_path: str,
  112. batch_size: int = 1,
  113. numerical_features: bool = False,
  114. categorical_features: Optional[Sequence[int]] = None,
  115. categorical_feature_sizes: Optional[Sequence[int]] = None,
  116. prefetch_depth: int = 10,
  117. drop_last_batch: bool = False,
  118. ):
  119. self._label_bytes_per_batch = np.dtype(np.bool).itemsize * batch_size
  120. self._numerical_bytes_per_batch = 13 * np.dtype(np.float16).itemsize * batch_size if numerical_features else 0
  121. self._categorical_feature_types = [
  122. get_categorical_feature_type(size) for size in categorical_feature_sizes
  123. ] if categorical_feature_sizes else []
  124. self._categorical_bytes_per_batch = [
  125. np.dtype(cat_type).itemsize * batch_size for cat_type in self._categorical_feature_types
  126. ]
  127. self._categorical_features = categorical_features
  128. self._batch_size = batch_size
  129. self._label_file = os.open(os.path.join(data_path, f"label.bin"), os.O_RDONLY)
  130. self._num_entries = int(math.ceil(os.fstat(self._label_file).st_size
  131. / self._label_bytes_per_batch)) if not drop_last_batch \
  132. else int(math.floor(os.fstat(self._label_file).st_size / self._label_bytes_per_batch))
  133. if numerical_features:
  134. self._numerical_features_file = os.open(os.path.join(data_path, "numerical.bin"), os.O_RDONLY)
  135. number_of_numerical_batches = math.ceil(os.fstat(self._numerical_features_file).st_size
  136. / self._numerical_bytes_per_batch) if not drop_last_batch \
  137. else math.floor(os.fstat(self._numerical_features_file).st_size
  138. / self._numerical_bytes_per_batch)
  139. if number_of_numerical_batches != self._num_entries:
  140. raise ValueError("Size mismatch in data files")
  141. else:
  142. self._numerical_features_file = None
  143. if categorical_features:
  144. self._categorical_features_files = []
  145. for cat_id in categorical_features:
  146. cat_file = os.open(os.path.join(data_path, f"cat_{cat_id}.bin"), os.O_RDONLY)
  147. cat_bytes = self._categorical_bytes_per_batch[cat_id]
  148. number_of_categorical_batches = math.ceil(os.fstat(cat_file).st_size / cat_bytes) if not drop_last_batch \
  149. else math.floor(os.fstat(cat_file).st_size / cat_bytes)
  150. if number_of_categorical_batches != self._num_entries:
  151. raise ValueError("Size mismatch in data files")
  152. self._categorical_features_files.append(cat_file)
  153. else:
  154. self._categorical_features_files = None
  155. self._prefetch_depth = min(prefetch_depth, self._num_entries)
  156. self._prefetch_queue = queue.Queue()
  157. self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
  158. def __len__(self):
  159. return self._num_entries
  160. def __getitem__(self, idx: int):
  161. if idx >= self._num_entries:
  162. raise IndexError()
  163. if self._prefetch_depth <= 1:
  164. return self._get_item(idx)
  165. if idx == 0:
  166. for i in range(self._prefetch_depth):
  167. self._prefetch_queue.put(self._executor.submit(self._get_item, (i)))
  168. if idx < self._num_entries - self._prefetch_depth:
  169. self._prefetch_queue.put(self._executor.submit(self._get_item, (idx + self._prefetch_depth)))
  170. return self._prefetch_queue.get().result()
  171. def _get_item(self, idx: int) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor]]:
  172. click = self._get_label(idx)
  173. numerical_features = self._get_numerical_features(idx)
  174. categorical_features = self._get_categorical_features(idx)
  175. return numerical_features, categorical_features, click
  176. def _get_label(self, idx: int) -> torch.Tensor:
  177. raw_label_data = os.pread(self._label_file, self._label_bytes_per_batch,
  178. idx * self._label_bytes_per_batch)
  179. array = np.frombuffer(raw_label_data, dtype=np.bool)
  180. return torch.from_numpy(array).to(torch.float32)
  181. def _get_numerical_features(self, idx: int) -> Optional[torch.Tensor]:
  182. if self._numerical_features_file is None:
  183. return None
  184. raw_numerical_data = os.pread(self._numerical_features_file, self._numerical_bytes_per_batch,
  185. idx * self._numerical_bytes_per_batch)
  186. array = np.frombuffer(raw_numerical_data, dtype=np.float16)
  187. return torch.from_numpy(array).view(-1, 13)
  188. def _get_categorical_features(self, idx: int) -> Optional[torch.Tensor]:
  189. if self._categorical_features_files is None:
  190. return None
  191. categorical_features = []
  192. for cat_id, cat_file in zip(self._categorical_features, self._categorical_features_files):
  193. cat_bytes = self._categorical_bytes_per_batch[cat_id]
  194. cat_type = self._categorical_feature_types[cat_id]
  195. raw_cat_data = os.pread(cat_file, cat_bytes, idx * cat_bytes)
  196. array = np.frombuffer(raw_cat_data, dtype=cat_type)
  197. tensor = torch.from_numpy(array).unsqueeze(1).to(torch.long)
  198. categorical_features.append(tensor)
  199. return torch.cat(categorical_features, dim=1)
  200. def __del__(self):
  201. data_files = [self._label_file, self._numerical_features_file]
  202. if self._categorical_features_files is not None:
  203. data_files += self._categorical_features_files
  204. for data_file in data_files:
  205. if data_file is not None:
  206. os.close(data_file)