Browse Source

support wxgf emoji

Yuxin Wu 1 năm trước cách đây
mục cha
commit
b271469b94

+ 0 - 1
dump-html.py

@@ -1,5 +1,4 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
-# -*- coding: UTF-8 -*-
 import os
 import os
 import sys
 import sys
 import argparse
 import argparse

+ 16 - 8
wechat/emoji.py

@@ -1,7 +1,6 @@
 import os
 import os
 from pathlib import Path
 from pathlib import Path
 import logging
 import logging
-import tempfile
 import io
 import io
 import requests
 import requests
 import base64
 import base64
@@ -10,6 +9,7 @@ from PIL import Image
 import pickle
 import pickle
 from Crypto.Cipher import AES
 from Crypto.Cipher import AES
 
 
+from .wxgf import WxgfAndroidDecoder, is_wxgf_buffer
 from .parser import WeChatDBParser
 from .parser import WeChatDBParser
 from .common.textutil import md5 as get_md5_hex, get_file_b64, get_file_md5
 from .common.textutil import md5 as get_md5_hex, get_file_b64, get_file_md5
 
 
@@ -23,21 +23,20 @@ def _get_aes_key(md5):
     # ascii representation of the first half of md5 is used as aes key
     # ascii representation of the first half of md5 is used as aes key
     assert len(md5) == 32
     assert len(md5) == 32
     return md5[:16].encode('ascii')
     return md5[:16].encode('ascii')
-    # ret = ""
-    # for ch in md5[:16]:
-        # ret += format(ord(ch), 'x')
-    # return ret
 
 
 
 
 class EmojiReader:
 class EmojiReader:
     def __init__(self,
     def __init__(self,
         resource_dir: str,
         resource_dir: str,
         parser: WeChatDBParser,
         parser: WeChatDBParser,
+        *,
+        wxgf_decoder: WxgfAndroidDecoder,
         cache_file: str=None):
         cache_file: str=None):
         """
         """
         Args:
         Args:
             resource_dir: path to resource/
             resource_dir: path to resource/
             parser: Database parser
             parser: Database parser
+            wxgf_decoder: Wxgf image decoder
             cache_file: a cache file to store emoji downloaded from URLs.
             cache_file: a cache file to store emoji downloaded from URLs.
                 default to a emoji.cache file under wechat-dump.
                 default to a emoji.cache file under wechat-dump.
         """
         """
@@ -48,6 +47,7 @@ class EmojiReader:
         # mapping from md5 to the (cdnurl, encrypturl, aeskey)
         # mapping from md5 to the (cdnurl, encrypturl, aeskey)
         # columns in EmojiInfo table.
         # columns in EmojiInfo table.
         self.cache_file = cache_file or DEFAULT_EMOJI_CACHE
         self.cache_file = cache_file or DEFAULT_EMOJI_CACHE
+        self.wxgf_decoder = wxgf_decoder
 
 
         # cache stores md5 -> (base64str, format)
         # cache stores md5 -> (base64str, format)
         if os.path.isfile(self.cache_file):
         if os.path.isfile(self.cache_file):
@@ -133,9 +133,14 @@ class EmojiReader:
                 content = self._decode_emoji(fname)
                 content = self._decode_emoji(fname)
                 data_md5 = get_md5_hex(content)
                 data_md5 = get_md5_hex(content)
                 if data_md5 != md5:
                 if data_md5 != md5:
-                    if content.startswith(b"wxgf"):
-                        raise ValueError("Unsupported mysterious image format: wxgf")
-                    raise ValueError("Decoded data mismatch md5!")
+                    if is_wxgf_buffer(content):
+                        content = self.wxgf_decoder.decode_with_cache(fname, content)
+                        if content is None:
+                            if not self.wxgf_decoder.has_server():
+                                logger.warning("wxgf decoder server is not provided. Cannot decode wxgf emojis.")
+                            raise ValueError("Failed to decode wxgf file.")
+                    else:
+                        raise ValueError("Decoded data mismatch md5!")
                 im = Image.open(io.BytesIO(content))
                 im = Image.open(io.BytesIO(content))
                 return (base64.b64encode(content).decode('ascii'), im.format.lower())
                 return (base64.b64encode(content).decode('ascii'), im.format.lower())
             except Exception as e:
             except Exception as e:
@@ -183,6 +188,9 @@ class EmojiReader:
             try:
             try:
                 logger.info("Requesting encrypted emoji {} from {} ...".format(md5, encrypturl))
                 logger.info("Requesting encrypted emoji {} from {} ...".format(md5, encrypturl))
                 buf = requests.get(encrypturl).content
                 buf = requests.get(encrypturl).content
+                if buf == b'':
+                    logger.error(f"Failed to download emoji {md5}")
+                    return None, None
                 aeskey = bytes.fromhex(aeskey)
                 aeskey = bytes.fromhex(aeskey)
                 cipher = AES.new(aeskey, AES.MODE_CBC, iv=aeskey)
                 cipher = AES.new(aeskey, AES.MODE_CBC, iv=aeskey)
                 decoded_buf = cipher.decrypt(buf)
                 decoded_buf = cipher.decrypt(buf)

