瀏覽代碼

decrypt local emoji

Yuxin Wu 5 年之前
父節點
當前提交
641af7a3cd
共有 8 個文件被更改,包括 246 次插入和 161 次删除
  1. 2 2
      README.md
  2. 1 1
      dump-html.py
  3. 1 1
      wechat/__init__.py
  4. 2 0
      wechat/common/procutil.py
  5. 4 4
      wechat/common/textutil.py
  6. 211 0
      wechat/emoji.py
  7. 17 15
      wechat/parser.py
  8. 8 138
      wechat/res.py

+ 2 - 2
README.md

@@ -16,9 +16,9 @@ If it doesn't work, you probably have to investigate it as the behavior may be d
 + adb and rooted android phone connected to a Linux/Mac OSX/Win10+Bash.
   If the phone does not come with adb support, you can download an app such as https://play.google.com/store/apps/details?id=eu.chainfire.adbd
 + Python >= 3.6
-+ [PyQuery](https://pypi.python.org/pypi/pyquery/), [javaobj-py3](https://pypi.org/project/javaobj-py3), Pillow, requests
++ [PyQuery](https://pypi.python.org/pypi/pyquery/), [javaobj-py3](https://pypi.org/project/javaobj-py3), [PyCryptodome](https://github.com/Legrandin/pycryptodome), Pillow, requests
 + [sqlcipher](https://github.com/sqlcipher/sqlcipher) >= 4.1, [pysqlcipher3](https://pypi.python.org/pypi/pysqlcipher3)
-+ sox, openssl (command line tools)
++ sox (command line tools)
 + csscompressor (suggested, optional)
 + Silk audio decoder (included; build with `./third-party/compile_silk.sh`)
 

+ 1 - 1
dump-html.py

@@ -55,4 +55,4 @@ if __name__ == '__main__':
         for idx, html in enumerate(htmls):
             with open(basename + f'{idx:02d}.html', 'w') as f:
                 f.write(html)
-    res.emoji_cache.flush()
+    res.emoji_reader.flush_cache()

+ 1 - 1
wechat/__init__.py

@@ -26,5 +26,5 @@ set_level_color(logging.ERROR, '\033[1;31m')
 
 if __name__ == '__main__':
     logger.info("info")
-    logger.warn("warn")
+    logger.warning("warn")
 

+ 2 - 0
wechat/common/procutil.py

@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 
 import subprocess
+import logging
+logger = logging.getLogger(__name__)
 
 def subproc_call(cmd, timeout=None):
     """

+ 4 - 4
wechat/common/textutil.py

@@ -17,12 +17,12 @@ def md5(s):
     return m.hexdigest()
 
 def get_file_b64(fname):
-    data = open(fname, 'rb').read()
-    return base64.b64encode(data).decode('ascii')
+    with open(fname, 'rb') as f:
+        return base64.b64encode(f.read()).decode('ascii')
 
 def get_file_md5(fname):
-    data = open(fname, 'rb').read()
-    return md5(data)
+    with open(fname, 'rb') as f:
+        return md5(f.read())
 
 def safe_filename(fname):
     filename = ensure_unicode(fname)

+ 211 - 0
wechat/emoji.py

@@ -0,0 +1,211 @@
+import os
+from pathlib import Path
+import logging
+import tempfile
+import io
+import requests
+import base64
+import imghdr
+from PIL import Image
+import pickle
+from Crypto.Cipher import AES
+
+from .parser import WeChatDBParser
+from .common.textutil import md5 as get_md5_hex, get_file_b64, get_file_md5
+
+
+LIB_PATH = os.path.dirname(os.path.abspath(__file__))
+DEFAULT_EMOJI_CACHE = os.path.join(LIB_PATH, '..', 'emoji.cache.pkl')
+logger = logging.getLogger(__name__)
+
+
+def _get_aes_key(md5):
+    # ascii representation of the first half of md5 is used as aes key
+    assert len(md5) == 32
+    return md5[:16].encode('ascii')
+    # ret = ""
+    # for ch in md5[:16]:
+        # ret += format(ord(ch), 'x')
+    # return ret
+
+
+class EmojiReader:
+    def __init__(self,
+        resource_dir: str,
+        parser: WeChatDBParser,
+        cache_file: str=None):
+        """
+        Args:
+            resource_dir: path to resource/
+            parser: Database parser
+            cache_file: a cache file to store emoji downloaded from URLs.
+                default to a emoji.cache.pkl file under wechat-dump.
+        """
+        self.emoji_dir = Path(resource_dir) / 'emoji'
+        assert self.emoji_dir.is_dir(), self.emoji_dir
+        self.parser = parser
+        self.emoji_info = parser.emoji_info or {}
+        # mapping from md5 to the (catalog, cdnurl, encrypturl, aeskey)
+        # columns in EmojiInfo table.
+        self.cache_file = cache_file or DEFAULT_EMOJI_CACHE
+
+        # cache stores md5 -> (base64str, format)
+        if os.path.isfile(self.cache_file):
+            with open(self.cache_file, "rb") as f:
+                self._cache = pickle.load(f)
+        else:
+            self._cache = {}
+        self._cache_size = len(self._cache)
+        self.encryption_key = parser.get_emoji_encryption_key()
+        if self.encryption_key is not None:
+            self.encryption_key = _get_aes_key(self.encryption_key)
+
+    def get_emoji(self, md5):
+        """ Returns: (b64 encoded img string, format) """
+
+        assert md5, f"Invalid md5 {md5}!"
+        # check cache
+        img, format = self._cache_query(md5)
+        if format:
+            return img, format
+
+        # check resource/
+        subdir = self.parser.emoji_groups.get(md5, '')
+        dir_to_search = self.emoji_dir / subdir
+        img, format = self._search_in_res(dir_to_search, md5, False)
+        if format:
+            return img, format
+
+        emoji_info = self.emoji_info.get(md5, None)
+        if emoji_info:
+            catalog, cdnurl, encrypturl, aeskey = emoji_info
+            img, format = self._fetch(md5, cdnurl, encrypturl, aeskey)
+            if format:
+                return img, format
+
+        img, format = self._search_in_res(dir_to_search, md5, True)
+        if format:
+            logger.info(f"Using fallback for emoji {md5}")
+            return img, format
+        else:
+            emoji_in_table = emoji_info is not None
+            msg = "not in database" if not emoji_in_table else f"group={subdir}"
+            logger.warning(f"Cannot find emoji {md5}: {msg}")
+            return None, None
+
+    def _cache_query(self, md5):
+        data, format = self._cache.get(md5, (None, None))
+        if data is not None and not isinstance(data, str):
+            data = data.decode('ascii')
+        return data, format
+
+    def _cache_add(self, md5, values):
+        self._cache[md5] = values
+        if len(self._cache) >= self._cache_size + 15:
+            self.flush_cache()
+
+    def flush_cache(self):
+        if len(self._cache) > self._cache_size:
+            self._cache_size = len(self._cache)
+            with open(self.cache_file, 'wb') as f:
+                pickle.dump(self._cache, f, protocol=-1)
+
+    def _search_in_res(self, dir, md5, allow_fallback=False):
+        if allow_fallback:
+            candidates = dir.glob(f'{md5}*')
+            # There are misc low-quality matches, e.g.:
+            # 'md5_{0..15}' for each frame of gif, non-animated md5_thumb, md5_cover
+            # candidates = [k for k in candidates if not re.match('.*_[0-9]+$', k)]
+            # candidates = [k for k in candidates if (not k.endswith('_cover') and not k.endswith('_thumb')))]
+        else:
+            if (dir / md5).is_file():
+                candidates = [dir / md5]
+            else:
+                candidates = []
+
+        def get_data_no_fallback(fname):
+            if imghdr.what(fname):
+                data_md5 = get_file_md5(fname)
+                if data_md5 == md5:
+                    return get_file_b64(fname), imghdr.what(fname)
+
+            content = self._decrypt_emoji(fname)
+            try:
+                data_md5 = get_md5_hex(content)
+                assert data_md5 == md5
+                im = Image.open(io.BytesIO(content))
+                return (base64.b64encode(content).decode('ascii'), im.format.lower())
+            except:
+                logger.exception(f"Error decrypting emoji {fname}.")
+
+        def get_data_fallback(fname):
+            if not imghdr.what(fname):
+                return  # fallback files are not encrypted
+            return get_file_b64(fname), imghdr.what(fname)
+
+        get_data_func = get_data_fallback if allow_fallback else get_data_no_fallback
+        results = [(x, get_data_func(x)) for x in candidates]
+        results = [(a, b) for a, b in results if b is not None]
+        # maybe sort candidates by heuristics?
+        if len(results):
+            return results[0][1]
+        return (None, None)
+
+    def _decrypt_emoji(self, fname):
+        cipher = AES.new(self.encryption_key, AES.MODE_ECB)
+        with open(fname, 'rb') as f:
+            head = f.read(1024)
+            plain_head = cipher.decrypt(head)
+            data = plain_head + f.read()
+        return data
+
+    def _fetch(self, md5, cdnurl, encrypturl, aeskey):
+        ret = None
+        if cdnurl:
+            try:
+                logger.info("Requesting emoji {} from {} ...".format(md5, cdnurl))
+                r = requests.get(cdnurl).content
+                emoji_md5 = get_md5_hex(r)
+                im = Image.open(io.BytesIO(r))
+                ret = (base64.b64encode(r).decode('ascii'), im.format.lower())
+                if emoji_md5 == md5:
+                    self._cache_add(md5, ret)
+                    return ret
+                else:
+                    raise ValueError("Emoji MD5 from CDNURL does not match")
+            except Exception:
+                logger.debug("Error processing cdnurl {}".format(cdnurl))
+
+        if encrypturl:
+            try:
+                logger.info("Requesting encrypted emoji {} from {} ...".format(md5, encrypturl))
+                buf = requests.get(encrypturl).content
+                aeskey = bytes.fromhex(aeskey)
+                cipher = AES.new(aeskey, AES.MODE_CBC, iv=aeskey)
+                decrypted_buf = cipher.decrypt(buf)
+
+                im = Image.open(io.BytesIO(decrypted_buf))
+                ret = (base64.b64encode(decrypted_buf).decode('ascii'), im.format.lower())
+                self._cache_add(md5, ret)
+                return ret
+            except Exception:
+                logger.exception("Error processing encrypturl {}".format(encrypturl))
+        if ret is not None:
+            # ret may become something with wrong md5. Try it anyway, but don't cache.
+            return ret
+        return None, None
+
+if __name__ == "__main__":
+    logger.setLevel(logging.DEBUG)
+    handler = logging.StreamHandler()
+    logger.addHandler(handler)
+
+    class Dummy():
+        def _cache_add(self, md5, ret):
+            pass
+    # test decryption
+    md5 = '5a7fc462d63ef845e6d99c1523bbc91e'
+    encurl = 'http://emoji.qpic.cn/wx_emoji/CQmBgayyMuvscRVEKN9s4HyTjKVU9iacqqhyCpdtqOVcCql5JaibjDFg/'
+    enckey = '8ba7f51f9f3ac58cf8ed937fc90200a6'
+    b64, format = EmojiReader._fetch(Dummy(), md5, None, encurl, enckey)
+    print("format=", format)

+ 17 - 15
wechat/parser.py

@@ -31,8 +31,9 @@ class WeChatDBParser(object):
         self.contacts_rev = defaultdict(list)
         self.msgs_by_chat = defaultdict(list)
         self.emoji_groups = {}
-        self.emoji_url = {}
+        self.emoji_info = {}
         self.internal_emojis = {}
+        self.emoji_encryption_key = None
         self._parse()
 
     def _parse_contact(self):
@@ -94,30 +95,22 @@ SELECT {} FROM message
 
     def _parse_emoji(self):
         # wechat provided emojis
-        emojiinfo_q = self.cc.execute(
+        query = self.cc.execute(
 """ SELECT md5, groupid FROM EmojiInfoDesc """)
-        for row in emojiinfo_q:
+        for row in query:
             md5, group = row
             self.emoji_groups[md5] = group
 
-        HAS_EMOJI_CATALOG = [49, 50, 17]  # these are included in static/
         try:
-            emojiinfo_q = self.cc.execute(
+            query = self.cc.execute(
     """ SELECT md5, catalog, name, cdnUrl, encrypturl, aeskey FROM EmojiInfo""")
         except: # old database does not have cdnurl
-            emojiinfo_q = self.cc.execute(
-    """ SELECT md5, catalog, name FROM EmojiInfo""")
-            for row in emojiinfo_q:
-                md5, catalog, name = row
-                if name and catalog in HAS_EMOJI_CATALOG:
-                    self.internal_emojis[md5] = name
+            pass
         else:
-            for row in emojiinfo_q:
+            for row in query:
                 md5, catalog, name, cdnUrl, encrypturl, aeskey = row
                 if cdnUrl or encrypturl:
-                    self.emoji_url[md5] = (cdnUrl, encrypturl, aeskey)
-                if name and catalog in HAS_EMOJI_CATALOG:
-                    self.internal_emojis[md5] = name
+                    self.emoji_info[md5] = (catalog, cdnUrl, encrypturl, aeskey)
 
 
     def _parse(self):
@@ -127,6 +120,15 @@ SELECT {} FROM message
         self._parse_imginfo()
         self._parse_emoji()
 
+    def get_emoji_encryption_key(self):
+        # obtain local encryption key in a special entry in the database
+        query = self.cc.execute("SELECT md5 FROM EmojiInfo where catalog == 153")
+        results = list(query)
+        if len(results):
+            assert len(results) == 1, "Found > 1 encryption keys in EmojiInfo. This is a bug!"
+            return results[0][0]
+        return None
+
     # process the values in a row
     def _parse_msg_row(self, row):
         """ parse a record of message into my format"""

+ 8 - 138
wechat/res.py

@@ -4,7 +4,6 @@ import glob
 import os
 import re
 from PIL import Image
-import tempfile
 import io
 import base64
 import logging
@@ -12,18 +11,15 @@ logger = logging.getLogger(__name__)
 import imghdr
 from multiprocessing import Pool
 import atexit
-import pickle
-import requests
 
+from .emoji import EmojiReader
 from .avatar import AvatarReader
-from .common.textutil import md5 as get_md5_hex, get_file_b64, get_file_md5
-from .common.procutil import subproc_succ
+from .common.textutil import md5 as get_md5_hex, get_file_b64
 from .common.timer import timing
 from .msg import TYPE_SPEAK
 from .audio import parse_wechat_audio_file
 
 LIB_PATH = os.path.dirname(os.path.abspath(__file__))
-INTERNAL_EMOJI_DIR = os.path.join(LIB_PATH, 'static', 'internal_emoji')
 VOICE_DIRNAME = 'voice2'
 IMG_DIRNAME = 'image2'
 EMOJI_DIRNAME = 'emoji'
@@ -31,73 +27,6 @@ VIDEO_DIRNAME = 'video'
 
 JPEG_QUALITY = 50
 
-class EmojiCache(object):
-    def __init__(self, fname):
-        self.fname = fname
-        if os.path.isfile(fname):
-            with open(fname, 'rb') as f:
-                self.dic = pickle.load(f)
-        else:
-            self.dic = {}
-
-        self._curr_size = len(self.dic)
-
-    def query(self, md5):
-        data, format = self.dic.get(md5, (None, None))
-        if data is not None and not isinstance(data, str):
-            data = data.decode('ascii')
-        return data, format
-
-    def fetch(self, md5, urls):
-        cdnurl, encrypturl, aeskey = urls
-        ret = None
-        if cdnurl:
-            try:
-                logger.info("Requesting emoji {} from {} ...".format(md5, cdnurl))
-                r = requests.get(cdnurl).content
-                emoji_md5 = get_md5_hex(r)
-                im = Image.open(io.BytesIO(r))
-                ret = (base64.b64encode(r).decode('ascii'), im.format.lower())
-                if emoji_md5 == md5:
-                    self.add(md5, ret)
-                    return ret
-                else:
-                    raise ValueError("Emoji MD5 from CDNURL does not match")
-            except Exception:
-                logger.exception("Error processing cdnurl {}".format(cdnurl))
-
-        if encrypturl:
-            try:
-                logger.info("Requesting encrypted emoji {} from {} ...".format(md5, encrypturl))
-                buf = requests.get(encrypturl).content
-                with tempfile.TemporaryDirectory(prefix="wechat_dump_download") as d:
-                    fname = os.path.join(d, md5)
-                    with open(fname, 'wb') as f:
-                        f.write(buf)
-                    cmd = f"openssl enc -d -aes-128-cbc -in {fname} -K {aeskey} -iv {aeskey}"
-                    decrypted_buf = subproc_succ(cmd)
-                im = Image.open(io.BytesIO(decrypted_buf))
-                ret = (base64.b64encode(decrypted_buf).decode('ascii'), im.format.lower())
-                self.add(md5, ret)
-                return ret
-            except Exception:
-                logger.exception("Error processing encrypturl {}".format(encrypturl))
-        if ret is not None:
-            # ret may become something with wrong md5. Try it anyway, but don't cache.
-            return ret
-        return None, None
-
-    def add(self, md5, values):
-        self.dic[md5] = values
-        if len(self.dic) >= self._curr_size + 10:
-            self.flush()
-
-    def flush(self):
-        if len(self.dic) > self._curr_size:
-            self._curr_size = len(self.dic)
-            with open(self.fname, 'wb') as f:
-                pickle.dump(self.dic, f)
-
 class Resource(object):
     """ multimedia resources in chat"""
     def __init__(self, parser, res_dir, avt_db):
@@ -106,19 +35,16 @@ class Resource(object):
             assert os.path.isdir(dir_to_check), f"No such directory: {dir_to_check}"
         [check(k) for k in ['', IMG_DIRNAME, EMOJI_DIRNAME, VOICE_DIRNAME]]
 
-        self.emoji_cache = EmojiCache(
-                os.path.join(os.path.dirname(os.path.abspath(__file__)),
-                    '..', 'emoji.cache'))
         self.res_dir = res_dir
         self.parser = parser
         self.voice_cache_idx = {}
         self.img_dir = os.path.join(res_dir, IMG_DIRNAME)
         self.voice_dir = os.path.join(res_dir, VOICE_DIRNAME)
-        self.emoji_dir = os.path.join(res_dir, EMOJI_DIRNAME)
         self.video_dir = os.path.join(res_dir, VIDEO_DIRNAME)
         self.avt_reader = AvatarReader(res_dir, avt_db)
+        self.emoji_reader = EmojiReader(res_dir, self.parser)
 
-    def get_voice_filename(self, imgpath):
+    def _get_voice_filename(self, imgpath):
         fname = get_md5_hex(imgpath.encode('ascii'))
         dir1, dir2 = fname[:2], fname[2:4]
         ret = os.path.join(self.voice_dir, dir1, dir2,
@@ -133,7 +59,7 @@ class Resource(object):
         idx = self.voice_cache_idx.get(imgpath)
         if idx is None:
             return parse_wechat_audio_file(
-                self.get_voice_filename(imgpath))
+                self._get_voice_filename(imgpath))
         return self.voice_cache[idx].get()
 
     def cache_voice_mp3(self, msgs):
@@ -145,7 +71,7 @@ class Resource(object):
         pool = Pool(3)
         atexit.register(lambda x: x.terminate(), pool)
         self.voice_cache = [pool.apply_async(parse_wechat_audio_file,
-                                             (self.get_voice_filename(k),)) for k in voice_paths]
+                                             (self._get_voice_filename(k),)) for k in voice_paths]
 
     def get_avatar(self, username):
         """ return base64 unicode string"""
@@ -228,65 +154,9 @@ class Resource(object):
             return big_file
         return get_jpg_b64(small_file)
 
-    def _get_res_emoji(self, md5, pack_id, fallback=False):
-        """
-        pack_id: can be None
-        fallback:
-            1. allow cover/thumb that are non-animated.
-            2. allow file to have mismatch md5 (often non-animated cover as well)
-        """
-        path = os.path.join(self.emoji_dir, pack_id or '')
-        candidates = glob.glob(os.path.join(path, '{}*'.format(md5)))
-        candidates = [k for k in candidates if not re.match('.*_[0-9]+$', k)]
-        candidates = [k for k in candidates if (fallback or (not k.endswith('_cover') and not k.endswith('_thumb')))]
-
-        for cand in candidates:
-            if imghdr.what(cand):  # does not recognize
-                if not fallback:
-                    emoji_md5 = get_file_md5(cand)
-                    if emoji_md5 != md5:
-                        continue
-                return get_file_b64(cand), imghdr.what(cand)
-        return None, None
-
-    def _get_internal_emoji(self, fname):
-        f = os.path.join(INTERNAL_EMOJI_DIR, fname)
-        return get_file_b64(f), imghdr.what(f)
-
     def get_emoji_by_md5(self, md5):
-        """ :returns: (b64 unicode img, format)"""
-        assert md5, md5
-        if md5 in self.parser.internal_emojis:
-            emoji_img, format = self._get_internal_emoji(self.parser.internal_emojis[md5])
-            return emoji_img, format
-        else:
-            # check cache
-            img, format = self.emoji_cache.query(md5)
-            if format:
-                return img, format
-
-            # check resource/emoji/ dir
-            group = self.parser.emoji_groups.get(md5, None)
-            emoji_img, format = self._get_res_emoji(md5, group)
-            if format:
-                return emoji_img, format
-
-            # check url
-            urls = self.parser.emoji_url.get(md5, None)
-            if urls:
-                emoji_img, format = self.emoji_cache.fetch(md5, urls)
-                if format:
-                    return emoji_img, format
-
-            # check resource/emoji dir again, with fallback
-            emoji_img, format = self._get_res_emoji(md5, group, fallback=True)
-            if format:
-                logger.info(f"Using fallback for emoji {md5}")
-                return emoji_img, format
-
-            # TODO: first 1k in emoji is encrypted
-            logger.warning("Cannot get emoji {} in group {}".format(md5, group))
-            return None, None
+        """ Returns: (b64 encoded img string, format) """
+        return self.emoji_reader.get_emoji(md5)
 
     def get_video(self, videoid):
         video_file = os.path.join(self.video_dir, videoid + ".mp4")