Sfoglia il codice sorgente

Decode WXGF locally with ffmpeg

Original PR: https://github.com/ppwwyyxx/wechat-dump/pull/110
Yuxin Wu 2 settimane fa
parent
commit
be3009fbd2
6 ha cambiato i file con 265 aggiunte e 16 eliminazioni
  1. 7 4
      README.md
  2. 1 0
      requirements.txt
  3. 102 0
      tests/test_wxgf.py
  4. 3 3
      wechat/emoji.py
  5. 3 3
      wechat/res.py
  6. 149 6
      wechat/wxgf.py

+ 7 - 4
README.md

@@ -21,6 +21,7 @@ If the tool works for you, please take a moment to add your phone/OS to [the wik
 + Python >= 3.8
 + sox (command line tools)
 + Silk audio decoder (included; build it with `./third-party/compile_silk.sh`)
++ ffmpeg (optional; to decode WXGF images locally)
 + Other python dependencies: `pip install -r requirements.txt`.
 
 #### Get Necessary Data:
@@ -40,10 +41,12 @@ If the tool works for you, please take a moment to add your phone/OS to [the wik
   	  `busybox tar` is recommended as the Android system's `tar` may choke on long paths.
 	+ In the end, we need a `resource` directory with the following subdir: `avatar,emoji,image2,sfs,video,voice2`.
 
-4. (Optional) Install and start a WXGF decoder server on an android device. Without this, certain WXGF images will not be rendered or will be rendered in low resolution.
-   See [WXGFDecoder](WXGFDecoder) for instructions.
+4. (Optional) Decode WXGF images:
+   * If `ffmpeg`/`ffprobe` are available, WXGF images/emojis are decoded locally when possible.
+   * Otherwise, you can install and start a WXGF decoder server on an android device and pass `--wxgf-server ws://xx.xx.xx.xx:xxxx`.
+     See [WXGFDecoder](WXGFDecoder) for instructions.
 
-4. (Optional) Download the emoji cache from [here](https://github.com/ppwwyyxx/wechat-dump/releases/download/0.1/emoji.cache.tar.bz2)
+5. (Optional) Download the emoji cache from [here](https://github.com/ppwwyyxx/wechat-dump/releases/download/0.1/emoji.cache.tar.bz2)
 	and decompress it under `wechat-dump`. This will avoid downloading too many emojis during rendering.
 
         wget -c https://github.com/ppwwyyxx/wechat-dump/releases/download/0.1/emoji.cache.tar.bz2
@@ -87,7 +90,7 @@ See [here](http://ppwwyyxx.com/static/wechat/example.html) for an example html.
 ### TODO List (help needed!)
 * After chat history migration, some emojis in the `EmojiInfo` table don't have corresponding URLs but only a md5 -
   they are not downloaded by WeChat until the message needs to be displayed. We don't know how to manually download these emojis.
-* Decoding WXGF images using an android app is too complex. Looking for an easier way (e.g. qemu).
+* Verify/improve host-side WXGF decoding coverage.
 * Fix rare unhandled message types: > 10000 and < 0
 * Better user experiences... see `grep 'TODO' wechat -R`
 

+ 1 - 0
requirements.txt

@@ -8,3 +8,4 @@ csscompressor
 numpy
 ipython
 websocket-client
+pytest

+ 102 - 0
tests/test_wxgf.py

@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+"""Unit tests for WXGF decoding with ffmpeg."""
+
+import os
+import sys
+import shutil
+import pytest
+from io import BytesIO
+from PIL import Image
+
+# Add parent directory to path for imports
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from wechat.wxgf import (
+    decode_wxgf_with_ffmpeg,
+    is_wxgf_buffer,
+)
+
+
+
+# Path to the test WXGF file
+TEST_WXGF_FILE = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
+    "WXGFDecoder", "app", "src", "main", "res", "raw", "test_wxgf.jpg"
+)
+
+
+# Fixtures
[email protected](scope="module")
+def has_ffmpeg():
+    """Check if ffmpeg and ffprobe are available."""
+    return shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None
+
+
[email protected](scope="module")
+def has_test_file():
+    """Check if test WXGF file exists."""
+    return os.path.exists(TEST_WXGF_FILE)
+
+
[email protected](scope="module")
+def test_wxgf_data(has_test_file):
+    """Load test WXGF file data."""
+    if not has_test_file:
+        pytest.skip("Test WXGF file not available")
+    with open(TEST_WXGF_FILE, 'rb') as f:
+        return f.read()
+
+
+def test_is_wxgf_buffer():
+    """Test is_wxgf_buffer function."""
+    assert is_wxgf_buffer(b'wxgf\x00\x00\x00') is True
+    assert is_wxgf_buffer(b'\x89PNG\r\n\x1a\n') is False
+    assert is_wxgf_buffer(b'') is False
+    assert is_wxgf_buffer(b'abc') is False
+
+
[email protected](
+    not shutil.which("ffmpeg") or not shutil.which("ffprobe"),
+    reason="ffmpeg/ffprobe not available"
+)
+def test_decode_wxgf_with_ffmpeg_success(test_wxgf_data):
+    """Test successful WXGF decoding with ffmpeg."""
+    data = test_wxgf_data
+
+    # Decode the WXGF file
+    result = decode_wxgf_with_ffmpeg(data)
+
+    # Should return PNG bytes (starts with PNG signature)
+    assert result is not None, "Decoding should succeed"
+    assert isinstance(result, bytes)
+    assert result.startswith(b'\x89PNG'), "Decoded output should be a PNG file"
+
+    # PNG should have reasonable size
+    assert len(result) > 100, "PNG should have reasonable size"
+
+    img = Image.open(BytesIO(result))
+    assert img.size == (1920, 1080), f"Expected image size (1920, 1080), got {img.size}"
+
+
[email protected](
+    not shutil.which("ffmpeg") or not shutil.which("ffprobe"),
+    reason="ffmpeg/ffprobe not available"
+)
+def test_decode_wxgf_with_ffmpeg_invalid_data():
+    """Test ffmpeg decoding with invalid data."""
+    # Not a WXGF file
+    result = decode_wxgf_with_ffmpeg(b'\x89PNG\r\n\x1a\n')
+    assert result is None
+
+    # WXGF header but no valid HEVC data
+    result = decode_wxgf_with_ffmpeg(b'wxgf\x00\x00\x00\x01\x00\x00')
+    assert result is None
+
+
+def test_decode_wxgf_without_ffmpeg(test_wxgf_data):
+    """Test decoding when ffmpeg is not available."""
+    data = test_wxgf_data
+
+    # Use non-existent ffmpeg paths
+    result = decode_wxgf_with_ffmpeg(data, ffmpeg="nonexistent_ffmpeg", ffprobe="nonexistent_ffprobe")
+    assert result is None

+ 3 - 3
wechat/emoji.py

@@ -8,7 +8,7 @@ from PIL import Image
 import pickle
 from Crypto.Cipher import AES
 
-from .wxgf import WxgfAndroidDecoder, is_wxgf_buffer
+from .wxgf import WxgfDecoder, is_wxgf_buffer
 from .parser import WeChatDBParser
 from .common.imgutil import what as img_what
 from .common.textutil import md5 as get_md5_hex, get_file_b64, get_file_md5
@@ -30,7 +30,7 @@ class EmojiReader:
         resource_dir: str,
         parser: WeChatDBParser,
         *,
-        wxgf_decoder: WxgfAndroidDecoder,
+        wxgf_decoder: WxgfDecoder,
         cache_file: str=None):
         """
         Args:
@@ -137,7 +137,7 @@ class EmojiReader:
                         content = self.wxgf_decoder.decode_with_cache(fname, content)
                         if content is None:
                             if not self.wxgf_decoder.has_server():
-                                logger.warning("wxgf decoder server is not provided. Cannot decode wxgf emojis.")
+                                logger.warning("Cannot decode wxgf emojis. Install ffmpeg+ffprobe or provide a wxgf decoder server with --wxgf-server.")
                             raise ValueError("Failed to decode wxgf file.")
                     else:
                         raise ValueError("Decoded data mismatch md5!")

+ 3 - 3
wechat/res.py

@@ -17,7 +17,7 @@ from .common.imgutil import what as img_what
 from .common.textutil import md5 as get_md5_hex, get_file_b64
 from .msg import TYPE_SPEAK
 from .audio import parse_wechat_audio_file
-from .wxgf import WxgfAndroidDecoder, is_wxgf_file
+from .wxgf import WxgfDecoder, is_wxgf_file
 
 LIB_PATH = os.path.dirname(os.path.abspath(__file__))
 VOICE_DIRNAME = 'voice2'
@@ -53,7 +53,7 @@ class Resource(object):
         self.voice_dir = os.path.join(res_dir, VOICE_DIRNAME)
         self.video_dir = os.path.join(res_dir, VIDEO_DIRNAME)
         self.avt_reader = AvatarReader(res_dir, avt_db)
-        self.wxgf_decoder = WxgfAndroidDecoder(wxgf_server)
+        self.wxgf_decoder = WxgfDecoder(wxgf_server)
         self.emoji_reader = EmojiReader(res_dir, self.parser, wxgf_decoder=self.wxgf_decoder)
 
     def _get_voice_filename(self, imgpath):
@@ -179,7 +179,7 @@ class Resource(object):
             buf = self.wxgf_decoder.decode_with_cache(img_file, None)
             if buf is None:
                 if not self.wxgf_decoder.has_server():
-                    logger.warning("wxgf decoder server is not provided. Cannot decode wxgf images. Please follow instructions to create wxgf decoder server if these images need to be decoded.")
+                    logger.warning("Cannot decode wxgf images. Install ffmpeg+ffprobe or provide a wxgf decoder server with --wxgf-server.")
                 else:
                     logger.error("Failed to decode wxgf file: {}".format(img_file))
                 return None

+ 149 - 6
wechat/wxgf.py

@@ -1,6 +1,8 @@
 from websocket import create_connection
 import os
 import logging
+import shutil
+import subprocess
 
 
 logger = logging.getLogger(__name__)
@@ -8,8 +10,148 @@ logger = logging.getLogger(__name__)
 WXGF_HEADER = b'wxgf'
 FAILURE_MESSAGE = b'FAILED'
 
-
-class WxgfAndroidDecoder:
+_HEVC_START_CODE_4 = b"\x00\x00\x00\x01"
+_HEVC_START_CODE_3 = b"\x00\x00\x01"
+
+
+def extract_hevc_bitstream_from_wxgf(data: bytes) -> bytes | None:
+    """Extract Annex-B HEVC bitstream from WXGF container.
+
+    Returns:
+        HEVC bitstream bytes starting with a start-code, or None if unknown format.
+    """
+    if not data.startswith(WXGF_HEADER):
+        return None
+    start = data.find(_HEVC_START_CODE_4)
+    if start < 0:
+        start = data.find(_HEVC_START_CODE_3)
+    if start < 0:
+        return None
+    return data[start:]
+
+
+def _subprocess_run_bytes(cmd: list[str], *, stdin: bytes) -> bytes | None:
+    try:
+        p = subprocess.run(
+            cmd,
+            input=stdin,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            check=False,
+        )
+    except FileNotFoundError:
+        return None
+    if p.returncode != 0:
+        logger.debug(
+            "Command failed (%s): rc=%d stderr=%s",
+            " ".join(cmd),
+            p.returncode,
+            p.stderr[:2000].decode("utf-8", errors="replace"),
+        )
+        return None
+    return p.stdout
+
+
+def _ffprobe_count_frames_hevc(hevc: bytes, *, ffprobe: str = "ffprobe") -> int | None:
+    out = _subprocess_run_bytes(
+        [
+            ffprobe,
+            "-v",
+            "error",
+            "-count_frames",
+            "-select_streams",
+            "v:0",
+            "-show_entries",
+            "stream=nb_read_frames",
+            "-of",
+            "default=nw=1:nk=1",
+            "-f",
+            "hevc",
+            "-i",
+            "pipe:0",
+        ],
+        stdin=hevc,
+    )
+    if out is None:
+        return None
+    try:
+        return int(out.strip().splitlines()[-1])
+    except Exception:
+        return None
+
+
+def decode_wxgf_with_ffmpeg(
+    data: bytes,
+    *,
+    ffmpeg: str = "ffmpeg",
+    ffprobe: str = "ffprobe",
+) -> bytes | None:
+    """Decode WXGF into a standard image/animation using ffmpeg.
+
+    Args:
+        ffmpeg, ffprobe: path to ffmpeg and ffprobe executables.
+
+    Returns:
+        - PNG bytes for 1-frame WXGF
+        - GIF bytes for multi-frame WXGF
+        - None if decoding fails or ffmpeg/ffprobe is unavailable.
+    """
+    if shutil.which(ffmpeg) is None or shutil.which(ffprobe) is None:
+        return None
+    hevc = extract_hevc_bitstream_from_wxgf(data)
+    if hevc is None:
+        return None
+
+    frames = _ffprobe_count_frames_hevc(hevc, ffprobe=ffprobe)
+    if frames is not None and frames > 1:
+        # Use palettegen/paletteuse for higher-quality gifs.
+        out = _subprocess_run_bytes(
+            [
+                ffmpeg,
+                "-hide_banner",
+                "-loglevel",
+                "error",
+                "-f",
+                "hevc",
+                "-i",
+                "pipe:0",
+                "-filter_complex",
+                "[0:v]split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse",
+                "-loop",
+                "0",
+                "-f",
+                "gif",
+                "-",
+            ],
+            stdin=hevc,
+        )
+        if out is not None:
+            return out
+
+    # Default: decode the first frame to PNG (keeps quality and alpha).
+    return _subprocess_run_bytes(
+        [
+            ffmpeg,
+            "-hide_banner",
+            "-loglevel",
+            "error",
+            "-f",
+            "hevc",
+            "-i",
+            "pipe:0",
+            "-frames:v",
+            "1",
+            "-f",
+            "image2pipe",
+            "-vcodec",
+            "png",
+            "-",
+        ],
+        stdin=hevc,
+    )
+
+
+class WxgfDecoder:
 
     def __init__(self, server: str | None):
         """server: hostname:port"""
@@ -27,7 +169,7 @@ class WxgfAndroidDecoder:
     def has_server(self) -> bool:
         return hasattr(self, 'ws')
 
-    def decode(self, data: bytes) -> bytes | None:
+    def decode_with_server(self, data: bytes) -> bytes | None:
         assert data[:4] == WXGF_HEADER, data[:20]
         try:
             self.ws.send(data, opcode=0x2)
@@ -62,9 +204,10 @@ class WxgfAndroidDecoder:
             with open(out_fname, 'rb') as f:
                 return f.read()
 
-        if not self.has_server():
-            return None
-        res = self.decode(data)
+        # Prefer host-side decoding via ffmpeg to avoid Android dependencies.
+        res = decode_wxgf_with_ffmpeg(data)
+        if res is None and self.has_server():
+            res = self.decode_with_server(data)
 
         if res is not None:
             with open(out_fname, 'wb') as f: