# ops.py
  1. # Copyright (c) 2022 NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import List, Optional
  16. import cudf
  17. import cupy
  18. import numba.cuda
  19. from nvtabular import ops
  20. from nvtabular.dispatch import _build_cudf_list_column as nvt_build_list_column
  21. THREADS = 32
  22. logging.getLogger("numba").setLevel(logging.WARNING)
  23. def list_slice(seq_col, start: int, end: Optional[int] = None):
  24. """Slices a list column
  25. This is an nvtabular.ops.ListSlice wrapper that can be used with cuDF or dask-cuDF.
  26. """
  27. df = cudf.DataFrame(seq_col)
  28. col_selector = ops.ColumnSelector(seq_col.name)
  29. slicer = ops.ListSlice(start, end)
  30. transformed = slicer.transform(col_selector, df)
  31. return transformed[seq_col.name]
@numba.cuda.jit
def _calculate_row_sizes(offsets, row_sizes, max_elements):
    """CUDA kernel: write the size of every expanded prefix-row.

    One thread per source row. A row of length L produces L output rows
    with sizes min(1, max_elements), min(2, max_elements), ...,
    min(L, max_elements).

    NOTE: results are written starting at index ``1 + offsets[rowid]``
    (one-shifted) so that position 0 stays zero — the caller turns this
    array into offsets with a cumsum, and the shift yields the required
    leading zero offset.
    """
    rowid = numba.cuda.grid(1)
    # offsets has n_rows + 1 entries; guard threads past the last row.
    if rowid < offsets.size - 1:
        original_row_size = offsets[rowid + 1] - offsets[rowid]
        for i in range(original_row_size):
            # Prefix of length i+1, capped at max_elements.
            row_sizes[1 + offsets[rowid] + i] = min(i + 1, max_elements)
@numba.cuda.jit
def _generate_rows(offsets, chunk_offsets, elements, new_elements, max_elements):
    """CUDA kernel: explode each list row into all of its prefix rows.

    One thread per source row. For a source row [a, b, c] (with
    max_elements >= 3) the output chunk is [a], [a, b], [a, b, c],
    written contiguously starting at ``chunk_offsets[rowid]``. When a
    prefix is longer than max_elements, only its *last* max_elements
    items are kept (a sliding window over the row).
    """
    rowid = numba.cuda.grid(1)
    if rowid < offsets.size - 1:
        original_row_size = offsets[rowid + 1] - offsets[rowid]
        # Start of this row's output chunk in new_elements.
        chunk_offset = chunk_offsets[rowid]
        # Write cursor within the chunk.
        row_offset = 0
        for current_row_size in range(1, original_row_size + 1):
            # Skip the head of the prefix when it exceeds max_elements,
            # keeping only the trailing window.
            original_row_offset = offsets[rowid] + max(0, current_row_size - max_elements)
            # Rebinding the loop variable is intentional: caps this
            # prefix's emitted length at max_elements.
            current_row_size = min(current_row_size, max_elements)
            for i in range(current_row_size):
                new_elements[chunk_offset + row_offset + i] = elements[original_row_offset + i]
            row_offset += current_row_size
  52. @numba.cuda.jit
  53. def _preserve_data(offsets, values, new_values):
  54. rowid = numba.cuda.grid(1)
  55. if rowid < offsets.size - 1:
  56. for i in range(offsets[rowid], offsets[rowid + 1]):
  57. new_values[i] = values[rowid]
  58. @numba.cuda.jit
  59. def _slice_rjust(max_elements, offsets, elements, new_offsets, new_elements):
  60. rowid = numba.cuda.grid(1)
  61. if rowid < new_offsets.size - 1:
  62. row_size = min(offsets[rowid + 1] - offsets[rowid], max_elements)
  63. offset = offsets[rowid + 1] - row_size
  64. new_start = new_offsets[rowid + 1] - row_size
  65. for i in range(row_size):
  66. new_elements[new_start + i] = elements[offset + i]
  67. def slice_and_pad_left(seq_col, max_elements, pad_value=0):
  68. c = seq_col._column
  69. offsets = c.offsets.values
  70. elements = c.elements.values
  71. threads = THREADS
  72. blocks = (offsets.size + threads - 1) // threads
  73. new_offsets = cupy.arange(offsets.size, dtype=offsets.dtype) * max_elements
  74. new_elements = cupy.full(
  75. new_offsets[-1].item(), fill_value=pad_value, dtype=elements.dtype
  76. )
  77. _slice_rjust[blocks, threads](
  78. max_elements, offsets, elements, new_offsets, new_elements
  79. )
  80. new_col = nvt_build_list_column(new_elements, new_offsets)
  81. return new_col
