# dali.py
# Copyright (c) 2022 NVIDIA Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import os
from dataclasses import dataclass
from typing import List, Union

# Third-party imports keep their original relative order.
from cuda import cudart
import paddle
import numpy as np
from nvidia.dali.backend import TensorListCPU
import nvidia.dali.ops as ops
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.paddle import DALIGenericIterator

from utils.mode import Mode
from utils.utility import get_num_trainers, get_trainer_id
  28. @dataclass
  29. class PipeOpMeta:
  30. crop: int
  31. resize_shorter: int
  32. min_area: float
  33. max_area: float
  34. lower: float
  35. upper: float
  36. interp: types.DALIInterpType
  37. mean: float
  38. std: float
  39. output_dtype: types.DALIDataType
  40. output_layout: str
  41. pad_output: bool
  42. class HybridPipeBase(Pipeline):
  43. def __init__(self,
  44. file_root,
  45. batch_size,
  46. device_id,
  47. ops_meta,
  48. num_threads=4,
  49. seed=42,
  50. shard_id=0,
  51. num_shards=1,
  52. random_shuffle=True,
  53. dont_use_mmap=True):
  54. super().__init__(batch_size, num_threads, device_id, seed=seed)
  55. self.input = ops.readers.File(
  56. file_root=file_root,
  57. shard_id=shard_id,
  58. num_shards=num_shards,
  59. random_shuffle=random_shuffle,
  60. dont_use_mmap=dont_use_mmap)
  61. self.build_ops(ops_meta)
  62. def build_ops(self, ops_meta):
  63. pass
  64. def __len__(self):
  65. return self.epoch_size("Reader")
  66. class HybridTrainPipe(HybridPipeBase):
  67. def build_ops(self, ops_meta):
  68. # Set internal nvJPEG buffers size to handle full-sized ImageNet images
  69. # without additional reallocations
  70. device_memory_padding = 211025920
  71. host_memory_padding = 140544512
  72. self.decode = ops.decoders.ImageRandomCrop(
  73. device='mixed',
  74. output_type=types.DALIImageType.RGB,
  75. device_memory_padding=device_memory_padding,
  76. host_memory_padding=host_memory_padding,
  77. random_aspect_ratio=[ops_meta.lower, ops_meta.upper],
  78. random_area=[ops_meta.min_area, ops_meta.max_area],
  79. num_attempts=100)
  80. self.res = ops.Resize(
  81. device='gpu',
  82. resize_x=ops_meta.crop,
  83. resize_y=ops_meta.crop,
  84. interp_type=ops_meta.interp)
  85. self.cmnp = ops.CropMirrorNormalize(
  86. device="gpu",
  87. dtype=ops_meta.output_dtype,
  88. output_layout=ops_meta.output_layout,
  89. crop=(ops_meta.crop, ops_meta.crop),
  90. mean=ops_meta.mean,
  91. std=ops_meta.std,
  92. pad_output=ops_meta.pad_output)
  93. self.coin = ops.random.CoinFlip(probability=0.5)
  94. self.to_int64 = ops.Cast(dtype=types.DALIDataType.INT64, device="gpu")
  95. def define_graph(self):
  96. rng = self.coin()
  97. jpegs, labels = self.input(name="Reader")
  98. images = self.decode(jpegs)
  99. images = self.res(images)
  100. output = self.cmnp(images.gpu(), mirror=rng)
  101. return [output, self.to_int64(labels.gpu())]
  102. class HybridValPipe(HybridPipeBase):
  103. def build_ops(self, ops_meta):
  104. self.decode = ops.decoders.Image(device="mixed")
  105. self.res = ops.Resize(
  106. device="gpu",
  107. resize_shorter=ops_meta.resize_shorter,
  108. interp_type=ops_meta.interp)
  109. self.cmnp = ops.CropMirrorNormalize(
  110. device="gpu",
  111. dtype=ops_meta.output_dtype,
  112. output_layout=ops_meta.output_layout,
  113. crop=(ops_meta.crop, ops_meta.crop),
  114. mean=ops_meta.mean,
  115. std=ops_meta.std,
  116. pad_output=ops_meta.pad_output)
  117. self.to_int64 = ops.Cast(dtype=types.DALIDataType.INT64, device="gpu")
  118. def define_graph(self):
  119. jpegs, labels = self.input(name="Reader")
  120. images = self.decode(jpegs)
  121. images = self.res(images)
  122. output = self.cmnp(images)
  123. return [output, self.to_int64(labels.gpu())]
  124. def dali_dataloader(args, mode, device):
  125. """
  126. Define a dali dataloader with configuration to operate datasets.
  127. Args:
  128. args(Namespace): Arguments obtained from ArgumentParser.
  129. mode(utils.Mode): Train or eval mode.
  130. device(int): Id of GPU to load data.
  131. Outputs:
  132. DALIGenericIterator(nvidia.dali.plugin.paddle.DALIGenericIterator)
  133. Iteratable outputs of DALI pipeline,
  134. including "data" and "label" in type of Paddle's Tensor.
  135. """
  136. assert "gpu" in device, "gpu training is required for DALI"
  137. assert mode in Mode, "Dataset mode should be in supported Modes"
  138. device_id = int(device.split(':')[1])
  139. seed = args.dali_random_seed
  140. num_threads = args.dali_num_threads
  141. batch_size = args.batch_size
  142. interp = 1 # settings.interpolation or 1 # default to linear
  143. interp_map = {
  144. # cv2.INTER_NEAREST
  145. 0: types.DALIInterpType.INTERP_NN,
  146. # cv2.INTER_LINEAR
  147. 1: types.DALIInterpType.INTERP_LINEAR,
  148. # cv2.INTER_CUBIC
  149. 2: types.DALIInterpType.INTERP_CUBIC,
  150. # LANCZOS3 for cv2.INTER_LANCZOS4
  151. 3: types.DALIInterpType.INTERP_LANCZOS3
  152. }
  153. assert interp in interp_map, "interpolation method not supported by DALI"
  154. interp = interp_map[interp]
  155. normalize_scale = args.normalize_scale
  156. normalize_mean = args.normalize_mean
  157. normalize_std = args.normalize_std
  158. normalize_mean = [v / normalize_scale for v in normalize_mean]
  159. normalize_std = [v / normalize_scale for v in normalize_std]
  160. output_layout = args.data_layout[1:] # NCHW -> CHW or NHWC -> HWC
  161. pad_output = args.image_channel == 4
  162. output_dtype = types.FLOAT16 if args.dali_output_fp16 else types.FLOAT
  163. shard_id = get_trainer_id()
  164. num_shards = get_num_trainers()
  165. scale = args.rand_crop_scale
  166. ratio = args.rand_crop_ratio
  167. ops_meta = PipeOpMeta(
  168. crop=args.crop_size,
  169. resize_shorter=args.resize_short,
  170. min_area=scale[0],
  171. max_area=scale[1],
  172. lower=ratio[0],
  173. upper=ratio[1],
  174. interp=interp,
  175. mean=normalize_mean,
  176. std=normalize_std,
  177. output_dtype=output_dtype,
  178. output_layout=output_layout,
  179. pad_output=pad_output)
  180. file_root = args.image_root
  181. pipe_class = None
  182. if mode == Mode.TRAIN:
  183. file_root = os.path.join(file_root, 'train')
  184. pipe_class = HybridTrainPipe
  185. else:
  186. file_root = os.path.join(file_root, 'val')
  187. pipe_class = HybridValPipe
  188. pipe = pipe_class(
  189. file_root,
  190. batch_size,
  191. device_id,
  192. ops_meta,
  193. num_threads=num_threads,
  194. seed=seed + shard_id,
  195. shard_id=shard_id,
  196. num_shards=num_shards)
  197. pipe.build()
  198. return DALIGenericIterator([pipe], ['data', 'label'], reader_name='Reader')
  199. def build_dataloader(args, mode):
  200. """
  201. Build a dataloader to process datasets. Only DALI dataloader is supported now.
  202. Args:
  203. args(Namespace): Arguments obtained from ArgumentParser.
  204. mode(utils.Mode): Train or eval mode.
  205. Returns:
  206. dataloader(nvidia.dali.plugin.paddle.DALIGenericIterator):
  207. Iteratable outputs of DALI pipeline,
  208. including "data" and "label" in type of Paddle's Tensor.
  209. """
  210. assert mode in Mode, "Dataset mode should be in supported Modes (train or eval)"
  211. return dali_dataloader(args, mode, paddle.device.get_device())
  212. def dali_synthetic_dataloader(args, device):
  213. """
  214. Define a dali dataloader with synthetic data.
  215. Args:
  216. args(Namespace): Arguments obtained from ArgumentParser.
  217. device(int): Id of GPU to load data.
  218. Outputs:
  219. DALIGenericIterator(nvidia.dali.plugin.paddle.DALIGenericIterator)
  220. Iteratable outputs of DALI pipeline,
  221. including "data" in type of Paddle's Tensor.
  222. """
  223. assert "gpu" in device, "gpu training is required for DALI"
  224. device_id = int(device.split(':')[1])
  225. batch_size = args.batch_size
  226. image_shape = args.image_shape
  227. output_dtype = types.FLOAT16 if args.dali_output_fp16 else types.FLOAT
  228. num_threads = args.dali_num_threads
  229. class ExternalInputIterator(object):
  230. def __init__(self, batch_size, image_shape):
  231. n_bytes = int(batch_size * np.prod(image_shape) * 4)
  232. err, mem = cudart.cudaMallocHost(n_bytes)
  233. assert err == cudart.cudaError_t.cudaSuccess
  234. mem_ptr = ctypes.cast(mem, ctypes.POINTER(ctypes.c_float))
  235. self.synthetic_data = np.ctypeslib.as_array(mem_ptr, shape=(batch_size, *image_shape))
  236. self.n = args.benchmark_steps
  237. def __iter__(self):
  238. self.i = 0
  239. return self
  240. def __next__(self):
  241. if self.i >= self.n:
  242. self.__iter__()
  243. raise StopIteration()
  244. self.i += 1
  245. return TensorListCPU(self.synthetic_data, is_pinned=True)
  246. eli = ExternalInputIterator(batch_size, image_shape)
  247. pipe = Pipeline(batch_size=batch_size, num_threads=num_threads, device_id=device_id)
  248. with pipe:
  249. images = fn.external_source(source=eli, no_copy=True, dtype=output_dtype)
  250. images = images.gpu()
  251. pipe.set_outputs(images)
  252. pipe.build()
  253. return DALIGenericIterator([pipe], ['data'])