# avatar.py (4.4 KB)
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*-
  3. from PIL import Image
  4. import io
  5. import glob
  6. import os
  7. import numpy as np
  8. import logging
  9. import sqlite3
  10. logger = logging.getLogger(__name__)
  11. from .common.textutil import ensure_unicode, md5
  12. def _filename_priority(s):
  13. if "_hd" in s and s.endswith(".png"):
  14. return 10
  15. else:
  16. return 1
  17. class AvatarReader(object):
  18. def __init__(self, res_dir, avt_db="avatar.index"):
  19. self.sfs_dir = os.path.join(res_dir, 'sfs')
  20. # new location of avatar, see #50
  21. self.avt_dir = os.path.join(res_dir, 'avatar')
  22. if not os.path.isdir(self.avt_dir) or len(os.listdir(self.avt_dir)) == 0:
  23. self.avt_dir = None
  24. self.avt_db = avt_db
  25. self._use_avt = True
  26. if self.avt_db is not None:
  27. if len(glob.glob(os.path.join(self.sfs_dir, 'avatar*'))) == 0:
  28. # has sfs/avatar*
  29. self.avt_db = None
  30. if self.avt_dir is None and self.avt_db is None:
  31. logger.warn("Cannot find avatar storage. Will not use avatar!")
  32. self._use_avt = False
  33. def get_avatar_from_avtdb(self, avtid):
  34. try:
  35. candidates = self._search_avt_db(avtid)
  36. candidates = sorted(candidates, key=lambda x: _filename_priority(x[0]), reverse=True)
  37. for c in candidates:
  38. path, offset, size = c
  39. return self.read_img_from_block(path, offset, size)
  40. except Exception:
  41. pass
  42. def get_avatar_from_avtdir(self, avtid):
  43. dir1, dir2 = avtid[:2], avtid[2:4]
  44. candidates = glob.glob(os.path.join(self.avt_dir, dir1, dir2, f"*{avtid}*"))
  45. candidates = sorted(set(candidates), key=_filename_priority, reverse=True)
  46. for cand in candidates:
  47. if os.path.isdir(cand):
  48. candidates.extend(os.path.join(cand, x) for x in os.listdir(cand))
  49. for cand in candidates:
  50. if os.path.isdir(cand):
  51. continue
  52. try:
  53. if cand.endswith(".bm"):
  54. return self.read_bm_file(cand)
  55. else:
  56. return Image.open(cand)
  57. except Exception:
  58. logger.exception("")
  59. pass
  60. def get_avatar(self, username):
  61. """ username: `username` field in db.rcontact"""
  62. if not self._use_avt:
  63. return None
  64. username = ensure_unicode(username)
  65. avtid = md5(username.encode('utf-8'))
  66. if self.avt_db is not None:
  67. ret = self.get_avatar_from_avtdb(avtid)
  68. if ret is not None:
  69. return ret
  70. if self.avt_dir is not None:
  71. ret = self.get_avatar_from_avtdir(avtid)
  72. if ret is not None:
  73. return ret
  74. logger.warning("Avatar for {} not found anywhere.".format(username))
  75. def read_img_from_block(self, filename, pos, size):
  76. file_idx = pos >> 32
  77. fname = os.path.join(self.sfs_dir,
  78. 'avatar.block.' + '{:05d}'.format(file_idx))
  79. # offset of each block file: 17 + len(path)
  80. start_pos = pos - file_idx * (2**32) + 16 + len(filename) + 1
  81. try:
  82. with open(fname, 'rb') as f:
  83. f.seek(start_pos)
  84. data = f.read(size)
  85. im = Image.open(io.BytesIO(data))
  86. return im
  87. except IOError as e:
  88. logger.warn("Cannot read avatar from {}: {}".format(fname, str(e)))
  89. return None
  90. def read_bm_file(self, fname):
  91. # history at https://github.com/ppwwyyxx/wechat-dump/pull/14
  92. with open(fname, 'rb') as f:
  93. # filesize is 36880=96x96x4+16
  94. size = (96, 96, 3)
  95. img = np.zeros(size, dtype='uint8')
  96. for i in range(96):
  97. for j in range(96):
  98. r, g, b, a = f.read(4)
  99. img[i,j] = (r, g, b)
  100. return Image.fromarray(img, mode="RGB")
  101. def _search_avt_db(self, avtid):
  102. conn = sqlite3.connect(self.avt_db)
  103. cursor = conn.execute("select FileName,Offset,Size from Index_avatar")
  104. candidates = []
  105. for path, offset, size in cursor:
  106. if avtid in path:
  107. candidates.append((path, offset, size))
  108. return candidates
  109. if __name__ == '__main__':
  110. import sys
  111. r = AvatarReader(sys.argv[1], sys.argv[2])
  112. print(r.get_avatar(sys.argv[3]))