|
|
@@ -1,30 +1,28 @@
|
|
|
@file:Suppress("EXPERIMENTAL_API_USAGE")
|
|
|
|
|
|
-package net.mamoe.mirai.network.protocol.timpc
|
|
|
+package net.mamoe.mirai.timpc.network
|
|
|
|
|
|
import kotlinx.coroutines.*
|
|
|
-import kotlinx.coroutines.sync.Mutex
|
|
|
-import kotlinx.coroutines.sync.withLock
|
|
|
import kotlinx.io.core.*
|
|
|
import kotlinx.io.pool.useInstance
|
|
|
import net.mamoe.mirai.Bot
|
|
|
-import net.mamoe.mirai.event.*
|
|
|
-import net.mamoe.mirai.event.events.BeforePacketSendEvent
|
|
|
+import net.mamoe.mirai.event.BroadcastControllable
|
|
|
+import net.mamoe.mirai.event.Cancellable
|
|
|
+import net.mamoe.mirai.event.Subscribable
|
|
|
+import net.mamoe.mirai.event.broadcast
|
|
|
import net.mamoe.mirai.event.events.BotLoginSucceedEvent
|
|
|
-import net.mamoe.mirai.event.events.PacketSentEvent
|
|
|
-import net.mamoe.mirai.event.events.ServerPacketReceivedEvent
|
|
|
import net.mamoe.mirai.network.BotNetworkHandler
|
|
|
-import net.mamoe.mirai.network.BotSession
|
|
|
-import net.mamoe.mirai.network.protocol.timpc.handler.DataPacketSocketAdapter
|
|
|
-import net.mamoe.mirai.network.protocol.timpc.handler.TemporaryPacketHandler
|
|
|
-import net.mamoe.mirai.network.protocol.timpc.packet.*
|
|
|
-import net.mamoe.mirai.network.protocol.timpc.packet.login.*
|
|
|
+import net.mamoe.mirai.network.data.LoginResult
|
|
|
+import net.mamoe.mirai.network.data.Packet
|
|
|
import net.mamoe.mirai.qqAccount
|
|
|
-import net.mamoe.mirai.utils.OnlineStatus
|
|
|
-import net.mamoe.mirai.utils.currentBotConfiguration
|
|
|
+import net.mamoe.mirai.timpc.TIMPCBot
|
|
|
+import net.mamoe.mirai.timpc.network.handler.DataPacketSocketAdapter
|
|
|
+import net.mamoe.mirai.timpc.network.handler.TemporaryPacketHandler
|
|
|
+import net.mamoe.mirai.timpc.network.packet.*
|
|
|
+import net.mamoe.mirai.timpc.network.packet.login.*
|
|
|
+import net.mamoe.mirai.utils.*
|
|
|
import net.mamoe.mirai.utils.io.*
|
|
|
import kotlin.coroutines.CoroutineContext
|
|
|
-import kotlin.properties.Delegates
|
|
|
import kotlin.random.Random
|
|
|
|
|
|
/**
|
|
|
@@ -39,9 +37,9 @@ internal expect val NetworkDispatcher: CoroutineDispatcher
|
|
|
*
|
|
|
* @see BotNetworkHandler
|
|
|
*/
|
|
|
-internal class TIMBotNetworkHandler internal constructor(coroutineContext: CoroutineContext, override val bot: Bot) :
|
|
|
- BotNetworkHandler<TIMBotNetworkHandler.BotSocketAdapter>, CoroutineScope {
|
|
|
-
|
|
|
+internal class TIMBotNetworkHandler internal constructor(coroutineContext: CoroutineContext, bot: TIMPCBot) :
|
|
|
+ BotNetworkHandler(), CoroutineScope {
|
|
|
+ override val bot: TIMPCBot by bot.unsafeWeakRef()
|
|
|
override val supervisor: CompletableJob = SupervisorJob(coroutineContext[Job])
|
|
|
|
|
|
override val coroutineContext: CoroutineContext =
|
|
|
@@ -50,52 +48,43 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
?: "an unnamed coroutine"} under TIMBotNetworkHandler", e)
|
|
|
} + supervisor
|
|
|
|
|
|
- override lateinit var socket: BotSocketAdapter
|
|
|
+ lateinit var socket: BotSocketAdapter
|
|
|
private set
|
|
|
|
|
|
- private val temporaryPacketHandlers = mutableListOf<TemporaryPacketHandler<*, *>>()
|
|
|
- private val handlersLock = Mutex()
|
|
|
+ internal val temporaryPacketHandlers = LockFreeLinkedList<TemporaryPacketHandler<*, *>>()
|
|
|
|
|
|
private var heartbeatJob: Job? = null
|
|
|
|
|
|
- suspend fun addHandler(temporaryPacketHandler: TemporaryPacketHandler<*, *>) {
|
|
|
- handlersLock.withLock {
|
|
|
- temporaryPacketHandlers.add(temporaryPacketHandler)
|
|
|
- }
|
|
|
- temporaryPacketHandler.send(this.session)
|
|
|
- }
|
|
|
+ override suspend fun login() {
|
|
|
|
|
|
- override suspend fun login(): LoginResult {
|
|
|
- return withContext(this.coroutineContext) {
|
|
|
- TIMProtocol.SERVER_IP.sortedBy { Random.nextInt() }.forEach { ip ->
|
|
|
- bot.logger.info("Connecting server $ip")
|
|
|
- try {
|
|
|
- withTimeout(3000) {
|
|
|
- socket = BotSocketAdapter(ip)
|
|
|
- }
|
|
|
- } catch (e: Exception) {
|
|
|
- return@withContext LoginResult.NETWORK_UNAVAILABLE
|
|
|
+ TIMProtocol.SERVER_IP.sortedBy { Random.nextInt() }.forEach { ip ->
|
|
|
+ bot.logger.info("Connecting server $ip")
|
|
|
+ try {
|
|
|
+ withTimeout(3000) {
|
|
|
+ socket = BotSocketAdapter(ip)
|
|
|
}
|
|
|
+ } catch (e: Exception) {
|
|
|
+ throw LoginFailedException(LoginResult.NETWORK_UNAVAILABLE)
|
|
|
+ }
|
|
|
|
|
|
- loginResult = CompletableDeferred()
|
|
|
+ loginResult = CompletableDeferred()
|
|
|
|
|
|
- socket.resendTouch().takeIf { it != LoginResult.TIMEOUT }?.let { return@withContext it }
|
|
|
+ val result = socket.resendTouch() ?: return // success
|
|
|
+ result.takeIf { it != LoginResult.TIMEOUT }?.let { throw LoginFailedException(it) }
|
|
|
|
|
|
- bot.logger.warning("Timeout. Retrying next server")
|
|
|
+ bot.logger.warning("Timeout. Retrying next server")
|
|
|
|
|
|
- socket.close()
|
|
|
- }
|
|
|
- return@withContext LoginResult.TIMEOUT
|
|
|
+ socket.close()
|
|
|
}
|
|
|
+ throw LoginFailedException(LoginResult.TIMEOUT)
|
|
|
}
|
|
|
|
|
|
- internal var loginResult: CompletableDeferred<LoginResult> = CompletableDeferred()
|
|
|
-
|
|
|
- override lateinit var session: BotSession
|
|
|
+ internal var loginResult: CompletableDeferred<LoginResult?> = CompletableDeferred()
|
|
|
|
|
|
//private | internal
|
|
|
|
|
|
- private var sessionKey: SessionKey by Delegates.notNull()
|
|
|
+ private var _sessionKey: SessionKey? = null
|
|
|
+ internal val sessionKey: SessionKey get() = _sessionKey ?: error("sessionKey is not yet initialized")
|
|
|
|
|
|
override suspend fun awaitDisconnection() {
|
|
|
heartbeatJob?.join()
|
|
|
@@ -187,43 +176,38 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- internal suspend fun resendTouch(): LoginResult /* = coroutineScope */ {
|
|
|
+ internal suspend fun resendTouch(): LoginResult? /* = coroutineScope */ {
|
|
|
loginHandler?.close()
|
|
|
|
|
|
loginHandler = LoginHandler()
|
|
|
|
|
|
|
|
|
- val expect = expectPacket<TouchPacket.TouchResponse>()
|
|
|
- launch { processReceive() }
|
|
|
- launch {
|
|
|
- if (withTimeoutOrNull(currentBotConfiguration().touchTimeoutMillis) { expect.join() } == null) {
|
|
|
- loginResult.complete(LoginResult.TIMEOUT)
|
|
|
+ expectingTouchResponse = Job(supervisor)
|
|
|
+ try {
|
|
|
+ launch { processReceive() }
|
|
|
+ launch {
|
|
|
+ if (withTimeoutOrNull(currentBotConfiguration().touchTimeoutMillis) { expectingTouchResponse!!.join() } == null) {
|
|
|
+ loginResult.complete(LoginResult.TIMEOUT)
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- sendPacket(TouchPacket(bot.qqAccount, serverIp, false))
|
|
|
+ sendPacket(TouchPacket(bot.qqAccount, serverIp, false))
|
|
|
|
|
|
- return loginResult.await()
|
|
|
- }
|
|
|
-
|
|
|
- private suspend inline fun <reified P : Packet> expectPacket(): CompletableDeferred<P> {
|
|
|
- val receiving = CompletableDeferred<P>(coroutineContext[Job])
|
|
|
- subscribe<ServerPacketReceivedEvent<*>> {
|
|
|
- if (it.packet is P && it.bot === bot) {
|
|
|
- receiving.complete(it.packet)
|
|
|
- ListeningStatus.STOPPED
|
|
|
- } else
|
|
|
- ListeningStatus.LISTENING
|
|
|
+ return loginResult.await()
|
|
|
+ } finally {
|
|
|
+ expectingTouchResponse = null
|
|
|
}
|
|
|
- return receiving
|
|
|
}
|
|
|
|
|
|
+ private var expectingTouchResponse: CompletableJob? = null
|
|
|
+
|
|
|
private suspend fun <TPacket : Packet> handlePacket0(
|
|
|
sequenceId: UShort,
|
|
|
packet: TPacket,
|
|
|
factory: PacketFactory<TPacket, *>
|
|
|
) {
|
|
|
- if (ServerPacketReceivedEvent(bot, packet).broadcast().cancelled)
|
|
|
- return
|
|
|
+ if (packet is TouchPacket.TouchResponse) {
|
|
|
+ expectingTouchResponse?.complete()
|
|
|
+ }
|
|
|
|
|
|
if (!packet::class.annotations.filterIsInstance<NoLog>().any()) {
|
|
|
if ((packet as? BroadcastControllable)?.shouldBroadcast != false) {
|
|
|
@@ -236,12 +220,10 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
is Subscribable -> if ((packet as? BroadcastControllable)?.shouldBroadcast != false) packet.broadcast()
|
|
|
}
|
|
|
|
|
|
- // Remove first to release the lock
|
|
|
- handlersLock.withLock {
|
|
|
- temporaryPacketHandlers.filter { it.filter(session, packet, sequenceId) }
|
|
|
- .also { temporaryPacketHandlers.removeAll(it) }
|
|
|
- }.forEach {
|
|
|
- it.doReceiveCatchingExceptions(packet)
|
|
|
+ temporaryPacketHandlers.forEach {
|
|
|
+ if (it.filter(packet, sequenceId) && temporaryPacketHandlers.remove(it)) {
|
|
|
+ it.doReceivePassingExceptionsToDeferred(packet)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (factory is SessionPacketFactory<*>) {
|
|
|
@@ -256,10 +238,6 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
internal suspend fun sendPacket(packet: OutgoingPacket): Unit = withContext(coroutineContext + CoroutineName("sendPacket")) {
|
|
|
check(channel.isOpen) { "channel is not open" }
|
|
|
|
|
|
- if (BeforePacketSendEvent(bot, packet).broadcast().cancelled) {
|
|
|
- return@withContext
|
|
|
- }
|
|
|
-
|
|
|
packet.delegate.use { built ->
|
|
|
val buffer = IoBuffer.Pool.borrow()
|
|
|
try {
|
|
|
@@ -291,8 +269,6 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
bot.logger.verbose("Packet sent: ${it.name}")
|
|
|
}
|
|
|
|
|
|
- PacketSentEvent(bot, packet).broadcast()
|
|
|
-
|
|
|
Unit
|
|
|
}
|
|
|
|
|
|
@@ -466,8 +442,8 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
}
|
|
|
|
|
|
is RequestSessionPacket.SessionKeyResponse -> {
|
|
|
- sessionKey = packet.sessionKey
|
|
|
- bot.logger.info("sessionKey = ${sessionKey.value.toUHexString()}")
|
|
|
+ _sessionKey = packet.sessionKey
|
|
|
+ bot.logger.info("sessionKey = ${packet.sessionKey.value.toUHexString()}")
|
|
|
|
|
|
setOnlineStatus(OnlineStatus.ONLINE)//required
|
|
|
}
|
|
|
@@ -475,14 +451,11 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
is ChangeOnlineStatusPacket.ChangeOnlineStatusResponse -> {
|
|
|
BotLoginSucceedEvent(bot).broadcast()
|
|
|
|
|
|
-
|
|
|
- session = BotSession(sessionKey)
|
|
|
-
|
|
|
val configuration = currentBotConfiguration()
|
|
|
heartbeatJob = [email protected] {
|
|
|
while (socket.isOpen) {
|
|
|
delay(configuration.heartbeatPeriodMillis)
|
|
|
- with(session) {
|
|
|
+ with(bot) {
|
|
|
class HeartbeatTimeoutException : CancellationException("heartbeat timeout")
|
|
|
|
|
|
if (withTimeoutOrNull(configuration.heartbeatTimeoutMillis) {
|
|
|
@@ -500,7 +473,7 @@ internal class TIMBotNetworkHandler internal constructor(coroutineContext: Corou
|
|
|
}
|
|
|
|
|
|
bot.logger.info("Successfully logged in")
|
|
|
- loginResult.complete(LoginResult.SUCCESS)
|
|
|
+ loginResult.complete(null)
|
|
|
this.close()//The LoginHandler is useless since then
|
|
|
}
|
|
|
}
|