|
|
@@ -30,6 +30,7 @@ internal class RoamingMessagesImplGroup(
|
|
|
filter: RoamingMessageFilter?
|
|
|
): Flow<MessageChain> {
|
|
|
var currentSeq: Int = getLastMsgSeq() ?: return emptyFlow()
|
|
|
+ var lastOfferedSeq = -1
|
|
|
|
|
|
return flow {
|
|
|
while (true) {
|
|
|
@@ -52,10 +53,14 @@ internal class RoamingMessagesImplGroup(
|
|
|
|
|
|
val maxTime = messageTimeSequence.max()
|
|
|
|
|
|
- if (maxTime < timeStart) break // we have fetched all messages
|
|
|
+
|
|
|
+ // we have fetched all messages
|
|
|
+ // note: maxTime = 0 means all fetched messages were recalled
|
|
|
+ if (maxTime < timeStart && maxTime != 0) break
|
|
|
|
|
|
emitAll(
|
|
|
resp.msgElem.asSequence()
|
|
|
+ .filter { lastOfferedSeq == -1 || it.msgHead.msgSeq < lastOfferedSeq }
|
|
|
.filter { it.time in timeStart..timeEnd }
|
|
|
.sortedByDescending { it.msgHead.msgSeq } // Ensure caller receiver newer messages first
|
|
|
.filter { filter.apply(it) } // Call filter after sort
|
|
|
@@ -63,7 +68,8 @@ internal class RoamingMessagesImplGroup(
|
|
|
.map { listOf(it).toMessageChainOnline(bot, contact.id, MessageSourceKind.GROUP) }
|
|
|
)
|
|
|
|
|
|
- currentSeq = resp.msgElem.minBy { it.time }.msgHead.msgSeq
|
|
|
+ currentSeq = resp.msgElem.first().msgHead.msgSeq
|
|
|
+ lastOfferedSeq = currentSeq
|
|
|
}
|
|
|
}
|
|
|
}
|