msg.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # -*- coding: UTF-8 -*-
  2. TYPE_MSG = 1
  3. TYPE_IMG = 3
  4. TYPE_SPEAK = 34
  5. TYPE_NAMECARD = 42
  6. TYPE_VIDEO_FILE = 43
  7. TYPE_EMOJI = 47
  8. TYPE_LOCATION = 48
  9. TYPE_LINK = 49 # link share OR file from web, see https://github.com/ppwwyyxx/wechat-dump/issues/52
  10. TYPE_VOIP = 50
  11. TYPE_WX_VIDEO = 62 # video took by wechat
  12. TYPE_SYSTEM = 10000
  13. TYPE_CUSTOM_EMOJI = 1048625
  14. TYPE_REDENVELOPE = 436207665
  15. TYPE_MONEY_TRANSFER = 419430449 # 微信转账
  16. TYPE_LOCATION_SHARING = -1879048186
  17. TYPE_REPLY = 822083633 # 回复的消息.
  18. TYPE_APP_MSG = 16777265
  19. _KNOWN_TYPES = [eval(k) for k in dir() if k.startswith('TYPE_')]
  20. import re
  21. import io
  22. from pyquery import PyQuery
  23. import xml.etree.ElementTree as ET
  24. import logging
  25. logger = logging.getLogger(__name__)
  26. from .common.textutil import ensure_unicode
  27. class WeChatMsg(object):
  28. @staticmethod
  29. def filter_type(tp):
  30. if tp in [TYPE_SYSTEM]:
  31. return True
  32. return False
  33. def __init__(self, values):
  34. for k, v in values.items():
  35. setattr(self, k, v)
  36. if self.type not in _KNOWN_TYPES:
  37. logger.warn("Unhandled message type: {}".format(self.type))
  38. # only to supress repeated warning:
  39. _KNOWN_TYPES.append(self.type)
  40. def msg_str(self):
  41. if self.type == TYPE_LOCATION:
  42. try:
  43. pq = PyQuery(self.content_xml_ready, parser='xml')
  44. loc = pq('location').attr
  45. label = loc['label']
  46. poiname = loc['poiname']
  47. if poiname:
  48. label = poiname
  49. return "LOCATION:" + label + " ({},{})".format(loc['x'], loc['y'])
  50. except:
  51. return "LOCATION: unknown"
  52. elif self.type == TYPE_LINK:
  53. pq = PyQuery(self.content_xml_ready)
  54. url = pq('url').text()
  55. if not url:
  56. # TODO: see https://github.com/ppwwyyxx/wechat-dump/issues/52 for
  57. # more logic to implement
  58. title = pq('title').text()
  59. if title: # may not be correct
  60. return "FILE:{}".format(title)
  61. return "NOT IMPLEMENTED: " + self.content_xml_ready
  62. return "URL:{}".format(url)
  63. elif self.type == TYPE_NAMECARD:
  64. pq = PyQuery(self.content_xml_ready, parser='xml')
  65. msg = pq('msg').attr
  66. name = msg['nickname']
  67. if not name:
  68. name = msg['alias']
  69. if not name:
  70. name = ""
  71. return "NAMECARD: {}".format(self.content_xml_ready)
  72. elif self.type == TYPE_APP_MSG:
  73. pq = PyQuery(self.content_xml_ready, parser='xml')
  74. return pq('title').text()
  75. elif self.type == TYPE_VIDEO_FILE:
  76. return "VIDEO FILE"
  77. elif self.type == TYPE_WX_VIDEO:
  78. return "WeChat VIDEO"
  79. elif self.type == TYPE_VOIP:
  80. return "REQUEST VIDEO CHAT"
  81. elif self.type == TYPE_LOCATION_SHARING:
  82. return "LOCATION SHARING"
  83. elif self.type == TYPE_EMOJI:
  84. # TODO add emoji name
  85. return self.content
  86. elif self.type == TYPE_REDENVELOPE:
  87. data_to_parse = io.BytesIO(self.content.encode('utf-8'))
  88. try:
  89. for event, elem in ET.iterparse(data_to_parse, events=('end',)):
  90. if elem.tag == 'sendertitle':
  91. title = elem.text
  92. return "[RED ENVELOPE]\n{}".format(title)
  93. except:
  94. pass
  95. return "[RED ENVELOPE]"
  96. elif self.type == TYPE_MONEY_TRANSFER:
  97. data_to_parse = io.BytesIO(self.content.encode('utf-8'))
  98. try:
  99. for event, elem in ET.iterparse(data_to_parse, events=('end',)):
  100. if elem.tag == 'des':
  101. title = elem.text
  102. return "[Money Transfer]\n{}".format(title)
  103. except:
  104. pass
  105. return "[Money Transfer]"
  106. elif self.type == TYPE_REPLY:
  107. pq = PyQuery(self.content_xml_ready)
  108. msg = pq('title').text()
  109. # TODO parse reply.
  110. return msg
  111. else:
  112. # TODO replace smiley with text
  113. return self.content
  114. @property
  115. def content_xml_ready(self):
  116. # remove xml headers to avoid possible errors it may create
  117. header = re.compile(r'<\?.*\?>')
  118. msg = header.sub("", self.content)
  119. return msg
  120. def __repr__(self):
  121. ret = "{}|{}:{}:{}".format(
  122. self.type,
  123. self.talker_nickname if not self.isSend else 'me',
  124. self.createTime,
  125. ensure_unicode(self.msg_str()))
  126. if self.imgPath:
  127. ret = "{}|img:{}".format(ensure_unicode(ret.strip()), self.imgPath)
  128. return ret
  129. else:
  130. return ret
  131. def __lt__(self, r):
  132. return self.createTime < r.createTime
  133. def is_chatroom(self):
  134. return self.talker != self.chat
  135. def get_chatroom(self):
  136. if self.is_chatroom():
  137. return self.chat
  138. else:
  139. return ''
  140. def get_emoji_product_id(self):
  141. assert self.type == TYPE_EMOJI, "Wrong call to get_emoji_product_id()!"
  142. pq = PyQuery(self.content_xml_ready, parser='xml')
  143. emoji = pq('emoji')
  144. if not emoji:
  145. return None
  146. return emoji.attrs['productid']