# coco_pipeline.py
  1. # Copyright (c) 2018-2019, NVIDIA CORPORATION. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import ctypes
  15. import time
  16. import logging
  17. import numpy as np
  18. import torch
  19. # DALI imports
  20. import nvidia.dali as dali
  21. from nvidia.dali.pipeline import Pipeline
  22. from nvidia.dali.types import to_numpy_type
class COCOPipeline(Pipeline):
    """DALI pipeline producing SSD training batches from a COCO dataset.

    Reads images and annotations with the DALI COCO reader, applies the
    SSD-style augmentations built below (random bbox crop, horizontal flip,
    resize to 300x300, HSV and brightness/contrast jitter, crop-mirror
    normalization) and encodes ground-truth boxes against the provided
    default (anchor) boxes.

    Parameters
    ----------
    batch_size : int
        Number of samples per batch.
    file_root : str
        Root directory containing the COCO images.
    annotations_file : str
        Path to the COCO annotations file.
    default_boxes
        Callable-style anchor provider; ``default_boxes(order='ltrb')`` must
        return a torch tensor of anchors (flattened into the BoxEncoder).
    device_id : int
        GPU ordinal this pipeline runs on.
    num_shards : int
        Total number of dataset shards (typically the distributed world size).
    output_fp16 : bool
        If True, emit FP16 images; otherwise FP32.
    output_nhwc : bool
        If True, emit NHWC layout; otherwise NCHW.
    pad_output : bool
        Passed through to CropMirrorNormalize; pads the output channels.
    num_threads : int
        Number of CPU worker threads for the pipeline.
    seed : int
        Seed for the pipeline's random number generation.
    """

    def __init__(self, batch_size, file_root, annotations_file, default_boxes,
                 device_id, num_shards,
                 output_fp16=False, output_nhwc=False, pad_output=False,
                 num_threads=1, seed=15):
        super(COCOPipeline, self).__init__(batch_size=batch_size,
                                           device_id=device_id,
                                           num_threads=num_threads,
                                           seed=seed)

        # Shard by global rank when running under torch.distributed;
        # otherwise this process reads shard 0.
        if torch.distributed.is_initialized():
            shard_id = torch.distributed.get_rank()
        else:
            shard_id = 0

        # Data loader and image decoder
        self.input = dali.ops.readers.COCO(file_root=file_root,
                                           annotations_file=annotations_file,
                                           shard_id=shard_id,
                                           num_shards=num_shards,
                                           ratio=True,
                                           ltrb=True,
                                           shuffle_after_epoch=True,
                                           skip_empty=True)
        self.decode_slice = dali.ops.decoders.ImageSlice(device="cpu",
                                                         output_type=dali.types.RGB)

        # Augmentation techniques
        ## Random crop (SSD-style: sample crops until one passes an IoU
        ## threshold drawn from `thresholds`, or keep the full image)
        self.crop = dali.ops.RandomBBoxCrop(device="cpu",
                                            aspect_ratio=[0.5, 2.0],
                                            thresholds=[0, 0.1, 0.3, 0.5, 0.7, 0.9],
                                            scaling=[0.3, 1.0],
                                            bbox_layout="xyXY",
                                            allow_no_crop=True,
                                            num_attempts=1)
        ## Color twist
        self.hsv = dali.ops.Hsv(device="gpu",
                                dtype=dali.types.FLOAT)  # use float to avoid clipping and quantizing the intermediate result
        self.bc = dali.ops.BrightnessContrast(device="gpu",
                                              contrast_center=128,  # input is in the [0, 255] range
                                              dtype=dali.types.UINT8)

        ## Cropping and normalization
        dtype = dali.types.FLOAT16 if output_fp16 else dali.types.FLOAT
        output_layout = dali.types.NHWC if output_nhwc else dali.types.NCHW
        self.normalize = dali.ops.CropMirrorNormalize(
            device="gpu",
            crop=(300, 300),
            mean=[0.0, 0.0, 0.0],
            std=[255.0, 255.0, 255.0],
            mirror=0,
            dtype=dtype,
            output_layout=output_layout,
            pad_output=pad_output)

        ## Flipping (image flip is paired with a bbox flip in define_graph)
        self.flip = dali.ops.Flip(device="cpu")
        self.bbflip = dali.ops.BbFlip(device="cpu", ltrb=True)

        # Resize to the fixed SSD300 input resolution
        self.resize = dali.ops.Resize(device="cpu",
                                      resize_x=300,
                                      resize_y=300)

        # Random variables feeding the augmentations above
        self.rng1 = dali.ops.random.Uniform(range=[0.5, 1.5])      # saturation / contrast
        self.rng2 = dali.ops.random.Uniform(range=[0.875, 1.125])  # brightness
        self.rng3 = dali.ops.random.Uniform(range=[-0.5, 0.5])     # hue
        self.flip_coin = dali.ops.random.CoinFlip(probability=0.5)

        # bbox encoder: matches ground-truth boxes to anchors at IoU >= 0.5
        self.anchors = default_boxes(order='ltrb').cpu().numpy().flatten().tolist()
        self.box_encoder = dali.ops.BoxEncoder(device="cpu",
                                               criteria=0.5,
                                               anchors=self.anchors)

    def define_graph(self):
        """Build the DALI graph.

        Returns
        -------
        tuple
            ``(images, bboxes, labels)`` — all outputs moved to GPU;
            bboxes/labels are the anchor-encoded targets.
        """
        saturation = self.rng1()
        contrast = self.rng1()
        brightness = self.rng2()
        hue = self.rng3()
        coin_rnd = self.flip_coin()

        inputs, bboxes, labels = self.input(name="Reader")
        crop_begin, crop_size, bboxes, labels = self.crop(bboxes, labels)
        # Decode only the cropped window (fused decode + slice)
        images = self.decode_slice(inputs, crop_begin, crop_size)
        # Flip image and boxes with the same coin so they stay consistent
        images = self.flip(images, horizontal=coin_rnd)
        bboxes = self.bbflip(bboxes, horizontal=coin_rnd)
        images = self.resize(images)
        images = images.gpu()
        images = self.hsv(images, hue=hue, saturation=saturation)
        images = self.bc(images, brightness=brightness, contrast=contrast)
        images = self.normalize(images)
        bboxes, labels = self.box_encoder(bboxes, labels)

        # bboxes and images and labels on GPU
        return (images, bboxes.gpu(), labels.gpu())
  110. to_torch_type = {
  111. np.float32 : torch.float32,
  112. np.float64 : torch.float64,
  113. np.float16 : torch.float16,
  114. np.uint8 : torch.uint8,
  115. np.int8 : torch.int8,
  116. np.int16 : torch.int16,
  117. np.int32 : torch.int32,
  118. np.int64 : torch.int64
  119. }
  120. def feed_ndarray(dali_tensor, arr):
  121. """
  122. Copy contents of DALI tensor to pyTorch's Tensor.
  123. Parameters
  124. ----------
  125. `dali_tensor` : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU
  126. Tensor from which to copy
  127. `arr` : torch.Tensor
  128. Destination of the copy
  129. """
  130. assert dali_tensor.shape() == list(arr.size()), \
  131. ("Shapes do not match: DALI tensor has size {0}"
  132. ", but PyTorch Tensor has size {1}".format(dali_tensor.shape(), list(arr.size())))
  133. #turn raw int to a c void pointer
  134. c_type_pointer = ctypes.c_void_p(arr.data_ptr())
  135. dali_tensor.copy_to_external(c_type_pointer)
  136. return arr
class DALICOCOIterator(object):
    """
    COCO DALI iterator for pyTorch.

    Wraps one pipeline per GPU and, on each ``next()``, copies the DALI
    outputs into freshly allocated torch tensors (double-buffered per GPU).
    Each element of the returned list is the tuple
    ``(images, bboxes, labels, offsets)`` for one GPU.

    Parameters
    ----------
    pipelines : list of nvidia.dali.pipeline.Pipeline
        List of pipelines to use
    size : int
        Epoch size.
    """
    def __init__(self, pipelines, size):
        if not isinstance(pipelines, list):
            pipelines = [pipelines]
        self._num_gpus = len(pipelines)
        assert pipelines is not None, "Number of provided pipelines has to be at least 1"
        self.batch_size = pipelines[0].max_batch_size
        self._size = size
        self._pipes = pipelines

        # Build all pipelines
        for p in self._pipes:
            p.build()

        # Use double-buffering of data batches: each per-GPU slot holds two
        # batches and __next__ alternates between them.
        self._data_batches = [[None, None, None, None] for i in range(self._num_gpus)]
        self._counter = 0
        self._current_data_batch = 0
        # Order of the three pipeline outputs, matched by position below.
        self.output_map = ["image", "bboxes", "labels"]

        # We need data about the batches (like shape information),
        # so we need to run a single batch as part of setup to get that info
        self._first_batch = None
        self._first_batch = self.next()

    def __next__(self):
        # Serve the batch pre-fetched during __init__ first.
        if self._first_batch is not None:
            batch = self._first_batch
            self._first_batch = None
            return batch
        if self._counter > self._size:
            raise StopIteration

        # Gather outputs
        # NOTE(review): _prefetch is a private Pipeline API; share_outputs
        # exposes the current outputs without copying them yet.
        outputs = []
        for p in self._pipes:
            p._prefetch()
        for p in self._pipes:
            outputs.append(p.share_outputs())
        for i in range(self._num_gpus):
            dev_id = self._pipes[i].device_id
            out_images = []
            bboxes = []
            labels = []
            # segregate outputs into image/labels/bboxes entries
            for j, out in enumerate(outputs[i]):
                if self.output_map[j] == "image":
                    out_images.append(out)
                elif self.output_map[j] == "bboxes":
                    bboxes.append(out)
                elif self.output_map[j] == "labels":
                    labels.append(out)

            # Change DALI TensorLists into Tensors
            images = [x.as_tensor() for x in out_images]
            images_shape = [x.shape() for x in images]

            # Prepare bboxes shapes (per output, per sample — boxes are
            # variable-length per image)
            bboxes_shape = []
            for j in range(len(bboxes)):
                bboxes_shape.append([])
                for k in range(len(bboxes[j])):
                    bboxes_shape[j].append(bboxes[j][k].shape())

            # Prepare labels shapes and offsets: bbox_offsets[j] is a prefix
            # sum so offsets delimit each sample's boxes after concatenation.
            labels_shape = []
            bbox_offsets = []

            torch.cuda.synchronize()
            for j in range(len(labels)):
                labels_shape.append([])
                bbox_offsets.append([0])
                for k in range(len(labels[j])):
                    lshape = labels[j][k].shape()
                    bbox_offsets[j].append(bbox_offsets[j][k] + lshape[0])
                    labels_shape[j].append(lshape)

            # We always need to alocate new memory as bboxes and labels varies in shape
            images_torch_type = to_torch_type[to_numpy_type(images[0].dtype)]
            bboxes_torch_type = to_torch_type[to_numpy_type(bboxes[0][0].dtype)]
            labels_torch_type = to_torch_type[to_numpy_type(labels[0][0].dtype)]

            torch_gpu_device = torch.device('cuda', dev_id)
            torch_cpu_device = torch.device('cpu')

            pyt_images = [torch.zeros(shape, dtype=images_torch_type, device=torch_gpu_device) for shape in images_shape]
            pyt_bboxes = [[torch.zeros(shape, dtype=bboxes_torch_type, device=torch_gpu_device) for shape in shape_list] for shape_list in bboxes_shape]
            pyt_labels = [[torch.zeros(shape, dtype=labels_torch_type, device=torch_gpu_device) for shape in shape_list] for shape_list in labels_shape]
            pyt_offsets = [torch.zeros(len(offset), dtype=torch.int32, device=torch_cpu_device) for offset in bbox_offsets]

            self._data_batches[i][self._current_data_batch] = (pyt_images, pyt_bboxes, pyt_labels, pyt_offsets)

            # Copy data from DALI Tensors to torch tensors
            for j, i_arr in enumerate(images):
                feed_ndarray(i_arr, pyt_images[j])

            # Per-sample bbox tensors are copied then concatenated into one
            # tensor per output; empty samples are skipped (nothing to copy).
            for j, b_list in enumerate(bboxes):
                for k in range(len(b_list)):
                    if (pyt_bboxes[j][k].shape[0] != 0):
                        feed_ndarray(b_list[k], pyt_bboxes[j][k])
                pyt_bboxes[j] = torch.cat(pyt_bboxes[j])

            for j, l_list in enumerate(labels):
                for k in range(len(l_list)):
                    if (pyt_labels[j][k].shape[0] != 0):
                        feed_ndarray(l_list[k], pyt_labels[j][k])
                pyt_labels[j] = torch.cat(pyt_labels[j])

            # Offsets stay on CPU as int tensors (replacing the placeholder
            # zeros allocated above).
            for j in range(len(pyt_offsets)):
                pyt_offsets[j] = torch.IntTensor(bbox_offsets[j])

        # Release DALI buffers and kick off the next iteration's work.
        for p in self._pipes:
            p.release_outputs()
            p.schedule_run()

        copy_db_index = self._current_data_batch
        # Change index for double buffering
        self._current_data_batch = (self._current_data_batch + 1) % 2
        self._counter += self._num_gpus * self.batch_size
        return [db[copy_db_index] for db in self._data_batches]

    def next(self):
        """
        Returns the next batch of data.
        """
        return self.__next__();

    def __iter__(self):
        return self

    def reset(self):
        """
        Resets the iterator after the full epoch.
        DALI iterators do not support resetting before the end of the epoch
        and will ignore such request.
        """
        if self._counter > self._size:
            self._counter = self._counter % self._size
        else:
            logging.warning("DALI iterator does not support resetting while epoch is not finished. Ignoring...")