class ExplodeSequence:
    """
    For each row create a new one with a subsequence of the original list columns.
    Keep at most `max_elements` of elements of a list.
    WARNING: All lists in the same row must have equal lengths!
    """

    def __init__(
        self,
        col_names: List[str],
        keep_cols: List[str],
        max_elements: int,
    ):
        # List columns to explode into prefix rows.
        self.col_names = col_names
        # Scalar columns whose value is replicated onto every generated row.
        self.keep_cols = keep_cols
        # Cap on the length of each generated prefix.
        self.max_elements = max_elements
        if not self.col_names:
            raise ValueError("`col_names` cannot be empty")

    def transform(self, df: cudf.DataFrame) -> cudf.DataFrame:
        """Return a new DataFrame where each source row of list length L
        becomes L rows holding the row's prefixes (each truncated to the
        last ``max_elements`` items); ``keep_cols`` values are copied to
        every generated row.
        """
        ret = cudf.DataFrame()
        for col_name in self.col_names:
            c = df[col_name]._column
            offsets = c.offsets.values
            elements = c.elements.values
            threads = THREADS
            blocks = (offsets.size + threads - 1) // threads
            lengths = df[col_name].list.len().values
            # Output elements per row: 1 + 2 + ... + min(L, max) for the
            # growing prefixes, plus max_elements for each of the
            # (L - max) truncated prefixes beyond that.
            sizes = cupy.minimum(lengths, self.max_elements)
            # NOTE(review): true division yields floats here; the later
            # .astype(offsets.dtype) restores integers.
            sizes = sizes * (sizes + 1) / 2
            truncated = cupy.maximum(lengths - self.max_elements, 0) * self.max_elements
            chunk_sizes = (sizes + truncated).astype(offsets.dtype)
            # chunk_offsets[i] = start of row i's output chunk; cumsum
            # into the [1:] slice keeps a leading zero.
            chunk_offsets = cupy.zeros(len(offsets), dtype=offsets.dtype)
            cupy.cumsum(chunk_sizes, dtype=offsets.dtype, out=chunk_offsets[1:])
            # Each source row of length L yields L output rows, so the
            # output offsets array has sum(lengths) + 1 entries.
            new_offsets_size = int(lengths.sum() + 1)
            new_elements_size = int(chunk_sizes.sum())
            new_offsets = cupy.zeros(new_offsets_size, dtype=offsets.dtype)
            _calculate_row_sizes[blocks, threads](
                offsets, new_offsets, self.max_elements
            )
            # Row sizes -> row offsets; the kernel's one-shifted writes
            # left index 0 at zero so the cumsum starts from 0.
            new_offsets = cupy.cumsum(new_offsets).astype(offsets.dtype)
            new_elements = cupy.zeros(new_elements_size, dtype=elements.dtype)
            _generate_rows[blocks, threads](
                offsets, chunk_offsets, elements, new_elements, self.max_elements
            )
            col = nvt_build_list_column(new_elements, new_offsets)
            ret[col_name] = col
        # NOTE(review): this loop reuses `offsets`, `blocks`, `threads`
        # and `new_offsets_size` from the *last* list column above —
        # valid only under the class's documented requirement that all
        # lists in a row have equal lengths.
        for col in self.keep_cols:
            new_values = cupy.zeros(new_offsets_size - 1, dtype=int)
            _preserve_data[blocks, threads](
                offsets, df[col].values, new_values
            )
            ret[col] = new_values
        ret = ret[self.keep_cols + self.col_names]
        return ret
  135. def add_negative_sequence(seq_col, samples):
  136. c = seq_col._column
  137. offsets = c.offsets.values
  138. elements = c.elements.values
  139. new_offsets = offsets.copy()
  140. new_elements = cupy.empty_like(elements)
  141. new_elements = cupy.array(samples.to_gpu_array())
  142. col = nvt_build_list_column(new_elements, new_offsets)
  143. return col