#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import contextlib
import glob
import io
import logging
import os
import sqlite3

import numpy as np
from PIL import Image

from .common.textutil import ensure_unicode, md5

logger = logging.getLogger(__name__)
  12. def _filename_priority(s):
  13. if "_hd" in s and s.endswith(".png"):
  14. return 10
  15. else:
  16. return 1
  17. class AvatarReader(object):
  18. def __init__(self, res_dir, avt_db="avatar.index"):
  19. self.sfs_dir = os.path.join(res_dir, 'sfs')
  20. # new location of avatar, see #50
  21. self.avt_dir = os.path.join(res_dir, 'avatar')
  22. self.avt_db = avt_db
  23. self._use_avt = True
  24. if os.path.isdir(self.avt_dir) and len(os.listdir(self.avt_dir)):
  25. # has resource/avatar
  26. pass
  27. elif self.avt_db is not None \
  28. and os.path.isfile(self.avt_db) \
  29. and glob.glob(os.path.join(self.sfs_dir, 'avatar*')):
  30. # has sfs/avatar*
  31. pass
  32. else:
  33. logger.warn("Cannot find avatar storage. Will not use avatar!")
  34. self._use_avt = False
  35. def get_avatar(self, username):
  36. """ username: `username` field in db.rcontact"""
  37. if not self._use_avt:
  38. return None
  39. username = ensure_unicode(username)
  40. avtid = md5(username.encode('utf-8'))
  41. try:
  42. candidates = self.query_index(avtid)
  43. candidates = sorted(candidates, key=lambda x: _filename_priority(x[0]), reverse=True)
  44. for c in candidates:
  45. path, offset, size = c
  46. return self.read_img_from_block(path, offset, size)
  47. except:
  48. pass
  49. dir1, dir2 = avtid[:2], avtid[2:4]
  50. candidates = glob.glob(os.path.join(self.avt_dir, dir1, dir2, f"*{avtid}*"))
  51. default_candidate = os.path.join(self.avt_dir, dir1, dir2, f"user_{avtid}.png")
  52. candidates.append(default_candidate)
  53. candidates = sorted(set(candidates), key=_filename_priority, reverse=True)
  54. for cand in candidates:
  55. try:
  56. if os.path.exists(cand):
  57. if cand.endswith(".bm"):
  58. return self.read_bm_file(cand)
  59. else:
  60. return Image.open(cand)
  61. except Exception:
  62. logger.exception("")
  63. pass
  64. logger.warning("Avatar for {} not found anywhere.".format(username))
  65. def read_img_from_block(self, filename, pos, size):
  66. file_idx = pos >> 32
  67. fname = os.path.join(self.sfs_dir,
  68. 'avatar.block.' + '{:05d}'.format(file_idx))
  69. # offset of each block file: 17 + len(path)
  70. start_pos = pos - file_idx * (2**32) + 16 + len(filename) + 1
  71. try:
  72. with open(fname, 'rb') as f:
  73. f.seek(start_pos)
  74. data = f.read(size)
  75. im = Image.open(io.BytesIO(data))
  76. return im
  77. except IOError as e:
  78. logger.warn("Cannot read avatar from {}: {}".format(fname, str(e)))
  79. return None
  80. def read_bm_file(self, fname):
  81. # history at https://github.com/ppwwyyxx/wechat-dump/pull/14
  82. with open(fname, 'rb') as f:
  83. # filesize is 36880=96x96x4+16
  84. size = (96, 96, 3)
  85. img = np.zeros(size, dtype='uint8')
  86. for i in range(96):
  87. for j in range(96):
  88. r, g, b, a = f.read(4)
  89. img[i,j] = (r, g, b)
  90. return Image.fromarray(img, mode="RGB")
  91. def query_index(self, avtid):
  92. conn = sqlite3.connect(self.avt_db)
  93. cursor = conn.execute("select FileName,Offset,Size from Index_avatar")
  94. candidates = []
  95. for path, offset, size in cursor:
  96. if avtid in path:
  97. candidates.append((path, offset, size))
  98. return candidates
  99. if __name__ == '__main__':
  100. import sys
  101. r = AvatarReader(sys.argv[1], sys.argv[2])
  102. print(r.get_avatar(sys.argv[3]))