|
|
@@ -26,6 +26,7 @@ EMOJI_DIRNAME = 'emoji'
|
|
|
VIDEO_DIRNAME = 'video'
|
|
|
|
|
|
JPEG_QUALITY = 50
|
|
|
+THUMB_JPEG_QUALITY = 35
|
|
|
|
|
|
class Resource(object):
|
|
|
""" Multimedia resources parser."""
|
|
|
@@ -160,49 +161,72 @@ class Resource(object):
|
|
|
"""
|
|
|
fnames = [k for k in fnames if k] # filter out empty string
|
|
|
big_file, small_file = self._get_img_file(fnames)
|
|
|
-
|
|
|
- def get_jpg_b64(img_file):
|
|
|
- if not img_file:
|
|
|
- return None
|
|
|
-
|
|
|
- # True jpeg. Simplest case.
|
|
|
- if img_file.endswith('jpg') and \
|
|
|
- img_what(img_file) == 'jpeg':
|
|
|
- return get_file_b64(img_file)
|
|
|
-
|
|
|
- if is_wxgf_file(img_file):
|
|
|
- start = time.time()
|
|
|
- buf = self.wxgf_decoder.decode_with_cache(img_file, None)
|
|
|
- if buf is None:
|
|
|
- if not self.wxgf_decoder.has_server():
|
|
|
- logger.warning("wxgf decoder server is not provided. Cannot decode wxgf images. Please follow instructions to create wxgf decoder server if these images need to be decoded.")
|
|
|
- else:
|
|
|
- logger.error("Failed to decode wxgf file: {}".format(img_file))
|
|
|
- return None
|
|
|
+ big_file = self._img_file_to_jpg_b64(big_file)
|
|
|
+ if big_file:
|
|
|
+ return big_file
|
|
|
+ return self._img_file_to_jpg_b64(small_file)
|
|
|
+
|
|
|
+ def _img_file_to_jpg_b64(self, img_file: str, *, max_size: int | None = None, quality: int = JPEG_QUALITY) -> str | None:
|
|
|
+ if not img_file:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # True jpeg. Simplest case. Avoid re-compressing.
|
|
|
+ if max_size is None and img_file.endswith('jpg') and img_what(img_file) == 'jpeg':
|
|
|
+ return get_file_b64(img_file)
|
|
|
+
|
|
|
+ if is_wxgf_file(img_file):
|
|
|
+ start = time.time()
|
|
|
+ buf = self.wxgf_decoder.decode_with_cache(img_file, None)
|
|
|
+ if buf is None:
|
|
|
+ if not self.wxgf_decoder.has_server():
|
|
|
+ logger.warning("wxgf decoder server is not provided. Cannot decode wxgf images. Please follow instructions to create wxgf decoder server if these images need to be decoded.")
|
|
|
else:
|
|
|
- elapsed = time.time() - start
|
|
|
- if elapsed > 0.01 and self.wxgf_decoder.has_server():
|
|
|
- logger.info(f"Decoded {img_file} in {elapsed:.2f} seconds")
|
|
|
+ logger.error("Failed to decode wxgf file: {}".format(img_file))
|
|
|
+ return None
|
|
|
else:
|
|
|
- with open(img_file, 'rb') as f:
|
|
|
- buf = f.read()
|
|
|
-
|
|
|
- # File is not actually jpeg. Convert.
|
|
|
- if img_what(file=None, h=buf) != 'jpeg':
|
|
|
- try:
|
|
|
- im = Image.open(io.BytesIO(buf))
|
|
|
- except:
|
|
|
- return None
|
|
|
- else:
|
|
|
- bufio = io.BytesIO()
|
|
|
- im.convert('RGB').save(bufio, 'JPEG', quality=JPEG_QUALITY)
|
|
|
- buf = bufio.getvalue()
|
|
|
+ elapsed = time.time() - start
|
|
|
+ if elapsed > 0.01 and self.wxgf_decoder.has_server():
|
|
|
+ logger.info(f"Decoded {img_file} in {elapsed:.2f} seconds")
|
|
|
+ else:
|
|
|
+ with open(img_file, "rb") as f:
|
|
|
+ buf = f.read()
|
|
|
+
|
|
|
+ # If we don't need resize/convert and it's already jpeg, avoid re-compressing.
|
|
|
+ if max_size is None and img_what(file=None, h=buf) == 'jpeg':
|
|
|
return base64.b64encode(buf).decode('ascii')
|
|
|
|
|
|
- big_file = get_jpg_b64(big_file)
|
|
|
- if big_file:
|
|
|
- return big_file
|
|
|
- return get_jpg_b64(small_file)
|
|
|
+ try:
|
|
|
+ im = Image.open(io.BytesIO(buf))
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ im = im.convert("RGB")
|
|
|
+ if max_size:
|
|
|
+ im.thumbnail((max_size, max_size))
|
|
|
+ bufio = io.BytesIO()
|
|
|
+ im.save(bufio, "JPEG", quality=quality)
|
|
|
+ return base64.b64encode(bufio.getvalue()).decode("ascii")
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_img_thumb(self, fnames, *, max_size: int = 64) -> str | None:
|
|
|
+ """Return a small JPEG thumbnail (b64) for an image message."""
|
|
|
+ fnames = [k for k in fnames if k]
|
|
|
+ big_file, small_file = self._get_img_file(fnames)
|
|
|
+ return (
|
|
|
+ self._img_file_to_jpg_b64(small_file, max_size=max_size, quality=THUMB_JPEG_QUALITY)
|
|
|
+ or self._img_file_to_jpg_b64(big_file, max_size=max_size, quality=THUMB_JPEG_QUALITY)
|
|
|
+ )
|
|
|
+
|
|
|
+ def get_video_thumb(self, videoid: str, *, max_size: int = 64) -> str | None:
|
|
|
+ """Return a small JPEG thumbnail (b64) for a video message, if available."""
|
|
|
+ if not videoid:
|
|
|
+ return None
|
|
|
+ video_thumbnail_file = os.path.join(self.video_dir, videoid + ".jpg")
|
|
|
+ if not os.path.exists(video_thumbnail_file):
|
|
|
+ return None
|
|
|
+ return self._img_file_to_jpg_b64(video_thumbnail_file, max_size=max_size, quality=THUMB_JPEG_QUALITY)
|
|
|
|
|
|
def get_emoji_by_md5(self, md5):
|
|
|
""" Returns: (b64 encoded img string, format) """
|