file_chunker_utils.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # Copyright (c) Facebook, Inc. and its affiliates.
  2. #
  3. # This source code is licensed under the MIT license found in the
  4. # LICENSE file in the root directory of this source tree.
  5. # Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
  6. #
  7. # Licensed under the Apache License, Version 2.0 (the "License");
  8. # you may not use this file except in compliance with the License.
  9. # You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. import os
  19. import typing as tp
  20. def _safe_readline(fd) -> str:
  21. pos = fd.tell()
  22. while True:
  23. try:
  24. return fd.readline()
  25. except UnicodeDecodeError:
  26. pos -= 1
  27. fd.seek(pos) # search where this character begins
  28. def find_offsets(filename: str, num_chunks: int) -> tp.List[int]:
  29. """
  30. given a file and a number of chuncks, find the offsets in the file
  31. to be able to chunk around full lines.
  32. """
  33. with open(filename, "r", encoding="utf-8") as f:
  34. size = os.fstat(f.fileno()).st_size
  35. chunk_size = size // num_chunks
  36. offsets = [0 for _ in range(num_chunks + 1)]
  37. for i in range(1, num_chunks):
  38. f.seek(chunk_size * i)
  39. _safe_readline(f)
  40. offsets[i] = f.tell()
  41. offsets[-1] = size
  42. return offsets
  43. class ChunkLineIterator:
  44. """
  45. Iterator to properly iterate over lines of a file chunck.
  46. """
  47. def __init__(self, fd, start_offset: int, end_offset: int):
  48. self._fd = fd
  49. self._start_offset = start_offset
  50. self._end_offset = end_offset
  51. def __iter__(self) -> tp.Iterable[str]:
  52. self._fd.seek(self._start_offset)
  53. # next(f) breaks f.tell(), hence readline() must be used
  54. line = _safe_readline(self._fd)
  55. while line:
  56. pos = self._fd.tell()
  57. # f.tell() does not always give the byte position in the file
  58. # sometimes it skips to a very large number
  59. # it is unlikely that through a normal read we go from
  60. # end bytes to end + 2**32 bytes (4 GB) and this makes it unlikely
  61. # that the procedure breaks by the undeterministic behavior of
  62. # f.tell()
  63. if (
  64. self._end_offset > 0
  65. and pos > self._end_offset
  66. and pos < self._end_offset + 2 ** 32
  67. ):
  68. break
  69. yield line
  70. line = self._fd.readline()
  71. class Chunker:
  72. """
  73. contextmanager to read a chunck of a file line by line.
  74. """
  75. def __init__(self, path: str, start_offset: int, end_offset: int):
  76. self.path = path
  77. self.start_offset = start_offset
  78. self.end_offset = end_offset
  79. def __enter__(self) -> ChunkLineIterator:
  80. self.fd = open(self.path, "r", encoding="utf-8")
  81. return ChunkLineIterator(self.fd, self.start_offset, self.end_offset)
  82. def __exit__(self, exc_type, exc_val, exc_tb) -> None:
  83. self.fd.close()