Multi packet mattery registry sync

This commit is contained in:
DBotThePony 2023-03-03 22:02:34 +07:00
parent 04a5e87fec
commit 3dcaed64cd
Signed by: DBot
GPG Key ID: DCC23B5715498507

View File

@ -1375,7 +1375,7 @@ object MatterManager {
matterValues[item] = value
}
RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary))
syncRegistry(PacketDistributor.ALL.noArg())
}
fun onDataPackSync(event: OnDatapackSyncEvent) {
@ -1383,9 +1383,9 @@ object MatterManager {
return
if (event.player == null) {
RegistryNetworkChannel.send(PacketDistributor.ALL.noArg(), SyncPacket(matterValues, commentary))
syncRegistry(PacketDistributor.ALL.noArg())
} else {
RegistryNetworkChannel.send(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() }, SyncPacket(matterValues, commentary))
syncRegistry(PacketDistributor.PLAYER.with { event.player ?: throw ConcurrentModificationException() })
}
}
@ -1399,18 +1399,100 @@ object MatterManager {
return matterValues[value] ?: IMatterValue.Companion
}
fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket {
LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size")
private val receivedPackets = ArrayList<SyncPacket>()
private fun syncRegistry(distributor: PacketDistributor.PacketTarget) {
val time = SystemTime()
val stream = FastByteArrayOutputStream()
val data = DataOutputStream(stream)
val bbytes = ByteArray(buff.readableBytes())
buff.readBytes(bbytes)
var commentsSize = commentary.size
data.writeInt(matterValues.size)
for ((k, v) in matterValues) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeMatterValue(v)
val comment = commentary[k]
if (comment != null) {
commentsSize--
data.write(1)
data.writeCollection(comment, OutputStream::writeBinaryComponent)
} else {
data.write(0)
}
}
data.writeInt(commentsSize)
for ((k, v) in commentary) {
if (!matterValues.containsKey(k)) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeCollection(v, OutputStream::writeBinaryComponent)
}
}
val deflater = Deflater(5)
deflater.setInput(stream.array, 0, stream.length)
deflater.finish()
val chunks = ArrayList<SyncPacket>()
var totalSize = 0
var first = true
while (true) {
val bytes = ByteArray(2 shl 20 - 1024)
val written = deflater.deflate(bytes)
if (written == 0) {
break
} else {
totalSize += written
chunks.add(SyncPacket(bytes, written, if (first) { first = false; FIRST } else NORMAL))
}
}
if (chunks.size == 1) {
chunks[0].mode = FIRST_AND_LAST
} else if(chunks.size > 1) {
chunks.last().mode = LAST
}
LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, $totalSize bytes compressed)")
for (chunk in chunks) {
RegistryNetworkChannel.send(distributor, chunk)
}
}
private fun playRegistryPackets() {
val time = SystemTime()
var totalCompressedSize = 0
for (chunk in receivedPackets) {
totalCompressedSize += chunk.length
}
if (totalCompressedSize == 0) {
return // what.
}
val compressed = ByteArray(totalCompressedSize)
var pointer = 0
for (chunk in receivedPackets) {
for (i in 0 until chunk.length) {
compressed[pointer++] = chunk.payload[i]
}
}
receivedPackets.clear()
val chunks = ArrayList<ByteArray>()
var size = 0
val inflater = Inflater()
inflater.setInput(bbytes)
inflater.setInput(compressed)
while (!inflater.finished()) {
val chunk = ByteArray(2 shl 16)
@ -1434,11 +1516,11 @@ object MatterManager {
}
val spliced = ByteArray(size)
var pointer = 0
var pointer2 = 0
for (chunk in chunks) {
for (i in chunk.indices) {
spliced[pointer++] = chunk[i]
spliced[pointer2++] = chunk[i]
}
}
@ -1446,15 +1528,16 @@ object MatterManager {
val data = DataInputStream(stream)
val valuesSize = data.readInt()
val values = Reference2ObjectOpenHashMap<Item, IMatterValue>(valuesSize)
val comments = Reference2ObjectOpenHashMap<Item, ArrayList<Component>>()
matterValues.clear()
commentary.clear()
for (i in 0 until valuesSize) {
val type = data.readItemType()
check(values.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" }
check(matterValues.put(type, data.readMatterValue()) == null) { "Duplicate item type $type" }
if (data.read() > 0) {
comments[type] = data.readCollection { readBinaryComponent()!! }
commentary[type] = data.readCollection { readBinaryComponent()!! }
}
}
@ -1462,88 +1545,49 @@ object MatterManager {
for (i in 0 until commentsSize) {
val type = data.readItemType()
check(comments.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" }
check(commentary.put(type, data.readCollection { readBinaryComponent()!! }) == null) { "Duplicate commentary item type $type" }
}
val result = SyncPacket(values, comments)
LOGGER.debug("Reading matter registry packet took ${time.millis}ms (${bbytes.size} bytes compressed, $size bytes total)")
return result
LOGGER.debug("Decoding matter registry packets took ${time.millis}ms ($totalCompressedSize bytes compressed, $size bytes total)")
}
class SyncPacket(
val values: Map<Item, IMatterValue>,
val comments: Map<Item, Collection<Component>>
) : MatteryPacket {
fun readSyncPacket(buff: FriendlyByteBuf): SyncPacket {
LOGGER.info("Received matter registry packet, ${buff.readableBytes()} bytes in size")
val mode = buff.readByte()
val bytes = ByteArray(buff.readableBytes())
buff.readBytes(bytes)
return SyncPacket(bytes, bytes.size, mode.toInt())
}
private const val FIRST = 0
private const val NORMAL = 1
private const val LAST = 2
private const val FIRST_AND_LAST = 3
class SyncPacket(val payload: ByteArray, val length: Int, var mode: Int) : MatteryPacket {
override fun write(buff: FriendlyByteBuf) {
val time = SystemTime()
val stream = FastByteArrayOutputStream()
val data = DataOutputStream(stream)
var commentsSize = comments.size
data.writeInt(values.size)
for ((k, v) in values) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeMatterValue(v)
val comment = comments[k]
if (comment != null) {
commentsSize--
data.write(1)
data.writeCollection(comment, OutputStream::writeBinaryComponent)
} else {
data.write(0)
}
}
data.writeInt(commentsSize)
for ((k, v) in comments) {
if (!values.containsKey(k)) {
data.writeInt(ForgeRegistries.ITEMS.getID(k))
data.writeCollection(v, OutputStream::writeBinaryComponent)
}
}
val deflater = Deflater(5)
deflater.setInput(stream.array, 0, stream.length)
deflater.finish()
val bytes = ByteArray(2 shl 16)
while (true) {
val written = deflater.deflate(bytes)
if (written == 0) {
break
} else {
buff.writeBytes(bytes, 0, written)
}
}
LOGGER.debug("Encoding matter registry packet took ${time.millis}ms, (${stream.length} bytes total, ${buff.writerIndex() - 1} bytes compressed)")
buff.writeByte(mode)
buff.writeBytes(payload, 0, length)
}
override fun play(context: Supplier<NetworkEvent.Context>) {
if (SERVER_IS_LIVE)
return // singleplayer or LAN host
val time = SystemTime()
matterValues.clear()
matterValues.putAll(values)
commentary.clear()
for ((k, v) in comments) {
commentary[k] = ArrayList(v)
if (mode == FIRST) {
receivedPackets.clear()
receivedPackets.add(this)
} else if (mode == LAST) {
receivedPackets.add(this)
playRegistryPackets()
} else if (mode == FIRST_AND_LAST) {
receivedPackets.clear()
receivedPackets.add(this)
playRegistryPackets()
} else {
receivedPackets.add(this)
}
LOGGER.debug("Updating matter registry from packet took ${time.millis}ms")
}
}
}