+ 30 - 15
wechat/render.py

@@ -3,6 +3,7 @@
 
 
 import os
 import os
 from collections import Counter
 from collections import Counter
+from functools import lru_cache
 import glob
 import glob
 from pyquery import PyQuery
 from pyquery import PyQuery
 import logging
 import logging
@@ -35,10 +36,19 @@ TEMPLATES_FILES = {TYPE_MSG: "TP_MSG",
                    TYPE_VIDEO_FILE: "TP_VIDEO_FILE",
                    TYPE_VIDEO_FILE: "TP_VIDEO_FILE",
                    TYPE_QQMUSIC: "TP_QQMUSIC",
                    TYPE_QQMUSIC: "TP_QQMUSIC",
                   }
                   }
-TEMPLATES = {
-    k: open(os.path.join(STATIC_PATH, '{}.html'.format(v))).read()
-    for k, v in TEMPLATES_FILES.items()
-}
+
+
+@lru_cache()
+def get_template(name: str | int) -> str | None:
+    """Return the HTML template for a template file name or a message type.
+
+    Args:
+        name: either the base name of a template file under STATIC_PATH
+            (without the ``.html`` suffix), or an integer message type,
+            which is first translated to a file name via TEMPLATES_FILES.
+
+    Returns:
+        The full text of the template file, or None when ``name`` is an
+        integer that has no entry in TEMPLATES_FILES.
+
+    Raises:
+        OSError: if the resolved template file cannot be opened.
+
+    Results (including None) are memoized by ``lru_cache``, keyed on the
+    argument, so repeated lookups do not re-read the file.
+    """
+    if isinstance(name, int):
+        # Message type -> template base name; unknown types have no template.
+        name = TEMPLATES_FILES.get(name)
+        if name is None:
+            return None
+    html_path = os.path.join(STATIC_PATH, f"{name}.html")
+    # Decode explicitly as UTF-8 rather than with the platform's locale
+    # encoding, so the template text is identical on every OS.
+    with open(html_path, encoding="utf-8") as f:
+        return f.read()
+
 
 
 class HTMLRender(object):
 class HTMLRender(object):
     def __init__(self, parser, res=None):
     def __init__(self, parser, res=None):
@@ -107,7 +117,7 @@ class HTMLRender(object):
             format_dict['nickname'] = ' '
             format_dict['nickname'] = ' '
 
 
         def fallback():
         def fallback():
-            template = TEMPLATES[TYPE_MSG]
+            template = get_template(TYPE_MSG)
             content = msg.msg_str()
             content = msg.msg_str()
             content = self.smiley.replace_smileycode(content)
             content = self.smiley.replace_smileycode(content)
             if not msg.known_type:
             if not msg.known_type:
@@ -115,7 +125,7 @@ class HTMLRender(object):
                 content = html.escape(content)
                 content = html.escape(content)
             return template.format(content=content, **format_dict)
             return template.format(content=content, **format_dict)
 
 
-        template = TEMPLATES.get(msg.type)
+        template = get_template(msg.type)
         if msg.type == TYPE_SPEAK:
         if msg.type == TYPE_SPEAK:
             audio_str, duration = self.res.get_voice_mp3(msg.imgPath)
             audio_str, duration = self.res.get_voice_mp3(msg.imgPath)
             format_dict['voice_duration'] = duration
             format_dict['voice_duration'] = duration
@@ -140,10 +150,13 @@ class HTMLRender(object):
             jobj = json.loads(msg.msg_str())
             jobj = json.loads(msg.msg_str())
             content = f"{jobj['title']} - {jobj['singer']}"
             content = f"{jobj['title']} - {jobj['singer']}"
 
 
-            # imgPath was original THUMBNAIL_DIRPATH://th_xxxxxxxxx
-            imgpath = msg.imgPath.split('_')[-1]
-            img = self.res.get_img([imgpath])
-            format_dict['img'] = (img, 'jpeg')
+            if msg.imgPath is not None:
+                # imgPath was original THUMBNAIL_DIRPATH://th_xxxxxxxxx
+                imgpath = msg.imgPath.split('_')[-1]
+                img = self.res.get_img([imgpath])
+                format_dict['img'] = (img, 'jpeg')
+            else:
+                template = get_template("TP_QQMUSIC_NOIMG")
             return template.format(url=jobj['url'], content=content, **format_dict)
             return template.format(url=jobj['url'], content=content, **format_dict)
         elif msg.type == TYPE_EMOJI or msg.type == TYPE_CUSTOM_EMOJI:
         elif msg.type == TYPE_EMOJI or msg.type == TYPE_CUSTOM_EMOJI:
             if 'emoticonmd5' in msg.content:
             if 'emoticonmd5' in msg.content:
