Multi packet mattery registry sync

This commit is contained in:
DBotThePony 2023-03-03 22:02:34 +07:00
parent f004c62990
commit 024d46638e
Signed by: DBot
GPG Key ID: DCC23B5715498507

View File

@ -1379,7 +1379,7 @@ object MatterManager {
matterValues[item] = value matterValues[item] = value
} }
RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary)) syncRegistry(PacketDistributor.ALL.noArg())
} }
fun onDataPackSync(event: OnDatapackSyncEvent) { fun onDataPackSync(event: OnDatapackSyncEvent) {
@ -1387,9 +1387,9 @@ object MatterManager {
return return
if (event.player == null) { if (event.player == null) {
RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary)) syncRegistry(PacketDistributor.ALL.noArg())
} else { } else {
RegistryNetworkChannel.send(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() }, SyncPacket(matterValues, commentary)) syncRegistry(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() })
} }
} }
@ -1403,18 +1403,100 @@ object MatterManager {
return matterValues[value] ?: IMatterValue.Companion return matterValues[value] ?: IMatterValue.Companion
} }
fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket { private val receivedPackets = ArrayList<SyncPacket>()
LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size")
private fun syncRegistry(distributor: PacketDistributor.PacketTarget) {
val time = SystemTime() val time = SystemTime()
val stream = FastByteArrayOutputStream()
val data = DataOutputStream(stream)
val bbytes = ByteArray(buff.readableBytes()) var commentsSize = commentary.size
buff.readBytes(bbytes) data.writeInt(matterValues.size)
for ((k, v) in matterValues) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeMatterValue(v)
val comment = commentary[k]
if (comment != null) {
commentsSize--
data.write(1)
data.writeCollection(comment, OutputStream::writeBinaryComponent)
} else {
data.write(0)
}
}
data.writeInt(commentsSize)
for ((k, v) in commentary) {
if (!matterValues.containsKey(k)) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeCollection(v, OutputStream::writeBinaryComponent)
}
}
val deflater = Deflater(5)
deflater.setInput(stream.array, 0, stream.length)
deflater.finish()
val chunks = ArrayList<SyncPacket>()
var totalSize = 0
var first = true
while (true) {
val bytes = ByteArray(2 shl 20 - 1024)
val written = deflater.deflate(bytes)
if (written == 0) {
break
} else {
totalSize += written
chunks.add(SyncPacket(bytes, written, if (first) { first = false; FIRST } else NORMAL))
}
}
if (chunks.size == 1) {
chunks[0].mode = FIRST_AND_LAST
} else if(chunks.size > 1) {
chunks.last().mode = LAST
}
LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, $totalSize bytes compressed)")
for (chunk in chunks) {
RegistryNetworkChannel.send(distributor, chunk)
}
}
private fun playRegistryPackets() {
val time = SystemTime()
var totalCompressedSize = 0
for (chunk in receivedPackets) {
totalCompressedSize += chunk.length
}
if (totalCompressedSize == 0) {
return // what.
}
val compressed = ByteArray(totalCompressedSize)
var pointer = 0
for (chunk in receivedPackets) {
for (i in 0 until chunk.length) {
compressed[pointer++] = chunk.payload[i]
}
}
receivedPackets.clear()
val chunks = ArrayList<ByteArray>() val chunks = ArrayList<ByteArray>()
var size = 0 var size = 0
val inflater = Inflater() val inflater = Inflater()
inflater.setInput(bbytes) inflater.setInput(compressed)
while (!inflater.finished()) { while (!inflater.finished()) {
val chunk = ByteArray(2 shl 16) val chunk = ByteArray(2 shl 16)
@ -1438,11 +1520,11 @@ object MatterManager {
} }
val spliced = ByteArray(size) val spliced = ByteArray(size)
var pointer = 0 var pointer2 = 0
for (chunk in chunks) { for (chunk in chunks) {
for (i in chunk.indices) { for (i in chunk.indices) {
spliced[pointer++] = chunk[i] spliced[pointer2++] = chunk[i]
} }
} }
@ -1450,15 +1532,16 @@ object MatterManager {
val data = DataInputStream(stream) val data = DataInputStream(stream)
val valuesSize = data.readInt() val valuesSize = data.readInt()
val values = Reference2ObjectOpenHashMap<Item, IMatterValue>(valuesSize)
val comments = Reference2ObjectOpenHashMap<Item, ArrayList<Component>>() matterValues.clear()
commentary.clear()
for (i in 0 until valuesSize) { for (i in 0 until valuesSize) {
val type = data.readItemType() val type = data.readItemType()
check(values.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" } check(matterValues.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" }
if (data.read() > 0) { if (data.read() > 0) {
comments[type] = data.readCollection { readBinaryComponent()!! } commentary[type] = data.readCollection { readBinaryComponent()!! }
} }
} }
@ -1466,88 +1549,49 @@ object MatterManager {
for (i in 0 until commentsSize) { for (i in 0 until commentsSize) {
val type = data.readItemType() val type = data.readItemType()
check(comments.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" } check(commentary.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" }
} }
val result = SyncPacket(values, comments) LOGGER.debug("Decoding matter registry packets took ${time.millis}ms ($totalCompressedSize bytes compressed, $size bytes total)")
LOGGER.debug("Reading matter registry packet took ${time.millis}ms (${bbytes.size} bytes compressed, $size bytes total)")
return result
} }
class SyncPacket( fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket {
val values: Map<Item, IMatterValue>, LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size")
val comments: Map<Item, Collection<Component>>
) : MatteryPacket { val mode = buff.readByte()
val bytes = ByteArray(buff.readableBytes())
buff.readBytes(bytes)
return SyncPacket(bytes, bytes.size, mode.toInt())
}
private const val FIRST = 0
private const val NORMAL = 1
private const val LAST = 2
private const val FIRST_AND_LAST = 3
class SyncPacket(val payload: ByteArray, val length: Int, var mode: Int) : MatteryPacket {
override fun write(buff: FriendlyByteBuf) { override fun write(buff: FriendlyByteBuf) {
val time = SystemTime() buff.writeByte(mode)
buff.writeBytes(payload, 0, length)
val stream = FastByteArrayOutputStream()
val data = DataOutputStream(stream)
var commentsSize = comments.size
data.writeInt(values.size)
for ((k, v) in values) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeMatterValue(v)
val comment = comments[k]
if (comment != null) {
commentsSize--
data.write(1)
data.writeCollection(comment, OutputStream::writeBinaryComponent)
} else {
data.write(0)
}
}
data.writeInt(commentsSize)
for ((k, v) in comments) {
if (!values.containsKey(k)) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeCollection(v, OutputStream::writeBinaryComponent)
}
}
val deflater = Deflater(5)
deflater.setInput(stream.array, 0, stream.length)
deflater.finish()
val bytes = ByteArray(2 shl 16)
while (true) {
val written = deflater.deflate(bytes)
if (written == 0) {
break
} else {
buff.writeBytes(bytes, 0, written)
}
}
LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, ${buff.writerIndex() - 1} bytes compressed)")
} }
override fun play(context: Supplier<NetworkEvent.Context>) { override fun play(context: Supplier<NetworkEvent.Context>) {
if (SERVER_IS_LIVE) if (SERVER_IS_LIVE)
return // singleplayer or LAN host return // singleplayer or LAN host
val time = SystemTime() if (mode == FIRST) {
receivedPackets.clear()
matterValues.clear() receivedPackets.add(this)
matterValues.putAll(values) } else if (mode == LAST) {
receivedPackets.add(this)
commentary.clear() playRegistryPackets()
} else if (mode == FIRST_AND_LAST) {
for ((k, v) in comments) { receivedPackets.clear()
commentary[k] = ArrayList(v) receivedPackets.add(this)
} playRegistryPackets()
} else {
LOGGER.debug("Updating matter registry from packet took ${time.millis}ms") receivedPackets.add(this)
}
} }
} }
} }