|
|
@@ -18,8 +18,6 @@ import io.netty.channel.socket.nio.NioSocketChannel
|
|
|
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
|
|
|
import io.netty.handler.codec.MessageToByteEncoder
|
|
|
import kotlinx.coroutines.*
|
|
|
-import kotlinx.coroutines.channels.Channel
|
|
|
-import kotlinx.coroutines.channels.trySendBlocking
|
|
|
import net.mamoe.mirai.internal.network.components.*
|
|
|
import net.mamoe.mirai.internal.network.handler.NetworkHandler.State
|
|
|
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContext
|
|
|
@@ -165,31 +163,17 @@ internal open class NettyNetworkHandler(
|
|
|
|
|
|
protected inner class PacketDecodePipeline(parentContext: CoroutineContext) :
|
|
|
CoroutineScope by parentContext.childScope() {
|
|
|
- private val channel: Channel<RawIncomingPacket> = Channel(Channel.BUFFERED)
|
|
|
private val packetCodec: PacketCodec by lazy { context[PacketCodec] }
|
|
|
|
|
|
- init {
|
|
|
- coroutineContext.job.invokeOnCompletion {
|
|
|
- channel.close() // normally close
|
|
|
+ fun send(raw: RawIncomingPacket) {
|
|
|
+ launch {
|
|
|
+ packetLogger.debug { "Packet Handling Processor: receive packet ${raw.commandName}" }
|
|
|
+ val result = packetCodec.processBody(context.bot, raw)
|
|
|
+ if (result == null) {
|
|
|
+ collectUnknownPacket(raw)
|
|
|
+ } else collectReceived(result)
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- init {
|
|
|
- repeat(4) { processorId ->
|
|
|
- launch(CoroutineName("PacketDecodePipeline processor #$processorId")) {
|
|
|
- while (isActive) {
|
|
|
- val raw = channel.receiveCatching().getOrNull() ?: return@launch
|
|
|
- packetLogger.debug { "Packet Handling Processor #$processorId: receive packet ${raw.commandName}" }
|
|
|
- val result = packetCodec.processBody(context.bot, raw)
|
|
|
- if (result == null) {
|
|
|
- collectUnknownPacket(raw)
|
|
|
- } else collectReceived(result)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fun send(raw: RawIncomingPacket) = channel.trySendBlocking(raw)
|
|
|
}
|
|
|
|
|
|
|