From 024d46638ed39dbb8a08a2a5cd1a5a45225d7e98 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Fri, 3 Mar 2023 22:02:34 +0700 Subject: [PATCH] Multi packet mattery registry sync --- .../mc/otm/matter/MatterManager.kt | 212 +++++++++++------- 1 file changed, 128 insertions(+), 84 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/matter/MatterManager.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/matter/MatterManager.kt index b6a9c2a0b..c9839c011 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/matter/MatterManager.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/matter/MatterManager.kt @@ -1379,7 +1379,7 @@ object MatterManager { matterValues[item] = value } - RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary)) + syncRegistry(PacketDistributor.ALL.noArg()) } fun onDataPackSync(event: OnDatapackSyncEvent) { @@ -1387,9 +1387,9 @@ object MatterManager { return if (event.player == null) { - RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary)) + syncRegistry(PacketDistributor.ALL.noArg()) } else { - RegistryNetworkChannel.send(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() }, SyncPacket(matterValues, commentary)) + syncRegistry(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() }) } } @@ -1403,18 +1403,100 @@ object MatterManager { return matterValues[value] ?: IMatterValue.Companion } - fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket { - LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size") + private val receivedPackets = ArrayList() + private fun syncRegistry(distributor: PacketDistributor.PacketTarget) { val time = SystemTime() + val stream = FastByteArrayOutputStream() + val data = DataOutputStream(stream) - val bbytes = ByteArray(buff.readableBytes()) - buff.readBytes(bbytes) + var commentsSize = commentary.size + data.writeInt(matterValues.size) + + for ((k, v) in matterValues) { + data.writeInt(ForgeRegistries.ITEMS.getID(k)) + data.writeMatterValue(v) + + val comment = commentary[k] + + if (comment != null) { + commentsSize-- + data.write(1) + data.writeCollection(comment, OutputStream::writeBinaryComponent) + } else { + data.write(0) + } + } + + data.writeInt(commentsSize) + + for ((k, v) in commentary) { + if (!matterValues.containsKey(k)) { + data.writeInt(ForgeRegistries.ITEMS.getID(k)) + data.writeCollection(v, OutputStream::writeBinaryComponent) + } + } + + val deflater = Deflater(5) + deflater.setInput(stream.array, 0, stream.length) + deflater.finish() + + val chunks = ArrayList() + var totalSize = 0 + var first = true + + while (true) { + val bytes = ByteArray(2 shl 20 - 1024) + val written = deflater.deflate(bytes) + + if (written == 0) { + break + } else { + totalSize += written + chunks.add(SyncPacket(bytes, written, if (first) { first = false; FIRST } else NORMAL)) + } + } + + if (chunks.size == 1) { + chunks[0].mode = FIRST_AND_LAST + } else if(chunks.size > 1) { + chunks.last().mode = LAST + } + + LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, $totalSize bytes compressed)") + + for (chunk in chunks) { + RegistryNetworkChannel.send(distributor, chunk) + } + } + + private fun playRegistryPackets() { + val time = SystemTime() + var totalCompressedSize = 0 + + for (chunk in receivedPackets) { + totalCompressedSize += chunk.length + } + + if (totalCompressedSize == 0) { + return // what. + } + + val compressed = ByteArray(totalCompressedSize) + var pointer = 0 + + for (chunk in receivedPackets) { + for (i in 0 until chunk.length) { + compressed[pointer++] = chunk.payload[i] + } + } + + receivedPackets.clear() val chunks = ArrayList() var size = 0 val inflater = Inflater() - inflater.setInput(bbytes) + inflater.setInput(compressed) while (!inflater.finished()) { val chunk = ByteArray(2 shl 16) @@ -1438,11 +1520,11 @@ object MatterManager { } val spliced = ByteArray(size) - var pointer = 0 + var pointer2 = 0 for (chunk in chunks) { for (i in chunk.indices) { - spliced[pointer++] = chunk[i] + spliced[pointer2++] = chunk[i] } } @@ -1450,15 +1532,16 @@ object MatterManager { val data = DataInputStream(stream) val valuesSize = data.readInt() - val values = Reference2ObjectOpenHashMap(valuesSize) - val comments = Reference2ObjectOpenHashMap>() + + matterValues.clear() + commentary.clear() for (i in 0 until valuesSize) { val type = data.readItemType() - check(values.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" } + check(matterValues.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" } if (data.read() > 0) { - comments[type] = data.readCollection { readBinaryComponent()!! } + commentary[type] = data.readCollection { readBinaryComponent()!! } } } @@ -1466,88 +1549,49 @@ object MatterManager { for (i in 0 until commentsSize) { val type = data.readItemType() - check(comments.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" } + check(commentary.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" } } - val result = SyncPacket(values, comments) - - LOGGER.debug("Reading matter registry packet took ${time.millis}ms (${bbytes.size} bytes compressed, $size bytes total)") - - return result + LOGGER.debug("Decoding matter registry packets took ${time.millis}ms ($totalCompressedSize bytes compressed, $size bytes total)") } - class SyncPacket( - val values: Map, - val comments: Map> - ) : MatteryPacket { + fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket { + LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size") + + val mode = buff.readByte() + val bytes = ByteArray(buff.readableBytes()) + buff.readBytes(bytes) + return SyncPacket(bytes, bytes.size, mode.toInt()) + } + + private const val FIRST = 0 + private const val NORMAL = 1 + private const val LAST = 2 + private const val FIRST_AND_LAST = 3 + + class SyncPacket(val payload: ByteArray, val length: Int, var mode: Int) : MatteryPacket { override fun write(buff: FriendlyByteBuf) { - val time = SystemTime() - - val stream = FastByteArrayOutputStream() - val data = DataOutputStream(stream) - - var commentsSize = comments.size - data.writeInt(values.size) - - for ((k, v) in values) { - data.writeInt(ForgeRegistries.ITEMS.getID(k)) - data.writeMatterValue(v) - - val comment = comments[k] - - if (comment != null) { - commentsSize-- - data.write(1) - data.writeCollection(comment, OutputStream::writeBinaryComponent) - } else { - data.write(0) - } - } - - data.writeInt(commentsSize) - - for ((k, v) in comments) { - if (!values.containsKey(k)) { - data.writeInt(ForgeRegistries.ITEMS.getID(k)) - data.writeCollection(v, OutputStream::writeBinaryComponent) - } - } - - val deflater = Deflater(5) - deflater.setInput(stream.array, 0, stream.length) - deflater.finish() - - val bytes = ByteArray(2 shl 16) - - while (true) { - val written = deflater.deflate(bytes) - - if (written == 0) { - break - } else { - buff.writeBytes(bytes, 0, written) - } - } - - LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, ${buff.writerIndex() - 1} bytes compressed)") + buff.writeByte(mode) + buff.writeBytes(payload, 0, length) } override fun play(context: Supplier) { if (SERVER_IS_LIVE) return // singleplayer or LAN host - val time = SystemTime() - - matterValues.clear() - matterValues.putAll(values) - - commentary.clear() - - for ((k, v) in comments) { - commentary[k] = ArrayList(v) + if (mode == FIRST) { + receivedPackets.clear() + receivedPackets.add(this) + } else if (mode == LAST) { + receivedPackets.add(this) + playRegistryPackets() + } else if (mode == FIRST_AND_LAST) { + receivedPackets.clear() + receivedPackets.add(this) + playRegistryPackets() + } else { + receivedPackets.add(this) } - - LOGGER.debug("Updating matter registry from packet took ${time.millis}ms") } } }