@@ -171,7 +184,12 @@ class HTMLRender(object):
                 return template.format(**format_dict)
                 return template.format(**format_dict)
         elif msg.type == TYPE_VIDEO_FILE:
         elif msg.type == TYPE_VIDEO_FILE:
             video = self.res.get_video(msg.imgPath)
             video = self.res.get_video(msg.imgPath)
-            if video.endswith(".mp4"):
+            if video is None:
+                logger.warning(f"Cannot find video {msg.imgPath} ({msg.createTime})")
+                # fallback
+                format_dict['content'] = f"VIDEO FILE {msg.imgPath}"
+                return get_template(TYPE_MSG).format(**format_dict)
+            elif video.endswith(".mp4"):
                 video_str = get_file_b64(video)
                 video_str = get_file_b64(video)
                 format_dict["video_str"] = video_str
                 format_dict["video_str"] = video_str
                 return template.format(**format_dict)
                 return template.format(**format_dict)
@@ -179,10 +197,7 @@ class HTMLRender(object):
                 # only has thumbnail
                 # only has thumbnail
                 image_str = get_file_b64(video)
                 image_str = get_file_b64(video)
                 format_dict["img"] = (image_str, 'jpeg')
                 format_dict["img"] = (image_str, 'jpeg')
-                return TEMPLATES[TYPE_IMG].format(**format_dict)
-            # fallback
-            format_dict['content'] = f"VIDEO FILE {msg.imgPath}"
-            return TEMPLATES_FILES[TYPE_MSG].format(**format_dict)
+                return get_template(TYPE_IMG).format(**format_dict)
         elif msg.type == TYPE_WX_VIDEO:
         elif msg.type == TYPE_WX_VIDEO:
             # TODO: fetch video from resource
             # TODO: fetch video from resource
             return fallback()
             return fallback()

+ 3 - 4
wechat/res.py

@@ -52,7 +52,7 @@ class Resource(object):
         self.video_dir = os.path.join(res_dir, VIDEO_DIRNAME)
         self.video_dir = os.path.join(res_dir, VIDEO_DIRNAME)
         self.avt_reader = AvatarReader(res_dir, avt_db)
         self.avt_reader = AvatarReader(res_dir, avt_db)
         self.wxgf_decoder = WxgfAndroidDecoder(wxgf_server)
         self.wxgf_decoder = WxgfAndroidDecoder(wxgf_server)
-        self.emoji_reader = EmojiReader(res_dir, self.parser)
+        self.emoji_reader = EmojiReader(res_dir, self.parser, wxgf_decoder=self.wxgf_decoder)
 
 
     def _get_voice_filename(self, imgpath):
     def _get_voice_filename(self, imgpath):
         fname = get_md5_hex(imgpath.encode('ascii'))
         fname = get_md5_hex(imgpath.encode('ascii'))
@@ -195,12 +195,11 @@ class Resource(object):
         """ Returns: (b64 encoded img string, format) """
         """ Returns: (b64 encoded img string, format) """
         return self.emoji_reader.get_emoji(md5)
         return self.emoji_reader.get_emoji(md5)
 
 
-    def get_video(self, videoid):
+    def get_video(self, videoid) -> str | None:
         video_file = os.path.join(self.video_dir, videoid + ".mp4")
         video_file = os.path.join(self.video_dir, videoid + ".mp4")
         video_thumbnail_file = os.path.join(self.video_dir, videoid + ".jpg")
         video_thumbnail_file = os.path.join(self.video_dir, videoid + ".jpg")
         if os.path.exists(video_file):
         if os.path.exists(video_file):
             return video_file
             return video_file
         elif os.path.exists(video_thumbnail_file):
         elif os.path.exists(video_thumbnail_file):
             return video_thumbnail_file
             return video_thumbnail_file
-        logger.warning(f"Cannot find video {videoid}")
-        return ""
+        return None

+ 15 - 0
wechat/static/TP_QQMUSIC_NOIMG.html

@@ -0,0 +1,15 @@
+<div class="chatItem {sender_label}">
+  <div class="chatItemContent">
+    <span class="avatar"></span>
+    <div class="cloud cloudText">
+      <div class="cloudPannel" title="{time}" {nickname}>
+        <div class="cloudBody">
+          <div class="cloudContent">
+            <pre style="white-space:pre-wrap">
+QQ Music:<a href="{url}" target="_blank">{content}</a> </pre>
+          </div>
+        </div>
+      </div>
+    </div>
+  </div>
+</div>

+ 3 - 0
wechat/wxgf.py

@@ -59,3 +59,6 @@ class WxgfAndroidDecoder:
 def is_wxgf_file(fname):
 def is_wxgf_file(fname):
     with open(fname, 'rb') as f:
     with open(fname, 'rb') as f:
         return f.read(4) == WXGF_HEADER
         return f.read(4) == WXGF_HEADER
+
+def is_wxgf_buffer(buf: bytes):
+    """Tell whether an in-memory buffer begins with WXGF_HEADER.
+
+    Only the first four bytes of ``buf`` are compared against the header.
+    """
+    leading = buf[:4]
+    return leading == WXGF_HEADER