Quellcode durchsuchen

ImageUpload: enhance performance using `ByteArrayPool`, send chunked packets separately

Him188 vor 6 Jahren
Ursprung
Commit
ccf5df944e

+ 0 - 119
mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/Codec.kt

@@ -1,119 +0,0 @@
-/*
- * Copyright 2020 Mamoe Technologies and contributors.
- *
- * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
- * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
- *
- * https://github.com/mamoe/mirai/blob/master/LICENSE
- */
-
-package net.mamoe.mirai.qqandroid.network.highway
-
-import io.ktor.utils.io.ByteReadChannel
-import kotlinx.io.InputStream
-import kotlinx.io.core.*
-import kotlinx.io.pool.useInstance
-import net.mamoe.mirai.qqandroid.io.serialization.toByteArray
-import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
-import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY
-import net.mamoe.mirai.utils.io.ByteArrayPool
-
-object Highway {
-    suspend fun RequestDataTrans(
-        uin: Long,
-        command: String,
-        sequenceId: Int,
-        appId: Int = 537062845,
-        dataFlag: Int = 4096,
-        commandId: Int,
-        localId: Int = 2052,
-        uKey: ByteArray,
-
-        data: Any,
-        dataSize: Int,
-        md5: ByteArray
-    ): ByteReadPacket {
-        require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" }
-        require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" }
-        require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" }
-        require(data !is IoBuffer || data.readRemaining == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as IoBuffer).readRemaining}" }
-
-        val dataHighwayHead = CSDataHighwayHead.DataHighwayHead(
-            version = 1,
-            uin = uin.toString(),
-            command = command,
-            seq = sequenceId,
-            retryTimes = 0,
-            appid = appId,
-            dataflag = dataFlag,
-            commandId = commandId,
-            localeId = localId
-        )
-        val segHead = CSDataHighwayHead.SegHead(
-            datalength = dataSize,
-            filesize = dataSize.toLong(),
-            serviceticket = uKey,
-            md5 = md5,
-            fileMd5 = md5,
-            flag = 0,
-            rtcode = 0
-        )
-        //println(data.readBytes().toUHexString())
-        return Codec.buildC2SData(dataHighwayHead, segHead, EMPTY_BYTE_ARRAY, null, data, dataSize)
-    }
-
-    private object Codec {
-        suspend fun buildC2SData(
-            dataHighwayHead: CSDataHighwayHead.DataHighwayHead,
-            segHead: CSDataHighwayHead.SegHead,
-            extendInfo: ByteArray,
-            loginSigHead: CSDataHighwayHead.LoginSigHead?,
-            body: Any,
-            bodySize: Int
-        ): ByteReadPacket {
-            require(body is Input || body is InputStream || body is ByteReadChannel) { "unsupported body: ${body::class.simpleName}" }
-            val head = CSDataHighwayHead.ReqDataHighwayHead(
-                msgBasehead = dataHighwayHead,
-                msgSeghead = segHead,
-                reqExtendinfo = extendInfo,
-                msgLoginSigHead = loginSigHead
-            ).toByteArray(CSDataHighwayHead.ReqDataHighwayHead.serializer())
-
-            return buildPacket {
-                writeByte(40)
-                writeInt(head.size)
-                writeInt(bodySize)
-                writeFully(head)
-                when (body) {
-                    is ByteReadPacket -> writePacket(body)
-                    is Input -> body.use {
-                        ByteArrayPool.useInstance { buffer ->
-                            var size: Int
-                            while (body.readAvailable(buffer).also { size = it } != 0) {
-                                [email protected](buffer, 0, size)
-                            }
-                        }
-                    }
-                    is ByteReadChannel -> ByteArrayPool.useInstance { buffer ->
-                        var size: Int
-                        while (body.readAvailable(buffer, 0, buffer.size).also { size = it } != 0) {
-                            [email protected](buffer, 0, size)
-                        }
-                    }
-                    is InputStream -> try {
-                        ByteArrayPool.useInstance { buffer ->
-                            var size: Int
-                            while (body.read(buffer).also { size = it } != 0) {
-                                [email protected](buffer, 0, size)
-                            }
-                        }
-                    } finally {
-                        body.close()
-                    }
-                }
-
-                writeByte(41)
-            }
-        }
-    }
-}

