Просмотр исходного кода

[core] add seq based roaming support for group

StageGuard 3 лет назад
Родитель
Сommit
25f96c6bc3

+ 4 - 0
mirai-core-api/compatibility-validation/android/api/android.api

@@ -1067,6 +1067,10 @@ public abstract interface class net/mamoe/mirai/contact/roaming/RoamingMessages
 	public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Ljava/util/stream/Stream;
 	public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
 	public static synthetic fun getAllMessagesStream$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public fun getMessagesBefore (Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lnet/mamoe/mirai/utils/Streamable;
+	public abstract fun getMessagesBefore (Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lnet/mamoe/mirai/utils/Streamable;
+	public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
 	public fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
 	public abstract fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static synthetic fun getMessagesIn$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;

+ 4 - 0
mirai-core-api/compatibility-validation/jvm/api/jvm.api

@@ -1067,6 +1067,10 @@ public abstract interface class net/mamoe/mirai/contact/roaming/RoamingMessages
 	public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Ljava/util/stream/Stream;
 	public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
 	public static synthetic fun getAllMessagesStream$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public fun getMessagesBefore (Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lnet/mamoe/mirai/utils/Streamable;
+	public abstract fun getMessagesBefore (Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lnet/mamoe/mirai/utils/Streamable;
+	public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/message/data/MessageSource;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
 	public fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
 	public abstract fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static synthetic fun getMessagesIn$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;

+ 26 - 0
mirai-core-api/src/commonMain/kotlin/contact/roaming/RoamingMessages.kt

@@ -12,6 +12,8 @@ package net.mamoe.mirai.contact.roaming
 import kotlinx.coroutines.flow.Flow
 import net.mamoe.mirai.message.data.MessageChain
 import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.message.data.OnlineMessageSource
+import net.mamoe.mirai.utils.Streamable
 
 /**
  * 漫游消息记录管理器. 可通过 [RoamingSupported.roamingMessages] 获得.
@@ -48,6 +50,30 @@ public expect interface RoamingMessages {
         filter: RoamingMessageFilter? = null
     ): Flow<MessageChain>
 
+    /**
+     * 查询指定消息之前的消息记录
+     *
+     * 返回查询到的漫游消息记录, 顺序为由新到旧. 这些 [MessageChain] 与从事件中收到的消息链相似, 属于在线消息.
+     * 可从 [MessageChain] 获取 [MessageSource] 来确定发送人等相关信息, 也可以进行引用回复或撤回.
+     *
+     * 注意, 返回的消息记录既包含机器人发送给目标用户的消息, 也包含目标用户发送给机器人的消息.
+     * 可通过 [MessageChain] 获取 [MessageSource] (用法为 `messageChain.source`), 判断 [MessageSource.fromId] (发送人).
+     * 消息的其他*元数据*信息也要通过 [MessageSource] 获取 (如 [MessageSource.time] 获取时间).
+     *
+     * 若只需要获取单向消息 (机器人发送给目标用户的消息或反之), 可使用 [RoamingMessageFilter.SENT] 或 [RoamingMessageFilter.RECEIVED] 作为 [filter] 参数传递.
+     *
+     * 性能提示: 请在 [filter] 执行筛选, 若 [filter] 返回 `false` 则不会解析消息链, 这对本函数的处理速度有决定性影响.
+     *
+     * @param source 消息源,当不为 `null` 时必须为 [OnlineMessageSource],结果不包含当前消息;
+     * 为 `null` 时从最近一条消息开始获取且包含该消息.
+     * @param filter 过滤器.
+     * @since 2.15
+     */
+    public suspend fun getMessagesBefore(
+        source: MessageSource? = null,
+        filter: RoamingMessageFilter? = null
+    ): Streamable<MessageChain>
+
     /**
      * 查询所有漫游消息记录.
      *

+ 27 - 0
mirai-core-api/src/jvmBaseMain/kotlin/contact/roaming/RoamingMessages.kt

@@ -15,8 +15,10 @@ import kotlinx.coroutines.flow.Flow
 import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
 import net.mamoe.mirai.message.data.MessageChain
 import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.message.data.OnlineMessageSource
 import net.mamoe.mirai.utils.JavaFriendlyAPI
 import net.mamoe.mirai.utils.JdkStreamSupport.toStream
+import net.mamoe.mirai.utils.Streamable
 import java.util.stream.Stream
 
 
@@ -56,6 +58,31 @@ public actual interface RoamingMessages {
         filter: RoamingMessageFilter? = null
     ): Flow<MessageChain>
 
+    /**
+     * 查询指定消息之前的消息记录
+     *
+     * 返回查询到的漫游消息记录, 顺序为由新到旧. 这些 [MessageChain] 与从事件中收到的消息链相似, 属于在线消息.
+     * 可从 [MessageChain] 获取 [MessageSource] 来确定发送人等相关信息, 也可以进行引用回复或撤回.
+     *
+     * 注意, 返回的消息记录既包含机器人发送给目标用户的消息, 也包含目标用户发送给机器人的消息.
+     * 可通过 [MessageChain] 获取 [MessageSource] (用法为 `messageChain.source`), 判断 [MessageSource.fromId] (发送人).
+     * 消息的其他*元数据*信息也要通过 [MessageSource] 获取 (如 [MessageSource.time] 获取时间).
+     *
+     * 若只需要获取单向消息 (机器人发送给目标用户的消息或反之), 可使用 [RoamingMessageFilter.SENT] 或 [RoamingMessageFilter.RECEIVED] 作为 [filter] 参数传递.
+     *
+     * 性能提示: 请在 [filter] 执行筛选, 若 [filter] 返回 `false` 则不会解析消息链, 这对本函数的处理速度有决定性影响.
+     *
+     * @param source 消息源,当不为 `null` 时必须为 [OnlineMessageSource],结果不包含当前消息;
+     * 为 `null` 时从最近一条消息开始获取且包含该消息.
+     * @param filter 过滤器.
+     * @since 2.15
+     */
+    @Suppress("ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS")
+    public actual suspend fun getMessagesBefore(
+        source: MessageSource?,
+        filter: RoamingMessageFilter? = null
+    ): Streamable<MessageChain>
+
     /**
      * 查询所有漫游消息记录. Java Stream 方法查看 [getAllMessagesStream].
      *

+ 26 - 0
mirai-core-api/src/nativeMain/kotlin/contact/roaming/RoamingMessages.kt

@@ -12,6 +12,8 @@ package net.mamoe.mirai.contact.roaming
 import kotlinx.coroutines.flow.Flow
 import net.mamoe.mirai.message.data.MessageChain
 import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.message.data.OnlineMessageSource
+import net.mamoe.mirai.utils.Streamable
 
 
 /**
@@ -49,6 +51,30 @@ public actual interface RoamingMessages {
         filter: RoamingMessageFilter?
     ): Flow<MessageChain>
 
+    /**
+     * 查询指定消息之前的消息记录
+     *
+     * 返回查询到的漫游消息记录, 顺序为由新到旧. 这些 [MessageChain] 与从事件中收到的消息链相似, 属于在线消息.
+     * 可从 [MessageChain] 获取 [MessageSource] 来确定发送人等相关信息, 也可以进行引用回复或撤回.
+     *
+     * 注意, 返回的消息记录既包含机器人发送给目标用户的消息, 也包含目标用户发送给机器人的消息.
+     * 可通过 [MessageChain] 获取 [MessageSource] (用法为 `messageChain.source`), 判断 [MessageSource.fromId] (发送人).
+     * 消息的其他*元数据*信息也要通过 [MessageSource] 获取 (如 [MessageSource.time] 获取时间).
+     *
+     * 若只需要获取单向消息 (机器人发送给目标用户的消息或反之), 可使用 [RoamingMessageFilter.SENT] 或 [RoamingMessageFilter.RECEIVED] 作为 [filter] 参数传递.
+     *
+     * 性能提示: 请在 [filter] 执行筛选, 若 [filter] 返回 `false` 则不会解析消息链, 这对本函数的处理速度有决定性影响.
+     *
+     * @param source 消息源,当不为 `null` 时必须为 [OnlineMessageSource],结果不包含当前消息;
+     * 为 `null` 时从最近一条消息开始获取且包含该消息.
+     * @param filter 过滤器.
+     * @since 2.15
+     */
+    public actual suspend fun getMessagesBefore(
+        source: MessageSource?,
+        filter: RoamingMessageFilter?
+    ): Streamable<MessageChain>
+
     /**
      * 查询所有漫游消息记录.
      *

+ 55 - 33
mirai-core/src/commonMain/kotlin/contact/roaming/RoamingMessagesImplGroup.kt

@@ -15,15 +15,20 @@ import net.mamoe.mirai.internal.contact.CommonGroupImpl
 import net.mamoe.mirai.internal.message.getMessageSourceKindFromC2cCmdOrNull
 import net.mamoe.mirai.internal.message.toMessageChainOnline
 import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
-import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement
 import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetGroupMsg
 import net.mamoe.mirai.message.data.MessageChain
+import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.message.data.OnlineMessageSource
 
 internal class RoamingMessagesImplGroup(
     override val contact: CommonGroupImpl
-) : AbstractRoamingMessages() {
-    private val bot get() = contact.bot
+) : SequenceBasedRoamingMessagesImpl() {
 
+    /**
+     * time-based roaming without extending [TimeBasedRoamingMessagesImpl]
+     * because protocol MessageSvc.PbGetGroupMsg doesn't support querying via time.
+     * so this is actually sequence-based roaming.
+     */
     override suspend fun getMessagesIn(
         timeStart: Long,
         timeEnd: Long,
@@ -33,25 +38,11 @@ internal class RoamingMessagesImplGroup(
 
         return flow {
             while (true) {
-                val resp = contact.bot.network.sendAndExpect(
-                    MessageSvcPbGetGroupMsg(
-                        client = contact.bot.client,
-                        groupUin = contact.uin,
-                        messageSequence = currentSeq.toLong(),
-                        count = 20 // maximum 20
-                    )
-                )
-
-                if (resp is MessageSvcPbGetGroupMsg.Failed) break
-                resp as MessageSvcPbGetGroupMsg.Success // stupid smart cast
-                if (resp.msgElem.isEmpty()) break
+                val resp = getGroupMsg(currentSeq.toLong()) ?: break
 
                 // the message may be sorted increasing by message time,
                 // if so, additional sortBy will not take cost.
-                val messageTimeSequence = resp.msgElem.asSequence().map { it.time }
-
-                val maxTime = messageTimeSequence.max()
-
+                val maxTime = resp.msgElem.maxOf { it.time }
                 if (maxTime < timeStart) break // we have fetched all messages
 
                 emitAll(
@@ -69,24 +60,55 @@ internal class RoamingMessagesImplGroup(
         }
     }
 
-    private val MsgComm.Msg.time get() = msgHead.msgTime
+    override suspend fun getMessagesBeforeFlow(
+        source: MessageSource?,
+        filter: RoamingMessageFilter?
+    ): Flow<MessageChain> {
+        var currentSeq = if (source != null) {
+            (source as? OnlineMessageSource) ?: error("source is not OnlineMessageSource")
+            source.ids.firstOrNull()?.minus(1) ?: return emptyFlow()
+        } else {
+            getLastMsgSeq() ?: return emptyFlow()
+        }
 
-    private fun RoamingMessageFilter?.apply(
-        it: MsgComm.Msg
-    ) = this?.invoke(createRoamingMessage(it, listOf())) != false
+        return flow {
+            while (true) {
+                val resp = getGroupMsg(currentSeq.toLong()) ?: break
 
-    private suspend fun getLastMsgSeq(): Int? {
-        // Iterate from the newest message to find messages within [timeStart] and [timeEnd]
-        val lastMsgSeqResp = bot.network.sendAndExpect(
-            TroopManagement.GetGroupLastMsgSeq(
-                client = bot.client,
-                groupUin = contact.uin
+                emitAll(
+                    resp.msgElem.asSequence()
+                        .filter { getMessageSourceKindFromC2cCmdOrNull(it.msgHead.c2cCmd) != null } // ignore unsupported messages
+                        .sortedByDescending { it.time } // Ensure caller receiver newer messages first
+                        .filter { filter.apply(it) } // Call filter after sort
+                        .asFlow()
+                        .map { it.toMessageChainOnline(bot) }
+                )
+
+                currentSeq = resp.msgElem.minBy { it.time }.msgHead.msgSeq
+            }
+        }
+    }
+
+    private suspend fun getGroupMsg(seq: Long): MessageSvcPbGetGroupMsg.Success? {
+        val resp = contact.bot.network.sendAndExpect(
+            MessageSvcPbGetGroupMsg(
+                client = contact.bot.client,
+                groupUin = contact.uin,
+                messageSequence = seq,
+                count = 20 // maximum 20
             )
         )
 
-        return when (lastMsgSeqResp) {
-            TroopManagement.GetGroupLastMsgSeq.Response.Failed -> null
-            is TroopManagement.GetGroupLastMsgSeq.Response.Success -> lastMsgSeqResp.seq
-        }
+        if (resp is MessageSvcPbGetGroupMsg.Failed) return null
+        resp as MessageSvcPbGetGroupMsg.Success
+        if (resp.msgElem.isEmpty()) return null
+
+        return resp
     }
+
+    private val MsgComm.Msg.time get() = msgHead.msgTime
+
+    private fun RoamingMessageFilter?.apply(
+        it: MsgComm.Msg
+    ) = this?.invoke(createRoamingMessage(it, listOf())) != false
 }

+ 61 - 0
mirai-core/src/commonMain/kotlin/contact/roaming/SequenceBasedRoamingMessagesImpl.kt

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2019-2023 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.contact.roaming
+
+import kotlinx.coroutines.flow.Flow
+import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
+import net.mamoe.mirai.internal.contact.uin
+import net.mamoe.mirai.internal.network.protocol.packet.chat.TroopManagement
+import net.mamoe.mirai.message.data.MessageChain
+import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.utils.Streamable
+
+internal sealed class SequenceBasedRoamingMessagesImpl : AbstractRoamingMessages() {
+    internal val bot get() = contact.bot
+    override suspend fun getMessagesIn(
+        timeStart: Long,
+        timeEnd: Long,
+        filter: RoamingMessageFilter?
+    ): Flow<MessageChain> {
+        error("not implemented in SequenceBasedRoamingMessage.")
+    }
+
+    override suspend fun getMessagesBefore(
+        source: MessageSource?,
+        filter: RoamingMessageFilter?
+    ): Streamable<MessageChain> {
+        val flow = getMessagesBeforeFlow(source, filter)
+        return object : Streamable<MessageChain> {
+            override fun asFlow(): Flow<MessageChain> {
+                return flow
+            }
+        }
+    }
+
+    abstract suspend fun getMessagesBeforeFlow(
+        source: MessageSource?,
+        filter: RoamingMessageFilter?
+    ): Flow<MessageChain>
+
+    internal suspend fun getLastMsgSeq(): Int? {
+        // Iterate from the newest message to find messages within [timeStart] and [timeEnd]
+        val lastMsgSeqResp = bot.network.sendAndExpect(
+            TroopManagement.GetGroupLastMsgSeq(
+                client = bot.client,
+                groupUin = contact.uin
+            )
+        )
+
+        return when (lastMsgSeqResp) {
+            TroopManagement.GetGroupLastMsgSeq.Response.Failed -> null
+            is TroopManagement.GetGroupLastMsgSeq.Response.Success -> lastMsgSeqResp.seq
+        }
+    }
+}

+ 9 - 0
mirai-core/src/commonMain/kotlin/contact/roaming/TimeBasedRoamingMessagesImpl.kt

@@ -19,6 +19,8 @@ import net.mamoe.mirai.contact.roaming.RoamingMessageFilter
 import net.mamoe.mirai.internal.message.toMessageChainOnline
 import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbGetRoamMsgReq
 import net.mamoe.mirai.message.data.MessageChain
+import net.mamoe.mirai.message.data.MessageSource
+import net.mamoe.mirai.utils.Streamable
 
 internal sealed class TimeBasedRoamingMessagesImpl : AbstractRoamingMessages() {
     override suspend fun getMessagesIn(
@@ -49,6 +51,13 @@ internal sealed class TimeBasedRoamingMessagesImpl : AbstractRoamingMessages() {
         }
     }
 
+    override suspend fun getMessagesBefore(
+        source: MessageSource?,
+        filter: RoamingMessageFilter?
+    ): Streamable<MessageChain> {
+        error("not implemented in TimeBasedRoamingMessage.")
+    }
+
     abstract suspend fun requestRoamMsg(
         timeStart: Long,
         lastMessageTime: Long,