avatar.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*-
  3. from PIL import Image
  4. import io
  5. import glob
  6. import os
  7. import numpy as np
  8. import logging
  9. import sqlite3
  10. logger = logging.getLogger(__name__)
  11. from .common.textutil import ensure_unicode, md5
  12. class AvatarReader(object):
  13. def __init__(self, res_dir, avt_db="avatar.index"):
  14. self.sfs_dir = os.path.join(res_dir, 'sfs')
  15. # new location of avatar, see #50
  16. self.avt_dir = os.path.join(res_dir, 'avatar')
  17. self.avt_db = avt_db
  18. self._use_avt = True
  19. if os.path.isdir(self.avt_dir) and len(os.listdir(self.avt_dir)):
  20. self.avt_use_db = False
  21. elif self.avt_db is not None \
  22. and os.path.isfile(self.avt_db) \
  23. and glob.glob(os.path.join(self.sfs_dir, 'avatar*')):
  24. self.avt_use_db = True
  25. else:
  26. logger.warn(
  27. "Cannot find avatar files. Will not use avatar!")
  28. self._use_avt = False
  29. def get_avatar(self, username):
  30. """ username: `username` field in db.rcontact"""
  31. if not self._use_avt:
  32. return None
  33. username = ensure_unicode(username)
  34. avtid = md5(username.encode('utf-8'))
  35. dir1, dir2 = avtid[:2], avtid[2:4]
  36. candidates = glob.glob(os.path.join(self.avt_dir, dir1, dir2, f"*{avtid}*"))
  37. default_candidate = os.path.join(self.avt_dir, dir1, dir2, f"user_{avtid}.png")
  38. candidates.append(default_candidate)
  39. def priority(s):
  40. if "_hd" in s and s.endswith(".png"):
  41. return 10
  42. else:
  43. return 1
  44. candidates = sorted(set(candidates), key=priority, reverse=True)
  45. for cand in candidates:
  46. try:
  47. if self.avt_use_db:
  48. pos, size = self.query_index(cand)
  49. return self.read_img(pos, size)
  50. else:
  51. if os.path.exists(cand):
  52. if cand.endswith(".bm"):
  53. return self.read_bm_file(cand)
  54. else:
  55. return Image.open(cand)
  56. except Exception:
  57. logger.exception("HHH")
  58. pass
  59. logger.warning("Avatar for {} not found in avatar database.".format(username))
  60. def read_img(self, pos, size):
  61. file_idx = pos >> 32
  62. fname = os.path.join(self.sfs_dir,
  63. 'avatar.block.' + '{:05d}'.format(file_idx))
  64. # a 64-byte offset of each block file
  65. start_pos = pos - file_idx * (2**32) + 64
  66. try:
  67. with open(fname, 'rb') as f:
  68. f.seek(start_pos)
  69. data = f.read(size)
  70. im = Image.open(io.BytesIO(data))
  71. return im
  72. except IOError as e:
  73. logger.warn("Cannot read avatar from {}: {}".format(fname, str(e)))
  74. return None
  75. def read_bm_file(self, fname):
  76. # history at https://github.com/ppwwyyxx/wechat-dump/pull/14
  77. with open(fname, 'rb') as f:
  78. # filesize is 36880=96x96x4+16
  79. size = (96, 96, 3)
  80. img = np.zeros(size, dtype='uint8')
  81. for i in range(96):
  82. for j in range(96):
  83. r, g, b, a = f.read(4)
  84. img[i,j] = (r, g, b)
  85. return Image.fromarray(img, mode="RGB")
  86. def query_index(self, filename):
  87. conn = sqlite3.connect(self.avt_db)
  88. cursor = conn.execute("select Offset,Size from Index_avatar where FileName='{}'".format(filename))
  89. pos, size = cursor.fetchone()
  90. return pos, size
  91. if __name__ == '__main__':
  92. import sys
  93. r = AvatarReader(sys.argv[1], sys.argv[2])
  94. print(r.get_avatar(sys.argv[3]))