소스 검색

Redesign event subscription, using inherited coroutineContext

Him188 6 년 전
부모
커밋
05ca6424db

+ 0 - 7
mirai-core/src/androidMain/kotlin/net/mamoe/mirai/event/EventDispatcherAndroid.kt

@@ -1,7 +0,0 @@
-package net.mamoe.mirai.event
-
-import kotlinx.coroutines.CoroutineDispatcher
-import kotlinx.coroutines.asCoroutineDispatcher
-import java.util.concurrent.Executors
-
-internal actual val EventDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()

+ 2 - 2
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/Bot.kt

@@ -121,7 +121,7 @@ class Bot(val account: BotAccount, val logger: MiraiLogger) : CoroutineScope {
         suspend fun getQQ(id: UInt): QQ =
             if (_qqs.containsKey(id)) _qqs[id]!!
             else qqsLock.withLock {
-                _qqs.getOrPut(id) { QQImpl(bot, id) }
+                _qqs.getOrPut(id) { QQImpl(bot, id, coroutineContext) }
             }
 
         /**
@@ -139,7 +139,7 @@ class Bot(val account: BotAccount, val logger: MiraiLogger) : CoroutineScope {
         suspend fun getGroup(id: GroupId): Group = id.value.let {
             if (_groups.containsKey(it)) _groups[it]!!
             else groupsLock.withLock {
-                _groups.getOrPut(it) { Group(bot, id) }
+                _groups.getOrPut(it) { Group(bot, id, coroutineContext) }
             }
         }
     }

+ 34 - 5
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/contact/internal/ContactImpl.kt

@@ -2,19 +2,25 @@
 
 package net.mamoe.mirai.contact.internal
 
+import kotlinx.coroutines.CoroutineScope
 import net.mamoe.mirai.Bot
 import net.mamoe.mirai.contact.*
 import net.mamoe.mirai.contact.data.Profile
+import net.mamoe.mirai.event.subscribeAlways
 import net.mamoe.mirai.message.Message
 import net.mamoe.mirai.message.MessageChain
 import net.mamoe.mirai.message.chain
 import net.mamoe.mirai.message.singleChain
 import net.mamoe.mirai.network.protocol.tim.packet.action.*
+import net.mamoe.mirai.network.protocol.tim.packet.event.MemberJoinEventPacket
+import net.mamoe.mirai.network.protocol.tim.packet.event.MemberQuitEvent
 import net.mamoe.mirai.network.qqAccount
 import net.mamoe.mirai.network.sessionKey
 import net.mamoe.mirai.qqAccount
 import net.mamoe.mirai.sendPacket
+import net.mamoe.mirai.utils.MiraiInternalAPI
 import net.mamoe.mirai.withSession
+import kotlin.coroutines.CoroutineContext
 
 internal sealed class ContactImpl : Contact {
     abstract override suspend fun sendMessage(message: MessageChain)
@@ -23,6 +29,11 @@ internal sealed class ContactImpl : Contact {
     override suspend fun sendMessage(plain: String) = sendMessage(plain.singleChain())
 
     override suspend fun sendMessage(message: Message) = sendMessage(message.chain())
+
+    /**
+     * 开始监听事件, 以同步更新资料
+     */
+    internal abstract suspend fun CoroutineContext.startUpdater()
 }
 
 /**
@@ -30,17 +41,18 @@ internal sealed class ContactImpl : Contact {
  */
 @Suppress("FunctionName")
 @PublishedApi
