#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import os from collections import Counter from functools import lru_cache import glob from pyquery import PyQuery import logging logger = logging.getLogger(__name__) LIB_PATH = os.path.dirname(os.path.abspath(__file__)) STATIC_PATH = os.path.join(LIB_PATH, 'static') HTML_FILE = os.path.join(STATIC_PATH, 'TP_INDEX.html') TIME_HTML_FILE = os.path.join(STATIC_PATH, 'TP_TIME.html') FRIEND_AVATAR_CSS_FILE = os.path.join(STATIC_PATH, 'avatar.css.tpl') try: from csscompressor import compress as css_compress except ImportError: css_compress = lambda x: x from .msg import * from .common.textutil import get_file_b64 from .common.progress import ProgressReporter from .common.timer import timing from .smiley import SmileyProvider from .msgslice import MessageSlicerByTime, MessageSlicerBySize TEMPLATES_FILES = {TYPE_MSG: "TP_MSG", TYPE_IMG: "TP_IMG", TYPE_SPEAK: "TP_SPEAK", TYPE_EMOJI: "TP_EMOJI", TYPE_CUSTOM_EMOJI: "TP_EMOJI", TYPE_LINK: "TP_MSG", TYPE_REPLY: "TP_REPLY", TYPE_VIDEO_FILE: "TP_VIDEO_FILE", TYPE_QQMUSIC: "TP_QQMUSIC", } @lru_cache() def get_template(name: str | int) -> str | None: """Return the html template given a file name or msg type.""" if isinstance(name, int): name = TEMPLATES_FILES.get(name, None) if name is None: return None html_path = os.path.join(STATIC_PATH, f"{name}.html") with open(html_path) as f: return f.read() class HTMLRender(object): def __init__(self, parser, res=None): with open(HTML_FILE) as f: self.html = f.read() with open(TIME_HTML_FILE) as f: self.time_html = f.read() self.parser = parser self.res = res assert self.res is not None, \ "Resource Directory not given. Cannot render HTML." self.smiley = SmileyProvider() css_files = glob.glob(os.path.join(LIB_PATH, 'static/*.css')) self.css_string = [] # css to add for css in css_files: logger.info("Loading {}".format(os.path.basename(css))) with open(css) as f: self.css_string.append(f.read()) js_files = glob.glob(os.path.join(LIB_PATH, 'static/*.js')) # to load jquery before other js js_files = sorted(js_files, key=lambda f: 'jquery-latest' in f, reverse=True) self.js_string = [] for js in js_files: logger.info("Loading {}".format(os.path.basename(js))) with open(js) as f: self.js_string.append(f.read()) self.unknown_type_cnt = Counter() @property def all_css(self): # call after processing all messages, # because smiley css need to be included only when necessary def process(css): css = css_compress(css) return u''.format(css) if hasattr(self, 'final_css'): return self.final_css + process(self.smiley.gen_used_smiley_css()) self.final_css = u"\n".join(map(process, self.css_string)) return self.final_css + process(self.smiley.gen_used_smiley_css()) @property def all_js(self): if hasattr(self, 'final_js'): return self.final_js def process(js): # TODO: add js compress return u''.format(js) self.final_js = u"\n".join(map(process, self.js_string)) return self.final_js #@timing(total=True) def render_msg(self, msg: WeChatMsg): """ render a message, return the html block""" # TODO for chatroom, add nickname on avatar sender = u'you ' + msg.talker if not msg.isSend else 'me' format_dict = {'sender_label': sender, 'time': msg.createTime } if not msg.known_type: self.unknown_type_cnt[msg.type] += 1 if(not msg.isSend and msg.is_chatroom()): format_dict['nickname'] = '>\n
'+msg.talker_nickname+' str:
                return s.replace("{", "{{").replace("}", "}}")

            title = info.get("title") or ""
            reply_to = info.get("ref_name") or "unknown"
            reply_quote = info.get("ref_content") or ""
            ref_svrid = info.get("ref_svrid")

            if not title and not reply_quote:
                return fallback()

            format_dict["content"] = _escape_fmt(self.smiley.replace_smileycode(title))
            format_dict["reply_to"] = _escape_fmt(reply_to)

            reply_thumb_html = ""
            ref_msg = getattr(self, "_msg_by_svrid", {}).get(ref_svrid) if ref_svrid is not None else None
            if ref_msg is not None:
                try:
                    if ref_msg.type == TYPE_IMG and ref_msg.imgPath:
                        imgpath = ref_msg.imgPath.split("_")[-1]
                        bigimgpath = self.parser.imginfo.get(ref_msg.msgSvrId)
                        fnames = [k for k in [imgpath, bigimgpath] if k]
                        b64 = self.res.get_img_thumb(fnames, max_size=64)
                        if b64:
                            reply_thumb_html = f''
                    elif ref_msg.type in (TYPE_VIDEO_FILE, TYPE_WX_VIDEO) and ref_msg.imgPath:
                        b64 = self.res.get_video_thumb(ref_msg.imgPath, max_size=64)
                        if b64:
                            reply_thumb_html = f''
                    elif ref_msg.type in (TYPE_EMOJI, TYPE_CUSTOM_EMOJI):
                        if "emoticonmd5" in ref_msg.content:
                            pq = PyQuery(ref_msg.content)
                            md5 = pq("emoticonmd5").text()
                        else:
                            md5 = ref_msg.imgPath
                        if md5:
                            emoji_img, fmt = self.res.get_emoji_by_md5(md5)
                            if emoji_img and fmt:
                                fmt = fmt.lower()
                                if fmt == "jpg":
                                    fmt = "jpeg"
                                reply_thumb_html = (
                                    f''
                                )
                except Exception:
                    logger.exception("Failed to render reply thumbnail (%s).", ref_svrid)

            if reply_thumb_html:
                reply_quote_html = reply_thumb_html
            else:
                quote_text = self.smiley.replace_smileycode(reply_quote)
                reply_quote_html = f'{quote_text}'
            format_dict["reply_quote_html"] = _escape_fmt(reply_quote_html)

            template = template or get_template(TYPE_MSG)
            return template.format(**format_dict)
        elif msg.type == TYPE_EMOJI or msg.type == TYPE_CUSTOM_EMOJI:
            if 'emoticonmd5' in msg.content:
                pq = PyQuery(msg.content)
                md5 = pq('emoticonmd5').text()
            else:
                md5 = msg.imgPath
                # TODO md5 could exist in both.
                # first is emoji md5, second is image2/ md5
                # can use fallback here.
            if md5:
                emoji_img, format = self.res.get_emoji_by_md5(md5)
                format_dict['emoji_format'] = format
                format_dict['emoji_img'] = emoji_img
            else:
                import IPython as IP; IP.embed()
            return template.format(**format_dict)
        elif msg.type == TYPE_LINK:
            pq = PyQuery(msg.content_xml_ready)
            url = pq('url').text()
            if url:
                try:
                    title = pq('title')[0].text
                except Exception as e:
                    logger.warning('No title found in LINK message: ' + str(e))
                    title = url
                content = '{1}'.format(url, title)
                format_dict['content'] = content
                return template.format(**format_dict)
        elif msg.type == TYPE_VIDEO_FILE:
            video = self.res.get_video(msg.imgPath)
            if video is None:
                logger.warning(f"Cannot find video {msg.imgPath} ({msg.createTime})")
                # fallback
                format_dict['content'] = f"VIDEO FILE {msg.imgPath}"
                return get_template(TYPE_MSG).format(**format_dict)
            elif video.endswith(".mp4"):
                video_str = get_file_b64(video)
                format_dict["video_str"] = video_str
                return template.format(**format_dict)
            elif video.endswith(".jpg"):
                # only has thumbnail
                image_str = get_file_b64(video)
                format_dict["img"] = (image_str, 'jpeg')
                return get_template(TYPE_IMG).format(**format_dict)
        elif msg.type == TYPE_WX_VIDEO:
            # TODO: fetch video from resource
            return fallback()
        return fallback()

    def _render_partial_msgs(self, msgs):
        """ return single html"""
        self.smiley.reset()
        slicer = MessageSlicerByTime()
        slices = slicer.slice(msgs)

        blocks = []
        for idx, slice in enumerate(slices):
            nowtime = slice[0].createTime
            if idx == 0 or \
               slices[idx - 1][0].createTime.date() != nowtime.date():
                timestr = nowtime.strftime("%m/%d %H:%M:%S")
            else:
                timestr = nowtime.strftime("%H:%M:%S")
            blocks.append(self.time_html.format(time=timestr))
            blocks.extend([self.render_msg(m) for m in slice])
            self.prgs.trigger(len(slice))

        # string operation is extremely slow
        return self.html.format(extra_css=self.all_css,
                            extra_js=self.all_js,
                            chat=msgs[0].chat_nickname,
                            messages=u''.join(blocks)
                           )

    def prepare_avatar_css(self, talkers):
        with open(FRIEND_AVATAR_CSS_FILE) as f:
            avatar_tpl = f.read()
        my_avatar = self.res.get_avatar(self.parser.username)
        css = avatar_tpl.format(name='me', avatar=my_avatar)

        for talker in talkers:
            avatar = self.res.get_avatar(talker)
            css += avatar_tpl.format(name=talker, avatar=avatar)
        self.css_string.append(css)

    def render_msgs(self, msgs):
        """ render msgs of one chat, return a list of html"""
        chatid = msgs[0].chat
        self._msg_by_svrid = {m.msgSvrId: m for m in self.parser.msgs_by_chat.get(chatid, msgs)}
        if msgs[0].is_chatroom():
            talkers = set([m.talker for m in msgs])
        else:
            talkers = set([msgs[0].talker])
        self.prepare_avatar_css(talkers)

        self.res.cache_voice_mp3(msgs)

        chat = msgs[0].chat_nickname
        logger.info(u"Rendering {} messages of {}".format(
            len(msgs), chat))

        self.prgs = ProgressReporter("Render", total=len(msgs))
        slice_by_size = MessageSlicerBySize().slice(msgs)
        ret = [self._render_partial_msgs(s) for s in slice_by_size]
        self.prgs.finish()
        logger.warning("[HTMLRenderer] Unhandled messages (type->cnt): {}".format(self.unknown_type_cnt))
        return ret

if __name__ == '__main__':
    r = HTMLRender()
    with open('/tmp/a.html', 'w') as f:
        print >> f, r.html.format(style=r.css, talker='talker',
                                     messages='haha')