EventChannel.jvm.kt 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. /*
  2. * Copyright 2019-2021 Mamoe Technologies and contributors.
  3. *
  4. * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
  5. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
  6. *
  7. * https://github.com/mamoe/mirai/blob/master/LICENSE
  8. */
  9. @file:Suppress(
  10. "INVISIBLE_MEMBER",
  11. "INVISIBLE_REFERENCE",
  12. "MemberVisibilityCanBePrivate",
  13. "unused",
  14. "ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS"
  15. )
  16. package net.mamoe.mirai.event
  17. import kotlinx.coroutines.*
  18. import kotlinx.coroutines.channels.Channel
  19. import kotlinx.coroutines.sync.Mutex
  20. import net.mamoe.mirai.Bot
  21. import net.mamoe.mirai.event.ConcurrencyKind.CONCURRENT
  22. import net.mamoe.mirai.event.ConcurrencyKind.LOCKED
  23. import net.mamoe.mirai.event.events.BotEvent
  24. import net.mamoe.mirai.internal.event.GlobalEventListeners
  25. import net.mamoe.mirai.internal.event.Handler
  26. import net.mamoe.mirai.internal.event.ListenerRegistry
  27. import net.mamoe.mirai.internal.event.registerEventHandler
  28. import net.mamoe.mirai.utils.MiraiExperimentalApi
  29. import net.mamoe.mirai.utils.MiraiLogger
  30. import net.mamoe.mirai.utils.cast
  31. import java.util.function.Consumer
  32. import kotlin.coroutines.CoroutineContext
  33. import kotlin.coroutines.EmptyCoroutineContext
  34. import kotlin.internal.LowPriorityInOverloadResolution
  35. import kotlin.reflect.KClass
  36. /**
  37. * 事件通道. 事件通道是监听事件的入口. **在不同的事件通道中可以监听到不同类型的事件**.
  38. *
  39. * [GlobalEventChannel] 是最大的通道: 所有的事件都可以在 [GlobalEventChannel] 监听到.
  40. * 通过 [Bot.eventChannel] 得到的通道只能监听到来自这个 [Bot] 的事件.
  41. *
  42. * ### 对通道的操作
  43. * - "缩窄" 通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
  44. * - 转换为 Kotlin 协程 [Channel]: [EventChannel.asChannel]
  45. * - 添加 [CoroutineContext]: [context], [parentJob], [parentScope], [exceptionHandler]
  46. *
  47. * ### 创建事件监听
  48. * - [EventChannel.subscribe] 创建带条件的一个事件监听器.
  49. * - [EventChannel.subscribeAlways] 创建一个总是监听事件的事件监听器.
  50. * - [EventChannel.subscribeOnce] 创建一个只监听单次的事件监听器.
  51. *
  52. * ### 获取事件通道
  53. * - 全局事件通道: [GlobalEventChannel]
  54. * - [BotEvent] 通道: [Bot.eventChannel]
  55. *
  56. * @see subscribe
  57. */
  58. public actual open class EventChannel<out BaseEvent : Event> @JvmOverloads internal actual constructor(
  59. public actual val baseEventClass: KClass<out BaseEvent>,
  60. /**
  61. * 此事件通道的默认 [CoroutineScope.coroutineContext]. 将会被添加给所有注册的事件监听器.
  62. */
  63. public actual val defaultCoroutineContext: CoroutineContext = EmptyCoroutineContext
  64. ) {
  65. /**
  66. * 创建事件监听并将监听结果发送在 [Channel]. 将返回值 [Channel] [关闭][Channel.close] 时将会同时关闭事件监听.
  67. *
  68. * 标注 [ExperimentalCoroutinesApi] 是因为使用了 [Channel.invokeOnClose]
  69. *
  70. * @param capacity Channel 容量. 详见 [Channel] 构造.
  71. *
  72. * @see subscribeAlways
  73. * @see Channel
  74. */
  75. @MiraiExperimentalApi
  76. @ExperimentalCoroutinesApi
  77. public actual fun asChannel(
  78. capacity: Int = Channel.RENDEZVOUS,
  79. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  80. concurrency: ConcurrencyKind = CONCURRENT,
  81. priority: EventPriority = EventPriority.NORMAL,
  82. ): Channel<out BaseEvent> {
  83. val channel = Channel<BaseEvent>(capacity)
  84. val listener = subscribeAlways(baseEventClass, coroutineContext, concurrency, priority) { channel.send(it) }
  85. channel.invokeOnClose {
  86. if (it != null) listener.completeExceptionally(it)
  87. else listener.complete()
  88. }
  89. return channel
  90. }
  91. // region transforming operations
  92. /**
  93. * 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
  94. *
  95. * 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
  96. *
  97. * ## 线性顺序
  98. * 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
  99. *
  100. * 示例:
  101. * ```
  102. * GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
  103. * .filterIsInstance<BotEvent>() // 过滤, 只接受 BotEvent
  104. * .filter { event: BotEvent ->
  105. * // 此时的 event 一定是 BotEvent
  106. * event.bot.id == 123456 // 再过滤 event 的 bot.id
  107. * }
  108. * .subscribeAlways { event: BotEvent ->
  109. * // 现在 event 是 BotEvent, 且 bot.id == 123456
  110. * }
  111. * ```
  112. *
  113. * ## 过滤器挂起
  114. * [filter] 允许挂起协程. **过滤器的挂起将被认为是事件监听器的挂起**.
  115. *
  116. * 过滤器挂起是否会影响事件处理,
  117. * 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
  118. *
  119. * ## 过滤器异常处理
  120. * 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
  121. *
  122. * @see filterIsInstance 过滤指定类型的事件
  123. */
  124. @JvmSynthetic
  125. public actual fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
  126. val parent = this
  127. return object : EventChannel<BaseEvent>(baseEventClass, defaultCoroutineContext) {
  128. private inline val innerThis get() = this
  129. override fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): suspend (E) -> ListeningStatus {
  130. val thisIntercepted: suspend (E) -> ListeningStatus = { ev ->
  131. val filterResult = try {
  132. @Suppress("UNCHECKED_CAST")
  133. baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
  134. } catch (e: Throwable) {
  135. if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
  136. throw ExceptionInEventChannelFilterException(ev, innerThis, cause = e)
  137. }
  138. if (filterResult) [email protected](ev)
  139. else ListeningStatus.LISTENING
  140. }
  141. return parent.intercept(thisIntercepted)
  142. }
  143. }
  144. }
  145. /**
  146. * [EventChannel.filter] 的 Java 版本.
  147. *
  148. * 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
  149. *
  150. * 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
  151. *
  152. * ## 线性顺序
  153. * 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
  154. *
  155. * 示例:
  156. * ```
  157. * GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
  158. * .filterIsInstance(BotEvent.class) // 过滤, 只接受 BotEvent
  159. * .filter(event ->
  160. * // 此时的 event 一定是 BotEvent
  161. * event.bot.id == 123456 // 再过滤 event 的 bot.id
  162. * )
  163. * .subscribeAlways(event -> {
  164. * // 现在 event 是 BotEvent, 且 bot.id == 123456
  165. * })
  166. * ```
  167. *
  168. * ## 过滤器阻塞
  169. * [filter] 允许阻塞线程. **过滤器的阻塞将被认为是事件监听器的阻塞**.
  170. *
  171. * 过滤器阻塞是否会影响事件处理,
  172. * 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
  173. *
  174. * ## 过滤器异常处理
  175. * 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
  176. *
  177. * @see filterIsInstance 过滤指定类型的事件
  178. *
  179. * @since 2.2
  180. */
  181. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  182. @kotlin.internal.LowPriorityInOverloadResolution
  183. public actual fun filter(filter: (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
  184. return filter { runInterruptible { filter(it) } }
  185. }
  186. /**
  187. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  188. * @see filter 获取更多信息
  189. */
  190. @JvmSynthetic
  191. public actual inline fun <reified E : Event> filterIsInstance(): EventChannel<E> =
  192. filterIsInstance(E::class)
  193. /**
  194. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  195. * @see filter 获取更多信息
  196. */
  197. public actual fun <E : Event> filterIsInstance(kClass: KClass<out E>): EventChannel<E> {
  198. return filter { kClass.isInstance(it) }.cast()
  199. }
  200. /**
  201. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  202. * @see filter 获取更多信息
  203. */
  204. public actual fun <E : Event> filterIsInstance(clazz: Class<out E>): EventChannel<E> =
  205. filterIsInstance(clazz.kotlin)
  206. /**
  207. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineContexts].
  208. * [coroutineContexts] 会覆盖 [defaultCoroutineContext] 中的重复元素.
  209. *
  210. * 此操作不会修改 [`this.coroutineContext`][defaultCoroutineContext], 只会创建一个新的 [EventChannel].
  211. */
  212. public actual fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
  213. val origin = this
  214. return object : EventChannel<BaseEvent>(
  215. baseEventClass,
  216. coroutineContexts.fold(this.defaultCoroutineContext) { acc, element -> acc + element }
  217. ) {
  218. override fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): suspend (E) -> ListeningStatus {
  219. return origin.intercept(this)
  220. }
  221. }
  222. }
  223. /**
  224. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [this.coroutineContext][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
  225. * @see context
  226. */
  227. @LowPriorityInOverloadResolution
  228. public actual fun exceptionHandler(coroutineExceptionHandler: CoroutineExceptionHandler): EventChannel<BaseEvent> {
  229. return context(coroutineExceptionHandler)
  230. }
  231. /**
  232. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
  233. * @see context
  234. */
  235. public actual fun exceptionHandler(coroutineExceptionHandler: (exception: Throwable) -> Unit): EventChannel<BaseEvent> {
  236. return context(CoroutineExceptionHandler { _, throwable ->
  237. coroutineExceptionHandler(throwable)
  238. })
  239. }
  240. /**
  241. * 将 [coroutineScope] 作为这个 [EventChannel] 的父作用域.
  242. *
  243. * 实际作用为创建一个新的 [EventChannel],
  244. * 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [CoroutineScope.coroutineContext],
  245. * 并以 [CoroutineScope] 中 [Job] (如果有) [作为父 Job][parentJob]
  246. *
  247. * @see parentJob
  248. * @see context
  249. *
  250. * @see CoroutineScope.globalEventChannel `GlobalEventChannel.parentScope()` 的扩展
  251. */
  252. public actual fun parentScope(coroutineScope: CoroutineScope): EventChannel<BaseEvent> {
  253. return context(coroutineScope.coroutineContext)
  254. }
  255. /**
  256. * 指定协程父 [Job]. 之后在此 [EventChannel] 下创建的事件监听器都会成为 [job] 的子任务, 当 [job] 被取消时, 所有的事件监听器都会被取消.
  257. *
  258. * 注意: 监听器不会失败 ([Job.cancel]). 监听器处理过程的异常都会被捕获然后交由 [CoroutineExceptionHandler] 处理, 因此 [job] 不会因为子任务监听器的失败而被取消.
  259. *
  260. * @see parentScope
  261. * @see context
  262. */
  263. public actual fun parentJob(job: Job): EventChannel<BaseEvent> {
  264. return context(job)
  265. }
  266. // endregion
  267. // region subscribe
  268. /**
  269. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
  270. *
  271. * 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
  272. *
  273. *
  274. * ## 创建监听
  275. * 调用本函数:
  276. * ```
  277. * eventChannel.subscribe<E> { /* 会收到此通道中的所有是 E 的事件 */ }
  278. * ```
  279. *
  280. * ## 生命周期
  281. *
  282. * ### 通过协程作用域管理监听器
  283. * 本函数将会创建一个 [Job], 成为 [parentJob] 中的子任务. 可创建一个 [CoroutineScope] 来管理所有的监听器:
  284. * ```
  285. * val scope = CoroutineScope(SupervisorJob())
  286. *
  287. * val scopedChannel = eventChannel.parentScope(scope) // 将协程作用域 scope 附加到这个 EventChannel
  288. *
  289. * scopedChannel.subscribeAlways<MemberJoinEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
  290. * scopedChannel.subscribeAlways<MemberMuteEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
  291. *
  292. * scope.cancel() // 停止了协程作用域, 也就取消了两个监听器
  293. * ```
  294. *
  295. * 这个函数返回 [Listener], 它是一个 [CompletableJob]. 它会成为 [parentJob] 或 [parentScope] 的一个 [子任务][Job]
  296. *
  297. * ### 停止监听
  298. * 如果 [handler] 返回 [ListeningStatus.STOPPED] 监听器将被停止.
  299. *
  300. * 也可以通过 [subscribe] 返回值 [Listener] 的 [Listener.complete]
  301. *
  302. * ## 监听器调度
  303. * 监听器会被创建一个协程任务, 语义上在 [parentScope] 下运行.
  304. * 通过 Kotlin [默认协程调度器][Dispatchers.Default] 在固定的全局共享线程池里执行, 除非有 [coroutineContext] 指定.
  305. *
  306. * 默认在 [handler] 中不能处理阻塞任务. 阻塞任务将会阻塞一个 Kotlin 全局协程调度线程并可能导致严重问题.
  307. * 请通过 `withContext(Dispatchers.IO) { }` 等方法执行阻塞工作.
  308. *
  309. * ## 异常处理
  310. * - 当参数 [handler] 处理抛出异常时, 将会按如下顺序寻找 [CoroutineExceptionHandler] 处理异常:
  311. * 1. 参数 [coroutineContext]
  312. * 2. [EventChannel.defaultCoroutineContext]
  313. * 3. [Event.broadcast] 调用者的 [coroutineContext]
  314. * 4. 若事件为 [BotEvent], 则从 [BotEvent.bot] 获取到 [Bot], 进而在 [Bot.coroutineContext] 中寻找
  315. * 5. 若以上四个步骤均无法获取 [CoroutineExceptionHandler], 则使用 [MiraiLogger.Companion] 通过日志记录. 但这种情况理论上不应发生.
  316. *
  317. *
  318. * 事件处理时抛出异常不会停止监听器.
  319. *
  320. * 建议在事件处理中 (即 [handler] 里) 处理异常,
  321. * 或在参数 [coroutineContext] 中添加 [CoroutineExceptionHandler], 或通过 [EventChannel.exceptionHandler].
  322. *
  323. * ## 并发安全性
  324. * 基于 [concurrency] 参数, 事件监听器可以被允许并行执行.
  325. *
  326. * - 若 [concurrency] 为 [ConcurrencyKind.CONCURRENT], [handler] 可能被并行调用, 需要保证并发安全.
  327. * - 若 [concurrency] 为 [ConcurrencyKind.LOCKED], [handler] 会被 [Mutex] 限制.
  328. *
  329. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext].
  330. * @param concurrency 并发类型. 查看 [ConcurrencyKind]
  331. * @param priority 监听优先级,优先级越高越先执行
  332. * @param handler 事件处理器. 在接收到事件时会调用这个处理器. 其返回值意义参考 [ListeningStatus]. 其异常处理参考上文
  333. *
  334. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  335. *
  336. * @see syncFromEvent 挂起当前协程, 监听一个事件, 并尝试从这个事件中**同步**一个值
  337. * @see asyncFromEvent 异步监听一个事件, 并尝试从这个事件中获取一个值.
  338. *
  339. * @see nextEvent 挂起当前协程, 直到监听到事件 [E] 的广播, 返回这个事件实例.
  340. *
  341. * @see selectMessages 以 `when` 的语法 '选择' 即将到来的一条消息.
  342. * @see whileSelectMessages 以 `when` 的语法 '选择' 即将到来的所有消息, 直到不满足筛选结果.
  343. *
  344. * @see subscribeAlways 一直监听
  345. * @see subscribeOnce 只监听一次
  346. *
  347. * @see subscribeMessages 监听消息 DSL
  348. */
  349. @JvmSynthetic
  350. public actual inline fun <reified E : Event> subscribe(
  351. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  352. concurrency: ConcurrencyKind = LOCKED,
  353. priority: EventPriority = EventPriority.NORMAL,
  354. noinline handler: suspend E.(E) -> ListeningStatus
  355. ): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
  356. /**
  357. * 与 [subscribe] 的区别是接受 [eventClass] 参数, 而不使用 `reified` 泛型. 通常推荐使用具体化类型参数.
  358. *
  359. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  360. * @see subscribe
  361. */
  362. @JvmSynthetic
  363. public actual fun <E : Event> subscribe(
  364. eventClass: KClass<out E>,
  365. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  366. concurrency: ConcurrencyKind = LOCKED,
  367. priority: EventPriority = EventPriority.NORMAL,
  368. handler: suspend E.(E) -> ListeningStatus
  369. ): Listener<E> = subscribeInternal(
  370. eventClass,
  371. createListener(coroutineContext, concurrency, priority) { it.handler(it); }
  372. )
  373. /**
  374. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
  375. * 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
  376. *
  377. * 可在任意时候通过 [Listener.complete] 来主动停止监听.
  378. *
  379. * @param concurrency 并发类型默认为 [CONCURRENT]
  380. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  381. * @param priority 处理优先级, 优先级高的先执行
  382. *
  383. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  384. *
  385. * @see subscribe 获取更多说明
  386. */
  387. @JvmSynthetic
  388. public actual inline fun <reified E : Event> subscribeAlways(
  389. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  390. concurrency: ConcurrencyKind = CONCURRENT,
  391. priority: EventPriority = EventPriority.NORMAL,
  392. noinline handler: suspend E.(E) -> Unit
  393. ): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)
  394. /**
  395. * @see subscribe
  396. * @see subscribeAlways
  397. */
  398. @JvmSynthetic
  399. public actual fun <E : Event> subscribeAlways(
  400. eventClass: KClass<out E>,
  401. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  402. concurrency: ConcurrencyKind = CONCURRENT,
  403. priority: EventPriority = EventPriority.NORMAL,
  404. handler: suspend E.(E) -> Unit
  405. ): Listener<E> = subscribeInternal(
  406. eventClass,
  407. createListener(coroutineContext, concurrency, priority) { it.handler(it); ListeningStatus.LISTENING }
  408. )
  409. /**
  410. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
  411. * 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
  412. *
  413. * 可在任意时候通过 [Listener.complete] 来主动停止监听.
  414. *
  415. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  416. * @param priority 处理优先级, 优先级高的先执行
  417. *
  418. * @see subscribe 获取更多说明
  419. */
  420. @JvmSynthetic
  421. public actual inline fun <reified E : Event> subscribeOnce(
  422. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  423. priority: EventPriority = EventPriority.NORMAL,
  424. noinline handler: suspend E.(E) -> Unit
  425. ): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, handler)
  426. /**
  427. * @see subscribeOnce
  428. */
  429. public actual fun <E : Event> subscribeOnce(
  430. eventClass: KClass<out E>,
  431. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  432. priority: EventPriority = EventPriority.NORMAL,
  433. handler: suspend E.(E) -> Unit
  434. ): Listener<E> = subscribeInternal(
  435. eventClass,
  436. createListener(coroutineContext, LOCKED, priority) { it.handler(it); ListeningStatus.STOPPED }
  437. )
  438. // endregion
  439. /**
  440. * 注册 [ListenerHost] 中的所有 [EventHandler] 标注的方法到这个 [EventChannel]. 查看 [EventHandler].
  441. *
  442. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  443. *
  444. * @see subscribe
  445. * @see EventHandler
  446. * @see ListenerHost
  447. */
  448. @JvmOverloads
  449. public actual fun registerListenerHost(
  450. host: ListenerHost,
  451. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  452. ) {
  453. for (method in host.javaClass.declaredMethods) {
  454. method.getAnnotation(EventHandler::class.java)?.let {
  455. method.registerEventHandler(host, this, it, coroutineContext)
  456. }
  457. }
  458. }
  459. // region Java API
  460. /**
  461. * Java API. 查看 [subscribeAlways] 获取更多信息.
  462. *
  463. * ```java
  464. * eventChannel.subscribeAlways(GroupMessageEvent.class, (event) -> { });
  465. * ```
  466. *
  467. * @see subscribe
  468. * @see subscribeAlways
  469. */
  470. @JvmOverloads
  471. @LowPriorityInOverloadResolution
  472. public fun <E : Event> subscribeAlways(
  473. eventClass: Class<out E>,
  474. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  475. concurrency: ConcurrencyKind = CONCURRENT,
  476. priority: EventPriority = EventPriority.NORMAL,
  477. handler: Consumer<E>
  478. ): Listener<E> = subscribeInternal(
  479. eventClass.kotlin,
  480. createListener(coroutineContext, concurrency, priority) { event ->
  481. runInterruptible(Dispatchers.IO) { handler.accept(event) }
  482. ListeningStatus.LISTENING
  483. }
  484. )
  485. /**
  486. * Java API. 查看 [subscribe] 获取更多信息.
  487. *
  488. * ```java
  489. * eventChannel.subscribe(GroupMessageEvent.class, (event) -> {
  490. * return ListeningStatus.LISTENING;
  491. * });
  492. * ```
  493. *
  494. * @see subscribe
  495. */
  496. @JvmOverloads
  497. @LowPriorityInOverloadResolution
  498. public fun <E : Event> subscribe(
  499. eventClass: Class<out E>,
  500. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  501. concurrency: ConcurrencyKind = CONCURRENT,
  502. priority: EventPriority = EventPriority.NORMAL,
  503. handler: java.util.function.Function<E, ListeningStatus>
  504. ): Listener<E> = subscribeInternal(
  505. eventClass.kotlin,
  506. createListener(coroutineContext, concurrency, priority) { event ->
  507. runInterruptible(Dispatchers.IO) { handler.apply(event) }
  508. }
  509. )
  510. /**
  511. * Java API. 查看 [subscribeOnce] 获取更多信息.
  512. *
  513. * ```java
  514. * eventChannel.subscribeOnce(GroupMessageEvent.class, (event) -> { });
  515. * ```
  516. *
  517. * @see subscribe
  518. * @see subscribeOnce
  519. */
  520. @JvmOverloads
  521. @LowPriorityInOverloadResolution
  522. public fun <E : Event> subscribeOnce(
  523. eventClass: Class<out E>,
  524. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  525. concurrency: ConcurrencyKind = CONCURRENT,
  526. priority: EventPriority = EventPriority.NORMAL,
  527. handler: Consumer<E>
  528. ): Listener<E> = subscribeInternal(
  529. eventClass.kotlin,
  530. createListener(coroutineContext, concurrency, priority) { event ->
  531. runInterruptible(Dispatchers.IO) { handler.accept(event) }
  532. ListeningStatus.STOPPED
  533. }
  534. )
  535. // endregion
  536. // region impl
  537. /**
  538. * 由子类实现,可以为 handler 包装一个过滤器等. 每个 handler 都会经过此函数处理.
  539. */
  540. @MiraiExperimentalApi
  541. protected actual open fun <E : Event> (suspend (E) -> ListeningStatus).intercepted(): (suspend (E) -> ListeningStatus) {
  542. return this
  543. }
  544. private fun <E : Event> intercept(listener: (suspend (E) -> ListeningStatus)): suspend (E) -> ListeningStatus {
  545. return listener.intercepted()
  546. }
  547. private fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L {
  548. with(GlobalEventListeners[listener.priority]) {
  549. @Suppress("UNCHECKED_CAST")
  550. val node = ListenerRegistry(listener as Listener<Event>, eventClass)
  551. add(node)
  552. listener.invokeOnCompletion {
  553. this.remove(node)
  554. }
  555. }
  556. return listener
  557. }
  558. @Suppress("FunctionName")
  559. private fun <E : Event> createListener(
  560. coroutineContext: CoroutineContext,
  561. concurrencyKind: ConcurrencyKind,
  562. priority: EventPriority = EventPriority.NORMAL,
  563. handler: suspend (E) -> ListeningStatus
  564. ): Listener<E> {
  565. val context = this.defaultCoroutineContext + coroutineContext
  566. return Handler(
  567. parentJob = context[Job],
  568. subscriberContext = context,
  569. handler = handler.intercepted(),
  570. concurrencyKind = concurrencyKind,
  571. priority = priority
  572. )
  573. }
  574. // endregion
  575. }