+ 26 - 26
mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/HighwayHelper.kt

@@ -18,8 +18,11 @@ import io.ktor.http.content.OutgoingContent
 import io.ktor.http.userAgent
 import io.ktor.utils.io.ByteReadChannel
 import io.ktor.utils.io.copyAndClose
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.flow.collect
 import kotlinx.io.InputStream
 import kotlinx.io.core.Input
+import kotlinx.io.core.discardExact
 import kotlinx.io.core.readAvailable
 import kotlinx.io.core.use
 import kotlinx.io.pool.useInstance
@@ -30,9 +33,8 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.withUse
 import net.mamoe.mirai.utils.MiraiInternalAPI
 import net.mamoe.mirai.utils.io.ByteArrayPool
 import net.mamoe.mirai.utils.io.PlatformSocket
-import net.mamoe.mirai.utils.io.discardExact
-
 
+@UseExperimental(MiraiInternalAPI::class)
 @Suppress("SpellCheckingInspection")
 internal suspend fun HttpClient.postImage(
     htcmd: String,
@@ -90,6 +92,7 @@ internal suspend fun HttpClient.postImage(
 
 @UseExperimental(MiraiInternalAPI::class)
 internal object HighwayHelper {
+    @UseExperimental(InternalCoroutinesApi::class)
     suspend fun uploadImage(
         client: QQAndroidClient,
         serverIp: String,
@@ -108,30 +111,27 @@ internal object HighwayHelper {
         val socket = PlatformSocket()
         socket.connect(serverIp, serverPort)
         socket.use {
-
-            // TODO: 2020/2/23 使用缓存, 或使用 HTTP 发送更好 (因为无需读取到内存)
-            socket.send(
-                Highway.RequestDataTrans(
-                    uin = client.uin,
-                    command = "PicUp.DataUp",
-                    sequenceId =
-                    if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup()
-                    else client.nextHighwayDataTransSequenceIdForFriend(),
-                    uKey = uKey,
-                    data = imageInput,
-                    dataSize = inputSize,
-                    md5 = md5,
-                    commandId = commandId
-                )
-            )
-
-            //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
-            socket.read().withUse {
-                discardExact(1)
-                val headLength = readInt()
-                discardExact(4)
-                val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength)
-                check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" }
+            createImageDataPacketSequence(
+                uin = client.uin,
+                command = "PicUp.DataUp",
+                sequenceId =
+                if (commandId == 2) client.nextHighwayDataTransSequenceIdForGroup()
+                else client.nextHighwayDataTransSequenceIdForFriend(),
+                commandId = commandId,
+                uKey = uKey,
+                data = imageInput,
+                dataSize = inputSize,
+                md5 = md5
+            ).collect {
+                socket.send(it)
+                //0A 3C 08 01 12 0A 31 39 39 34 37 30 31 30 32 31 1A 0C 50 69 63 55 70 2E 44 61 74 61 55 70 20 E9 A7 05 28 00 30 BD DB 8B 80 02 38 80 20 40 02 4A 0A 38 2E 32 2E 30 2E 31 32 39 36 50 84 10 12 3D 08 00 10 FD 08 18 00 20 FD 08 28 C6 01 38 00 42 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 4A 10 D4 1D 8C D9 8F 00 B2 04 E9 80 09 98 EC F8 42 7E 50 89 92 A2 FB 06 58 00 60 00 18 53 20 01 28 00 30 04 3A 00 40 E6 B7 F7 D9 80 2E 48 00 50 00
+                socket.read().withUse {
+                    discardExact(1)
+                    val headLength = readInt()
+                    discardExact(4)
+                    val proto = readProtoBuf(CSDataHighwayHead.RspDataHighwayHead.serializer(), length = headLength)
+                    check(proto.errorCode == 0) { "image upload failed: Transfer errno=${proto.errorCode}" }
+                }
             }
         }
     }

+ 89 - 0
mirai-core-qqandroid/src/commonMain/kotlin/net/mamoe/mirai/qqandroid/network/highway/highway.kt

@@ -0,0 +1,89 @@
+/*
+ * Copyright 2020 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/master/LICENSE
+ */
+
+@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
+
+package net.mamoe.mirai.qqandroid.network.highway
+
+import io.ktor.utils.io.ByteReadChannel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.map
+import kotlinx.io.InputStream
+import kotlinx.io.core.*
+import net.mamoe.mirai.qqandroid.io.serialization.toByteArray
+import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
+import net.mamoe.mirai.qqandroid.network.protocol.packet.EMPTY_BYTE_ARRAY
+import net.mamoe.mirai.utils.MiraiInternalAPI
+import net.mamoe.mirai.utils.io.*
+
+@UseExperimental(MiraiInternalAPI::class)
+internal fun createImageDataPacketSequence( // RequestDataTrans
+    uin: Long,
+    command: String,
+    sequenceId: Int,
+    appId: Int = 537062845,
+    dataFlag: Int = 4096,
+    commandId: Int,
+    localId: Int = 2052,
+    uKey: ByteArray,
+
+    data: Any,
+    dataSize: Int,
+    md5: ByteArray,
+    sizePerPacket: Int = 8192
+): Flow<ByteReadPacket> {
+    require(data is Input || data is InputStream || data is ByteReadChannel) { "unsupported data: ${data::class.simpleName}" }
+    require(uKey.size == 128) { "bad uKey. Required size=128, got ${uKey.size}" }
+    require(data !is ByteReadPacket || data.remaining.toInt() == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as ByteReadPacket).remaining}" }
+    require(data !is IoBuffer || data.readRemaining == dataSize) { "bad input. given dataSize=$dataSize, but actual readRemaining=${(data as IoBuffer).readRemaining}" }
+
+    val flow = when (data) {
+        is ByteReadPacket -> data.chunkedFlow(sizePerPacket)
+        is Input -> data.chunkedFlow(sizePerPacket)
+        is ByteReadChannel -> data.chunkedFlow(sizePerPacket)
+        is InputStream -> data.chunkedFlow(sizePerPacket)
+        else -> error("unreachable code")
+    }
+
+    return flow.map { chunkedInput ->
+        buildPacket {
+            val head = CSDataHighwayHead.ReqDataHighwayHead(
+                msgBasehead = CSDataHighwayHead.DataHighwayHead(
+                    version = 1,
+                    uin = uin.toString(),
+                    command = command,
+                    seq = sequenceId,
+                    retryTimes = 0,
+                    appid = appId,
+                    dataflag = dataFlag,
+                    commandId = commandId,
+                    localeId = localId
+                ),
+                msgSeghead = CSDataHighwayHead.SegHead(
+                    datalength = dataSize,
+                    filesize = dataSize.toLong(),
+                    serviceticket = uKey,
+                    md5 = md5,
+                    fileMd5 = md5,
+                    flag = 0,
+                    rtcode = 0
+                ),
+                reqExtendinfo = EMPTY_BYTE_ARRAY,
+                msgLoginSigHead = null
+            ).toByteArray(CSDataHighwayHead.ReqDataHighwayHead.serializer())
+
+            writeByte(40)
+            writeInt(head.size)
+            writeInt(dataSize)
+            writeFully(head)
+            writeFully(chunkedInput.buffer, 0, chunkedInput.bufferSize)
+            writeByte(41)
+        }
+    }
+}