-internal suspend fun Group(bot: Bot, groupId: GroupId): Group {
+internal suspend fun Group(bot: Bot, groupId: GroupId, context: CoroutineContext): Group {
     val info: RawGroupInfo = try {
         bot.withSession { GroupPacket.QueryGroupInfo(qqAccount, groupId.toInternalId(), sessionKey).sendAndExpect() }
     } catch (e: Exception) {
         error("Cannot obtain group info for id ${groupId.value}")
     }
-    return GroupImpl(bot, groupId).apply { this.info = info.parseBy(this) }
+    return GroupImpl(bot, groupId, context).apply { this.info = info.parseBy(this) }
 }
 
 @Suppress("MemberVisibilityCanBePrivate", "CanBeParameter")
-internal data class GroupImpl internal constructor(override val bot: Bot, val groupId: GroupId) : ContactImpl(), Group {
+internal data class GroupImpl internal constructor(override val bot: Bot, val groupId: GroupId, override val coroutineContext: CoroutineContext) :
+    ContactImpl(), Group, CoroutineScope {
     override val id: UInt get() = groupId.value
     override val internalId = GroupId(id).toInternalId()
 
@@ -52,7 +64,7 @@ internal data class GroupImpl internal constructor(override val bot: Bot, val gr
 
     override suspend fun getMember(id: UInt): Member =
         if (members.containsKey(id)) members[id]!!
-        else throw NoSuchElementException("No such member whose id is $id in group $id")
+        else throw NoSuchElementException("No such member whose id is $id in group ${groupId.value.toLong()}")
 
     override suspend fun sendMessage(message: MessageChain) {
         bot.sendPacket(GroupPacket.Message(bot.qqAccount, internalId, bot.sessionKey, message))
@@ -66,12 +78,25 @@ internal data class GroupImpl internal constructor(override val bot: Bot, val gr
         GroupPacket.QuitGroup(qqAccount, sessionKey, internalId).sendAndExpect()
     }
 
+    @UseExperimental(MiraiInternalAPI::class)
+    override suspend fun CoroutineContext.startUpdater() {
+        subscribeAlways<MemberJoinEventPacket> {
+            // FIXME: 2019/11/29 非线程安全!!
+            members.delegate[it.member.id] = it.member
+        }
+        subscribeAlways<MemberQuitEvent> {
+            // FIXME: 2019/11/29 非线程安全!!
+            members.delegate.remove(it.member.id)
+        }
+    }
+
     override fun toString(): String = "Group(${this.id})"
 
     override fun iterator(): Iterator<Member> = members.delegate.values.iterator()
 }
 
-internal data class QQImpl internal constructor(override val bot: Bot, override val id: UInt) : ContactImpl(), QQ {
+internal data class QQImpl internal constructor(override val bot: Bot, override val id: UInt, override val coroutineContext: CoroutineContext) : ContactImpl(),
+    QQ, CoroutineScope {
     override suspend fun sendMessage(message: MessageChain) =
         bot.sendPacket(SendFriendMessagePacket(bot.qqAccount, id, bot.sessionKey, message))
 
@@ -87,6 +112,10 @@ internal data class QQImpl internal constructor(override val bot: Bot, override
         QueryFriendRemarkPacket(bot.qqAccount, sessionKey, id).sendAndExpect()
     }
 
+    override suspend fun CoroutineContext.startUpdater() {
+        // TODO: 2019/11/28 被删除好友事件
+    }
+
     override fun toString(): String = "QQ(${this.id})"
 }
 

+ 2 - 19
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Subscribable.kt

@@ -2,14 +2,10 @@
 
 package net.mamoe.mirai.event
 
-import kotlinx.coroutines.*
 import net.mamoe.mirai.event.internal.broadcastInternal
 import net.mamoe.mirai.utils.DefaultLogger
 import net.mamoe.mirai.utils.MiraiLogger
 import net.mamoe.mirai.utils.withSwitch
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.jvm.JvmOverloads
 
 /**
  * 可被监听的.
@@ -77,19 +73,15 @@ interface Cancellable : Subscribable {
 
 /**
  * 广播一个事件的唯一途径.
- * 这个方法将会把处理挂起在 [context] 下运行. 默认为使用 [EventDispatcher] 调度事件协程.
- *
- * @param context 事件处理协程运行的 [CoroutineContext].
  */
 @Suppress("UNCHECKED_CAST")
-@JvmOverloads
-suspend fun <E : Subscribable> E.broadcast(context: CoroutineContext = EmptyCoroutineContext): E {
+suspend fun <E : Subscribable> E.broadcast(): E {
     if (EventDebuggingFlag) {
         EventDebugLogger.debug(this::class.simpleName + " pre broadcast")
     }
     try {
         @Suppress("EXPERIMENTAL_API_USAGE")
-        return withContext(EventScope.newCoroutineContext(context)) { [email protected]() }
+        return [email protected]()
     } finally {
         if (EventDebuggingFlag) {
             EventDebugLogger.debug(this::class.simpleName + " after broadcast")
@@ -97,15 +89,6 @@ suspend fun <E : Subscribable> E.broadcast(context: CoroutineContext = EmptyCoro
     }
 }
 
-internal expect val EventDispatcher: CoroutineDispatcher
-
-object EventScope : CoroutineScope {
-    override val coroutineContext: CoroutineContext =
-        EventDispatcher + CoroutineExceptionHandler { _, e ->
-            MiraiLogger.error("An exception is thrown in EventScope", e)
-        }
-}
-
 /**
  * 可控制是否需要广播这个事件包
  */

+ 31 - 18
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/Subscribers.kt

@@ -17,8 +17,15 @@ import kotlin.reflect.KClass
  */ // Not using enum for Android
 inline class ListeningStatus(inline val listening: Boolean) {
     companion object {
+        /**
+         * 表示继续监听
+         */
         @JvmStatic
         val LISTENING = ListeningStatus(true)
+
+        /**
+         * 表示已停止
+         */
         @JvmStatic
         val STOPPED = ListeningStatus(false)
     }
@@ -29,28 +36,34 @@ inline class ListeningStatus(inline val listening: Boolean) {
 
 /**
  * 订阅所有 [E] 及其子类事件.
- * 在
+ *
+ * 将以当前协程的 job 为父 job 启动监听, 因此, 当当前协程运行结束后, 监听也会结束.
+ * [handler] 将会有当前协程上下文执行, 即会被调用 [subscribe] 时的协程调度器执行
  */
-suspend inline fun <reified E : Subscribable> subscribe(noinline handler: suspend (E) -> ListeningStatus) = E::class.subscribe(handler)
+suspend inline fun <reified E : Subscribable> subscribe(noinline handler: suspend (E) -> ListeningStatus): Listener<E> = E::class.subscribe(handler)
 
-suspend inline fun <reified E : Subscribable> subscribeAlways(noinline listener: suspend (E) -> Unit) = E::class.subscribeAlways(listener)
+suspend inline fun <reified E : Subscribable> subscribeAlways(noinline listener: suspend (E) -> Unit): Listener<E> = E::class.subscribeAlways(listener)
 
-suspend inline fun <reified E : Subscribable> subscribeOnce(noinline listener: suspend (E) -> Unit) = E::class.subscribeOnce(listener)
+suspend inline fun <reified E : Subscribable> subscribeOnce(noinline listener: suspend (E) -> Unit): Listener<E> = E::class.subscribeOnce(listener)
 
-suspend inline fun <reified E : Subscribable, T> subscribeUntil(valueIfStop: T, noinline listener: suspend (E) -> T) =
+suspend inline fun <reified E : Subscribable, T> subscribeUntil(valueIfStop: T, noinline listener: suspend (E) -> T): Listener<E> =
     E::class.subscribeUntil(valueIfStop, listener)
 
-suspend inline fun <reified E : Subscribable> subscribeUntilFalse(noinline listener: suspend (E) -> Boolean) = E::class.subscribeUntilFalse(listener)
-suspend inline fun <reified E : Subscribable> subscribeUntilTrue(noinline listener: suspend (E) -> Boolean) = E::class.subscribeUntilTrue(listener)
-suspend inline fun <reified E : Subscribable> subscribeUntilNull(noinline listener: suspend (E) -> Any?) = E::class.subscribeUntilNull(listener)
+suspend inline fun <reified E : Subscribable> subscribeUntilFalse(noinline listener: suspend (E) -> Boolean): Listener<E> =
+    E::class.subscribeUntilFalse(listener)
 
+suspend inline fun <reified E : Subscribable> subscribeUntilTrue(noinline listener: suspend (E) -> Boolean): Listener<E> = E::class.subscribeUntilTrue(listener)
+suspend inline fun <reified E : Subscribable> subscribeUntilNull(noinline listener: suspend (E) -> Any?): Listener<E> = E::class.subscribeUntilNull(listener)
 
-suspend inline fun <reified E : Subscribable, T> subscribeWhile(valueIfContinue: T, noinline listener: suspend (E) -> T) =
+
+suspend inline fun <reified E : Subscribable, T> subscribeWhile(valueIfContinue: T, noinline listener: suspend (E) -> T): Listener<E> =
     E::class.subscribeWhile(valueIfContinue, listener)
 
-suspend inline fun <reified E : Subscribable> subscribeWhileFalse(noinline listener: suspend (E) -> Boolean) = E::class.subscribeWhileFalse(listener)
-suspend inline fun <reified E : Subscribable> subscribeWhileTrue(noinline listener: suspend (E) -> Boolean) = E::class.subscribeWhileTrue(listener)
-suspend inline fun <reified E : Subscribable> subscribeWhileNull(noinline listener: suspend (E) -> Any?) = E::class.subscribeWhileNull(listener)
+suspend inline fun <reified E : Subscribable> subscribeWhileFalse(noinline listener: suspend (E) -> Boolean): Listener<E> =
+    E::class.subscribeWhileFalse(listener)
+
+suspend inline fun <reified E : Subscribable> subscribeWhileTrue(noinline listener: suspend (E) -> Boolean): Listener<E> = E::class.subscribeWhileTrue(listener)
+suspend inline fun <reified E : Subscribable> subscribeWhileNull(noinline listener: suspend (E) -> Any?): Listener<E> = E::class.subscribeWhileNull(listener)
 
 // endregion
 
@@ -73,13 +86,13 @@ internal suspend fun <E : Subscribable, T> KClass<E>.subscribeUntil(valueIfStop:
     subscribeInternal(Handler { if (listener(it) === valueIfStop) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeUntilFalse(listener: suspend (E) -> Boolean) = subscribeUntil(false, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilFalse(noinline listener: suspend (E) -> Boolean) = subscribeUntil(false, listener)
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeUntilTrue(listener: suspend (E) -> Boolean) = subscribeUntil(true, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilTrue(noinline listener: suspend (E) -> Boolean) = subscribeUntil(true, listener)
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeUntilNull(listener: suspend (E) -> Any?) = subscribeUntil(null, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilNull(noinline listener: suspend (E) -> Any?) = subscribeUntil(null, listener)
 
 
 @PublishedApi
@@ -87,13 +100,13 @@ internal suspend fun <E : Subscribable, T> KClass<E>.subscribeWhile(valueIfConti
     subscribeInternal(Handler { if (listener(it) !== valueIfContinue) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeWhileFalse(listener: suspend (E) -> Boolean) = subscribeWhile(false, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileFalse(noinline listener: suspend (E) -> Boolean) = subscribeWhile(false, listener)
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeWhileTrue(listener: suspend (E) -> Boolean) = subscribeWhile(true, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileTrue(noinline listener: suspend (E) -> Boolean) = subscribeWhile(true, listener)
 
 @PublishedApi
-internal suspend fun <E : Subscribable> KClass<E>.subscribeWhileNull(listener: suspend (E) -> Any?) = subscribeWhile(null, listener)
+internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileNull(noinline listener: suspend (E) -> Any?) = subscribeWhile(null, listener)
 
 // endregion
 

+ 39 - 18
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/SubscribersWithBot.kt

@@ -5,6 +5,7 @@ package net.mamoe.mirai.event
 import net.mamoe.mirai.Bot
 import net.mamoe.mirai.event.events.BotEvent
 import net.mamoe.mirai.event.internal.HandlerWithBot
+import net.mamoe.mirai.event.internal.Listener
 import net.mamoe.mirai.event.internal.subscribeInternal
 import kotlin.reflect.KClass
 
@@ -17,24 +18,37 @@ import kotlin.reflect.KClass
 
 // region 顶层方法
 
-suspend inline fun <reified E : BotEvent> Bot.subscribe(noinline handler: suspend Bot.(E) -> ListeningStatus) = E::class.subscribe(this, handler)
+suspend inline fun <reified E : BotEvent> Bot.subscribe(noinline handler: suspend Bot.(E) -> ListeningStatus): Listener<E> = E::class.subscribe(this, handler)
 
-suspend inline fun <reified E : BotEvent> Bot.subscribeAlways(noinline listener: suspend Bot.(E) -> Unit) = E::class.subscribeAlways(this, listener)
+suspend inline fun <reified E : BotEvent> Bot.subscribeAlways(noinline listener: suspend Bot.(E) -> Unit): Listener<E> =
+    E::class.subscribeAlways(this, listener)
 
-suspend inline fun <reified E : BotEvent> Bot.subscribeOnce(noinline listener: suspend Bot.(E) -> Unit) = E::class.subscribeOnce(this, listener)
+suspend inline fun <reified E : BotEvent> Bot.subscribeOnce(noinline listener: suspend Bot.(E) -> Unit): Listener<E> = E::class.subscribeOnce(this, listener)
 
-suspend inline fun <reified E : BotEvent, T> Bot.subscribeUntil(valueIfStop: T, noinline listener: suspend Bot.(E) -> T) = E::class.subscribeUntil(this, valueIfStop, listener)
-suspend inline fun <reified E : BotEvent> Bot.subscribeUntilFalse(noinline listener: suspend Bot.(E) -> Boolean) = E::class.subscribeUntilFalse(this, listener)
-suspend inline fun <reified E : BotEvent> Bot.subscribeUntilTrue(noinline listener: suspend Bot.(E) -> Boolean) = E::class.subscribeUntilTrue(this, listener)
-suspend inline fun <reified E : BotEvent> Bot.subscribeUntilNull(noinline listener: suspend Bot.(E) -> Any?) = E::class.subscribeUntilNull(this, listener)
+suspend inline fun <reified E : BotEvent, T> Bot.subscribeUntil(valueIfStop: T, noinline listener: suspend Bot.(E) -> T): Listener<E> =
+    E::class.subscribeUntil(this, valueIfStop, listener)
 
+suspend inline fun <reified E : BotEvent> Bot.subscribeUntilFalse(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
+    E::class.subscribeUntilFalse(this, listener)
 
-suspend inline fun <reified E : BotEvent, T> Bot.subscribeWhile(valueIfContinue: T, noinline listener: suspend Bot.(E) -> T) =
+suspend inline fun <reified E : BotEvent> Bot.subscribeUntilTrue(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
+    E::class.subscribeUntilTrue(this, listener)
+
+suspend inline fun <reified E : BotEvent> Bot.subscribeUntilNull(noinline listener: suspend Bot.(E) -> Any?): Listener<E> =
+    E::class.subscribeUntilNull(this, listener)
+
+
+suspend inline fun <reified E : BotEvent, T> Bot.subscribeWhile(valueIfContinue: T, noinline listener: suspend Bot.(E) -> T): Listener<E> =
     E::class.subscribeWhile(this, valueIfContinue, listener)
 
-suspend inline fun <reified E : BotEvent> Bot.subscribeWhileFalse(noinline listener: suspend Bot.(E) -> Boolean) = E::class.subscribeWhileFalse(this, listener)
-suspend inline fun <reified E : BotEvent> Bot.subscribeWhileTrue(noinline listener: suspend Bot.(E) -> Boolean) = E::class.subscribeWhileTrue(this, listener)
-suspend inline fun <reified E : BotEvent> Bot.subscribeWhileNull(noinline listener: suspend Bot.(E) -> Any?) = E::class.subscribeWhileNull(this, listener)
+suspend inline fun <reified E : BotEvent> Bot.subscribeWhileFalse(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
+    E::class.subscribeWhileFalse(this, listener)
+
+suspend inline fun <reified E : BotEvent> Bot.subscribeWhileTrue(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
+    E::class.subscribeWhileTrue(this, listener)
+
+suspend inline fun <reified E : BotEvent> Bot.subscribeWhileNull(noinline listener: suspend Bot.(E) -> Any?): Listener<E> =
+    E::class.subscribeWhileNull(this, listener)
 
 // endregion
 
@@ -42,7 +56,8 @@ suspend inline fun <reified E : BotEvent> Bot.subscribeWhileNull(noinline listen
 // region KClass 的扩展方法 (仅内部使用)
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribe(bot: Bot, handler: suspend Bot.(E) -> ListeningStatus) = this.subscribeInternal(HandlerWithBot(bot, handler))
+internal suspend fun <E : BotEvent> KClass<E>.subscribe(bot: Bot, handler: suspend Bot.(E) -> ListeningStatus) =
+    this.subscribeInternal(HandlerWithBot(bot, handler))
 
 @PublishedApi
 internal suspend fun <E : BotEvent> KClass<E>.subscribeAlways(bot: Bot, listener: suspend Bot.(E) -> Unit) =
@@ -57,13 +72,16 @@ internal suspend fun <E : BotEvent, T> KClass<E>.subscribeUntil(bot: Bot, valueI
     subscribeInternal(HandlerWithBot(bot) { if (listener(it) === valueIfStop) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeUntilFalse(bot: Bot, listener: suspend Bot.(E) -> Boolean) = subscribeUntil(bot, false, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeUntilFalse(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
+    subscribeUntil(bot, false, listener)
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeUntilTrue(bot: Bot, listener: suspend Bot.(E) -> Boolean) = subscribeUntil(bot, true, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeUntilTrue(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
+    subscribeUntil(bot, true, listener)
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeUntilNull(bot: Bot, listener: suspend Bot.(E) -> Any?) = subscribeUntil(bot, null, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeUntilNull(bot: Bot, noinline listener: suspend Bot.(E) -> Any?) =
+    subscribeUntil(bot, null, listener)
 
 
 @PublishedApi
@@ -71,12 +89,15 @@ internal suspend fun <E : BotEvent, T> KClass<E>.subscribeWhile(bot: Bot, valueI
     subscribeInternal(HandlerWithBot(bot) { if (listener(it) !== valueIfContinue) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeWhileFalse(bot: Bot, listener: suspend Bot.(E) -> Boolean) = subscribeWhile(bot, false, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeWhileFalse(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
+    subscribeWhile(bot, false, listener)
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeWhileTrue(bot: Bot, listener: suspend Bot.(E) -> Boolean) = subscribeWhile(bot, true, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeWhileTrue(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
+    subscribeWhile(bot, true, listener)
 
 @PublishedApi
-internal suspend fun <E : BotEvent> KClass<E>.subscribeWhileNull(bot: Bot, listener: suspend Bot.(E) -> Any?) = subscribeWhile(bot, null, listener)
+internal suspend inline fun <E : BotEvent> KClass<E>.subscribeWhileNull(bot: Bot, noinline listener: suspend Bot.(E) -> Any?) =
+    subscribeWhile(bot, null, listener)
 
 // endregion

+ 66 - 22
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/event/internal/InternalEventListeners.kt

@@ -1,22 +1,28 @@
 package net.mamoe.mirai.event.internal
 
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
 import kotlinx.coroutines.sync.Mutex
 import kotlinx.coroutines.sync.withLock
 import net.mamoe.mirai.Bot
 import net.mamoe.mirai.event.EventDebugLogger
-import net.mamoe.mirai.event.EventScope
 import net.mamoe.mirai.event.ListeningStatus
 import net.mamoe.mirai.event.Subscribable
 import net.mamoe.mirai.event.events.BotEvent
 import net.mamoe.mirai.utils.internal.inlinedRemoveIf
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.coroutineContext
 import kotlin.jvm.JvmField
 import kotlin.reflect.KClass
 import kotlin.reflect.KFunction
 
+/**
+ * 设置为 `true` 以关闭事件.
+ * 所有的 `subscribe` 都能正常添加到监听器列表, 但所有的广播都会直接返回.
+ */
 var EventDisabled = false
 
+// TODO: 2019/11/29 修改监听为 lock-free 模式
+
 /**
  * 监听和广播实现.
  * 它会首先检查这个事件是否正在被广播
@@ -24,8 +30,8 @@ var EventDisabled = false
  *  - 如果不是, 则直接将新的监听者放入主列表
  *
  * @author Him188moe
- */
-internal suspend fun <E : Subscribable> KClass<E>.subscribeInternal(listener: Listener<E>): Unit = with(this.listeners()) {
+ */ // inline to avoid a Continuation creation
+internal suspend inline fun <L : Listener<E>, E : Subscribable> KClass<E>.subscribeInternal(listener: L): L = with(this.listeners()) {
     if (mainMutex.tryLock(listener)) {//能锁则代表这个事件目前没有正在广播.
         try {
             add(listener)//直接修改主监听者列表
@@ -33,7 +39,7 @@ internal suspend fun <E : Subscribable> KClass<E>.subscribeInternal(listener: Li
         } finally {
             mainMutex.unlock(listener)
         }
-        return
+        return listener
     }
 
     //不能锁住, 则这个事件正在广播, 那么要将新的监听者放入缓存
@@ -42,7 +48,7 @@ internal suspend fun <E : Subscribable> KClass<E>.subscribeInternal(listener: Li
         EventDebugLogger.debug("Added a listener to cache of ${[email protected]}")
     }
 
-    EventScope.launch {
+    GlobalScope.launch {
         //启动协程并等待正在进行的广播结束, 然后将缓存转移到主监听者列表
         //启动后的协程马上就会因为锁而被挂起
         mainMutex.withLock(listener) {
@@ -55,21 +61,53 @@ internal suspend fun <E : Subscribable> KClass<E>.subscribeInternal(listener: Li
             }
         }
     }
+
+    return@with listener
 }
 
 /**
- * 事件监听器
+ * 事件监听器.
+ *
+ * 它实现 [CompletableJob] 接口,
+ * 可通过 [CompletableJob.complete] 来正常结束监听, 或 [CompletableJob.completeExceptionally] 来异常地结束监听.
  *
  * @author Him188moe
  */
-sealed class Listener<in E : Subscribable> {
+sealed class Listener<in E : Subscribable> : CompletableJob {
     internal val lock = Mutex()
     abstract suspend fun onEvent(event: E): ListeningStatus
 }
 
 @PublishedApi
-internal class Handler<in E : Subscribable>(@JvmField val handler: suspend (E) -> ListeningStatus) : Listener<E>() {
-    override suspend fun onEvent(event: E): ListeningStatus = handler.invoke(event)
+@Suppress("FunctionName")
+internal suspend inline fun <E : Subscribable> Handler(noinline handler: suspend (E) -> ListeningStatus): Handler<E> {
+    return Handler(coroutineContext[Job], coroutineContext, handler)
+}
+
+/**
+ * 事件处理器.
+ */
+@PublishedApi
+internal class Handler<in E : Subscribable>
+@PublishedApi internal constructor(parentJob: Job?, private val context: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus) :
+    Listener<E>(), CompletableJob by Job(parentJob) {
+
+    override suspend fun onEvent(event: E): ListeningStatus {
+        if (isCompleted || isCancelled) return ListeningStatus.STOPPED
+        if (!isActive) return ListeningStatus.LISTENING
+        return try {
+            withContext(context) { handler.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
+        } catch (e: Throwable) {
+            this.completeExceptionally(e)
+            ListeningStatus.STOPPED
+        }
+    }
+}
+
+@PublishedApi
+@Suppress("FunctionName")
+internal suspend inline fun <E : Subscribable> HandlerWithBot(bot: Bot, noinline handler: suspend Bot.(E) -> ListeningStatus): HandlerWithBot<E> {
+    return HandlerWithBot(bot, coroutineContext[Job], coroutineContext, handler)
 }
 
 /**
@@ -78,17 +116,23 @@ internal class Handler<in E : Subscribable>(@JvmField val handler: suspend (E) -
  * 所有的 [BotEvent.bot] `!==` `bot` 的事件都不会被处理
  */
 @PublishedApi
-internal class HandlerWithBot<E : Subscribable>(val bot: Bot, @JvmField val handler: suspend Bot.(E) -> ListeningStatus) :
-    Listener<E>() {
-    override suspend fun onEvent(event: E): ListeningStatus = with(bot) {
-        if (event !is BotEvent || event.bot !== this) {
-            return ListeningStatus.LISTENING
-        }
-
-        return if (bot !== this) {
-            ListeningStatus.LISTENING
-        } else {
-            handler(event)
+internal class HandlerWithBot<E : Subscribable> @PublishedApi internal constructor(
+    val bot: Bot,
+    parentJob: Job?, private val context: CoroutineContext, @JvmField val handler: suspend Bot.(E) -> ListeningStatus
+) :
+    Listener<E>(), CompletableJob by Job(parentJob) {
+
+    override suspend fun onEvent(event: E): ListeningStatus {
+        if (isCompleted || isCancelled) return ListeningStatus.STOPPED
+        if (!isActive) return ListeningStatus.LISTENING
+
+        if (event !is BotEvent || event.bot !== bot) return ListeningStatus.LISTENING
+
+        return try {
+            withContext(context) { bot.handler(event) }.also { if (it == ListeningStatus.STOPPED) complete() }
+        } catch (e: Throwable) {
+            completeExceptionally(e)
+            ListeningStatus.STOPPED
         }
     }
 }

+ 2 - 2
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/network/protocol/tim/TIMBotNetworkHandler.kt

@@ -242,8 +242,8 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
             }
 
             when (packet) {
-                is Cancellable /* Cancellable : Subscribable */ -> if ((packet as Cancellable).broadcast(coroutineContext).cancelled) return
-                is Subscribable -> if ((packet as? BroadcastControllable)?.shouldBroadcast != false) packet.broadcast(coroutineContext)
+                is Cancellable /* Cancellable : Subscribable */ -> if ((packet as Cancellable).broadcast().cancelled) return
+                is Subscribable -> if ((packet as? BroadcastControllable)?.shouldBroadcast != false) packet.broadcast()
             }
 
             // Remove first to release the lock

+ 36 - 25
mirai-core/src/commonMain/kotlin/net.mamoe.mirai/network/protocol/tim/packet/event/MemberJoin.kt

@@ -9,46 +9,55 @@ import net.mamoe.mirai.contact.Group
 import net.mamoe.mirai.contact.Member
 import net.mamoe.mirai.contact.MemberPermission
 import net.mamoe.mirai.contact.internal.MemberImpl
-import net.mamoe.mirai.event.Event
+import net.mamoe.mirai.event.Subscribable
+import net.mamoe.mirai.event.broadcast
 import net.mamoe.mirai.getGroup
 import net.mamoe.mirai.getQQ
 import net.mamoe.mirai.network.BotNetworkHandler
-import net.mamoe.mirai.network.protocol.tim.packet.Packet
 import net.mamoe.mirai.utils.MiraiInternalAPI
 import net.mamoe.mirai.utils.io.discardExact
 
-
-//群有新成员加入
-//事件 id 00 21
-//
-//00 00 00 08 00 0A 00 04 01 00 00 00 32 DC FC C8 01 2D 5C 53 A6 03 3E 03 3F A2 06 B4 B4 BD A8 D5 DF 00 30 42 34 37 31 30 36 43 30 44 44 34 41 34 44 30 35 30 39 44 45 31 32 30 42 43 35 45 34 44 38 45 42 37 30 36 39 31 45 36 44 45 36 44 39 46 37 36 30
-
-/**
- * 新成员加入. 此时这个人还没被添加到群列表
- */
-internal class MemberJoinEventPacket(
-    val member: Member,
-    val inviter: Member?
-) : Packet
-
 /**
  * 成员加入前的事件. 群的成员列表中还没有这个人
  */
-data class PreMemberJoinEvent(val member: Member, private val _inviter: Member?) : Event() {
-    val group: Group get() = member.group
-    val inviter: Member get() = _inviter ?: error("The new member is not a invitee")
-    val isInvitee: Boolean get() = _inviter != null
+@UseExperimental(MiraiInternalAPI::class)
+inline class PreMemberJoinEvent constructor(private val packet: MemberJoinEventPacket) : MemberJoinEvent {
+    override val member: Member get() = packet.member
+    override val group: Group get() = packet.member.group
+    override val inviter: Member get() = packet.inviter ?: error("The new member is not a invitee")
+    override val isInvitee: Boolean get() = packet.inviter != null
 }
 
 /**
  * 成员加入后的事件. 群的成员列表中已经有这个人
  */
-data class PostMemberJoinEvent(val member: Member, private val _inviter: Member?) : Event() {
-    val group: Group get() = member.group
-    val inviter: Member get() = _inviter ?: error("The new member is not a invitee")
-    val isInvitee: Boolean get() = _inviter != null
+@UseExperimental(MiraiInternalAPI::class)
+inline class PostMemberJoinEvent constructor(private val packet: MemberJoinEventPacket) : MemberJoinEvent {
+    override val member: Member get() = packet.member
+    override val group: Group get() = packet.member.group
+    override val inviter: Member get() = packet.inviter ?: error("The new member is not a invitee")
+    override val isInvitee: Boolean get() = packet.inviter != null
 }
 
+interface MemberJoinEvent : Subscribable {
+    val member: Member
+    val group: Group
+    val inviter: Member
+    val isInvitee: Boolean
+}
+
+
+/**
+ * 新成员加入. 此时这个人还没被添加到群列表
+ *
+ * 仅内部使用
+ */
+@MiraiInternalAPI
+class MemberJoinEventPacket(
+    val member: Member,
+    val inviter: Member?
+) : MemberListChangedEvent // only for internal subscribing
+
 @UseExperimental(MiraiInternalAPI::class)
 internal object MemberJoinPacketHandler : KnownEventParserAndHandler<MemberJoinEventPacket>(0x0021u) {
     override suspend fun ByteReadPacket.parse(bot: Bot, identity: EventPacketIdentity): MemberJoinEventPacket {
@@ -75,6 +84,8 @@ internal object MemberJoinPacketHandler : KnownEventParserAndHandler<MemberJoinE
     }
 
     override suspend fun BotNetworkHandler<*>.handlePacket(packet: MemberJoinEventPacket) {
-
+        PreMemberJoinEvent(packet).broadcast()
+        packet.broadcast()
+        PostMemberJoinEvent(packet).broadcast()
     }
 }

+ 0 - 7
mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/EventDispatcherJvm.kt

@@ -1,7 +0,0 @@
-package net.mamoe.mirai.event
-
-import kotlinx.coroutines.CoroutineDispatcher
-import kotlinx.coroutines.asCoroutineDispatcher
-import java.util.concurrent.Executors
-
-internal actual val EventDispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()

+ 2 - 3
mirai-core/src/jvmMain/kotlin/net/mamoe/mirai/event/SubscribersJvm.kt

@@ -1,14 +1,13 @@
 package net.mamoe.mirai.event
 
-import kotlinx.coroutines.runBlocking
-
 
 // TODO 添加更多
 /**
  * Jvm 调用实现(阻塞)
  */
 object Events {
+    /*
     @JvmStatic
     fun <E : Subscribable> subscribe(type: Class<E>, handler: suspend (E) -> ListeningStatus) =
-        runBlocking { type.kotlin.subscribe(handler) }
+        runBlocking { type.kotlin.subscribe(handler) }*/
 }

+ 1 - 1
mirai-demos/mirai-demo-gentleman/src/main/kotlin/demo/gentleman/Main.kt

@@ -66,7 +66,7 @@ suspend fun main() {
 
         "群资料" reply {
             if (this is GroupMessage) {
-                group.queryGroupInfo().toString().reply()
+                group.updateGroupInfo().toString().reply()
             }
         }