From be6277bb33de8c63cc9a3aa9d0c1885fd5b970d5 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Thu, 25 Apr 2024 21:44:26 +0700 Subject: [PATCH] Add fast path for hasChangedSince() --- .../network/syncher/NetworkedDynamicGroup.kt | 41 +++++++++-- .../network/syncher/NetworkedElement.kt | 6 ++ .../network/syncher/NetworkedGroup.kt | 73 ++++++++++--------- .../network/syncher/NetworkedList.kt | 22 ++++-- .../network/syncher/NetworkedMap.kt | 7 +- 5 files changed, 98 insertions(+), 51 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedDynamicGroup.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedDynamicGroup.kt index d7feaf00..2a84923e 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedDynamicGroup.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedDynamicGroup.kt @@ -1,5 +1,6 @@ package ru.dbotthepony.kstarbound.network.syncher +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap import it.unimi.dsi.fastutil.ints.IntAVLTreeSet import it.unimi.dsi.fastutil.io.FastByteArrayInputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream @@ -7,6 +8,7 @@ import ru.dbotthepony.kommons.io.readByteArray import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.writeByteArray import ru.dbotthepony.kommons.io.writeVarInt +import ru.dbotthepony.kommons.util.Listenable import ru.dbotthepony.kstarbound.collect.IdMap import java.io.DataInputStream import java.io.DataOutputStream @@ -65,7 +67,11 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a constructor(id: Int) : this(Action.REMOVE, id, null, null) } - private fun setupElement(element: NetworkedElement) { + private var listenedVersion = 0L + private val nonListenableElements = ArrayList() + private val listenersOnElements = Int2ObjectAVLTreeMap() + + private fun setupElement(element: NetworkedElement, id: Int) { val versionCounter = versionCounter if (versionCounter != null) @@ -75,12 +81,20 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a element.enableInterpolation(extrapolation) else element.disableInterpolation() + + val listener = element.listen(Runnable { listenedVersion = currentVersion() }) + + if (listener == null) { + nonListenableElements.add(element) + } else { + listenersOnElements[id] = listener + } } fun add(element: T): Int { check(element !in elementsInternal.values) { "Already containing $element" } - setupElement(accessor(element)) val id = elementsInternal.add(element) + setupElement(accessor(element), id) backlog.add(currentVersion() to Entry(id, accessor(element))) purgeBacklog() return id @@ -97,13 +111,17 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a } override fun hasChangedSince(version: Long): Boolean { - return version == 0L || backlog.any { it.first >= version } || elementsInternal.values.any { accessor(it).hasChangedSince(version) } + return version == 0L || + listenedVersion >= version || + backlog.isNotEmpty() && backlog.last().first >= version || + nonListenableElements.any { it.hasChangedSince(version) } } override fun specifyVersioner(versionCounter: LongSupplier) { super.specifyVersioner(versionCounter) backlog.clear() backlog.add(currentVersion() to clearAction) + listenedVersion = 0L for ((id, element) in elementsInternal) { accessor(element).specifyVersioner(versionCounter) @@ -118,6 +136,10 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a backlog.clear() elementsInternal.clear() + nonListenableElements.clear() + listenersOnElements.values.forEach { it.remove() } + listenersOnElements.clear() + listenedVersion = 0L backlog.add(currentVersion() to clearAction) @@ -132,7 +154,7 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a } val element = factory() - setupElement(accessor(element)) + setupElement(accessor(element), index) elementsInternal[index] = element accessor(element).readInitial(stream, isLegacy) backlog.add(currentVersion() to Entry(index, accessor(element))) @@ -170,6 +192,8 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a when (Action.entries[data.readUnsignedByte()]) { Action.CLEAR -> { elementsInternal.clear() + listenersOnElements.values.forEach { it.remove() } + nonListenableElements.clear() backlog.add(currentVersion() to clearAction) } @@ -177,15 +201,20 @@ class NetworkedDynamicGroup(private val factory: () -> T, private val a // inconsistent usage of VarInt and int32_t when networking is driving me insane val id = if (isLegacy) data.readInt() else data.readVarInt() backlog.add(currentVersion() to Entry(id)) - elementsInternal.remove(id) + listenersOnElements.remove(id)?.remove() + val element = elementsInternal.remove(id) + + if (element != null) { + nonListenableElements.remove(accessor(element)) + } } Action.ADD -> { // inconsistent usage of VarInt and int32_t when networking is driving me insane val id = if (isLegacy) data.readInt() else data.readVarInt() val element = factory() - setupElement(accessor(element)) check(id !in elementsInternal) { "Already has networked element $id" } + setupElement(accessor(element), id) elementsInternal[id] = element if (isLegacy) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedElement.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedElement.kt index 0705bd09..88fbda9c 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedElement.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedElement.kt @@ -1,5 +1,6 @@ package ru.dbotthepony.kstarbound.network.syncher +import ru.dbotthepony.kommons.util.Listenable import java.io.DataInputStream import java.io.DataOutputStream import java.util.function.LongSupplier @@ -81,6 +82,11 @@ abstract class NetworkedElement { return this.version >= version } + @Deprecated("Internal API") + open fun listen(listener: Runnable): Listenable.L? { + return null + } + var versionCounter: LongSupplier? = null protected set diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedGroup.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedGroup.kt index 1c60a65f..c211b1d0 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedGroup.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedGroup.kt @@ -2,6 +2,7 @@ package ru.dbotthepony.kstarbound.network.syncher import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.writeVarInt +import ru.dbotthepony.kommons.util.Listenable import java.io.DataInputStream import java.io.DataOutputStream import java.util.function.Consumer @@ -14,34 +15,33 @@ class NetworkedGroup() : NetworkedElement() { } // element -> propagateInterpolation - private val elements = ArrayList>() + private data class Element(val element: NetworkedElement, val propagateInterpolation: Boolean, val listener: Listenable.L?) + private val elements = ArrayList() + private val nonListenableElements = ArrayList() + private var listenedVersion = 0L var isInterpolating = false private set var extrapolation = 0.0 private set - fun clear() { - elements.clear() - } - override fun toString(): String { if (elements.isEmpty()) return "NetworkedGroup[]" - return "NetworkedGroup[\n${elements.joinToString("\n") { "\t" + it.first.toString() }}]" + return "NetworkedGroup[\n${elements.joinToString("\n") { "\t" + it.element.toString() }}]" } override fun specifyVersioner(versionCounter: LongSupplier) { super.specifyVersioner(versionCounter) - elements.forEach { it.first.specifyVersioner(versionCounter) } + elements.forEach { it.element.specifyVersioner(versionCounter) } } override fun enableInterpolation(extrapolation: Double) { if (!isInterpolating || extrapolation != this.extrapolation) { isInterpolating = true this.extrapolation = extrapolation - elements.forEach { it.first.enableInterpolation(extrapolation) } + elements.forEach { it.element.enableInterpolation(extrapolation) } } } @@ -49,21 +49,15 @@ class NetworkedGroup() : NetworkedElement() { if (isInterpolating) { isInterpolating = false extrapolation = 0.0 - elements.forEach { it.first.disableInterpolation() } + elements.forEach { it.element.disableInterpolation() } } } override fun tickInterpolation(delta: Double) { - elements.forEach { it.first.tickInterpolation(delta) } + elements.forEach { it.element.tickInterpolation(delta) } } - private var trap: Consumer? = null - - fun putTrap(trap: Consumer) { - this.trap = trap - } - - private fun setupElement(element: NetworkedElement, propagateInterpolation: Boolean) { + private fun setupElement(element: NetworkedElement, propagateInterpolation: Boolean): Listenable.L? { if (propagateInterpolation) { if (isInterpolating) element.enableInterpolation(extrapolation) @@ -74,37 +68,44 @@ class NetworkedGroup() : NetworkedElement() { if (versionCounter != null) { element.specifyVersioner(versionCounter!!) } + + val listenable = element.listen(Runnable { listenedVersion = currentVersion() }) + + if (listenable == null) { + nonListenableElements.add(element) + } + + return listenable } fun add(element: E, propagateInterpolation: Boolean = true): E { - require(elements.none { it.first === element }) { "Already has element $element in $this" } - elements.add(element to propagateInterpolation) - setupElement(element, propagateInterpolation) + require(elements.none { it.element === element }) { "Already has element $element in $this" } + elements.add(Element(element, propagateInterpolation, setupElement(element, propagateInterpolation))) return element } fun replace(find: NetworkedElement, replace: NetworkedElement) { if (find === replace) return - val index = elements.indexOfFirst { it.first === find } + val index = elements.indexOfFirst { it.element === find } check(index != -1) { "Unable to find $find in $this" } - setupElement(replace, elements[index].second) - elements[index] = replace to elements[index].second + nonListenableElements.remove(elements[index].element) + elements[index].listener?.remove() + elements[index] = Element(replace, elements[index].propagateInterpolation, setupElement(replace, elements[index].propagateInterpolation)) } override fun readInitial(data: DataInputStream, isLegacy: Boolean) { - trap?.accept(data) - elements.forEach { it.first.readInitial(data, isLegacy) } + elements.forEach { it.element.readInitial(data, isLegacy) } } override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) { - elements.forEach { it.first.writeInitial(data, isLegacy) } + elements.forEach { it.element.writeInitial(data, isLegacy) } } override fun readDelta(data: DataInputStream, interpolationDelay: Double, isLegacy: Boolean) { check(elements.isNotEmpty()) { "No networked elements in this group" } if (elements.size == 1) { - elements[0].first.readDelta(data, interpolationDelay, isLegacy) + elements[0].element.readDelta(data, interpolationDelay, isLegacy) } else { var nextIndex = data.readVarInt() @@ -118,9 +119,9 @@ class NetworkedGroup() : NetworkedElement() { for ((i, element) in elements.withIndex()) { if (nextIndex == 0 || i < nextIndex - 1) { - element.first.readBlankDelta(interpolationDelay) + element.element.readBlankDelta(interpolationDelay) } else if (i == nextIndex - 1) { - element.first.readDelta(data, interpolationDelay, isLegacy) + element.element.readDelta(data, interpolationDelay, isLegacy) nextIndex = data.readVarInt() } else { throw IllegalStateException("Networked element group indexes were written out of order") @@ -129,7 +130,7 @@ class NetworkedGroup() : NetworkedElement() { } else { while (nextIndex != 0) { val element = elements.getOrNull(nextIndex - 1) ?: throw NoSuchElementException("Unknown networked element with index ${nextIndex - 1}!") - element.first.readDelta(data, interpolationDelay, isLegacy) + element.element.readDelta(data, interpolationDelay, isLegacy) nextIndex = data.readVarInt() } } @@ -138,7 +139,7 @@ class NetworkedGroup() : NetworkedElement() { override fun readBlankDelta(interpolationDelay: Double) { if (isInterpolating) { - elements.forEach { it.first.readBlankDelta(interpolationDelay) } + elements.forEach { it.element.readBlankDelta(interpolationDelay) } } } @@ -151,16 +152,16 @@ class NetworkedGroup() : NetworkedElement() { if (elements.size == 1) { // one element - pass through - elements[0].first.writeDelta(data, remoteVersion, isLegacy) + elements[0].element.writeDelta(data, remoteVersion, isLegacy) } else { // otherwise, sequentially scan elements for updates // and if element needs updating, write element's index as variable length integer; // then write element itself for ((i, element) in elements.withIndex()) { - if (element.first.hasChangedSince(remoteVersion)) { + if (element.element.hasChangedSince(remoteVersion)) { data.writeVarInt(i + 1) - element.first.writeDelta(data, remoteVersion, isLegacy) + element.element.writeDelta(data, remoteVersion, isLegacy) } } @@ -169,10 +170,10 @@ class NetworkedGroup() : NetworkedElement() { } override fun hasChangedSince(version: Long): Boolean { - return elements.any { it.first.hasChangedSince(version) } + return listenedVersion >= version || nonListenableElements.any { it.hasChangedSince(version) } } override fun bumpVersion() { - elements.forEach { it.first.bumpVersion() } + elements.forEach { it.element.bumpVersion() } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedList.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedList.kt index e847b166..764bd565 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedList.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedList.kt @@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.io.readByteArray import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.writeVarInt import ru.dbotthepony.kommons.util.KOptional +import ru.dbotthepony.kommons.util.Listenable import ru.dbotthepony.kstarbound.collect.RandomListIterator import ru.dbotthepony.kstarbound.collect.RandomSubList import ru.dbotthepony.kstarbound.io.writeByteArray @@ -47,10 +48,15 @@ class NetworkedList( private val clearEntry = Entry(Type.CLEAR, 0, KOptional()) private var isInterpolating = false - private val listeners = CopyOnWriteArrayList() + private val listeners = Listenable.Void() fun addListener(listener: Runnable) { - listeners.add(listener) + listeners.addListener(listener) + } + + @Deprecated("Internal API") + override fun listen(listener: Runnable): Listenable.L { + return listeners.addListener(listener) } /** @@ -107,7 +113,7 @@ class NetworkedList( } purgeBacklog() - listeners.forEach { it.run() } + listeners.run() } override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) { @@ -157,7 +163,7 @@ class NetworkedList( } purgeBacklog() - listeners.forEach { it.run() } + listeners.run() } else { readInitial(data, false) } @@ -248,7 +254,7 @@ class NetworkedList( elements.add(index, element) backlog.add(currentVersion() to Entry(index, element)) purgeBacklog() - listeners.forEach { it.run() } + listeners.run() } override fun addAll(index: Int, elements: Collection): Boolean { @@ -265,7 +271,7 @@ class NetworkedList( backlog.clear() backlog.add(currentVersion() to clearEntry) elements.clear() - listeners.forEach { it.run() } + listeners.run() } override fun listIterator(): MutableListIterator { @@ -296,7 +302,7 @@ class NetworkedList( val element = elements.removeAt(index) backlog.add(currentVersion() to Entry(index)) purgeBacklog() - listeners.forEach { it.run() } + listeners.run() return element } @@ -318,7 +324,7 @@ class NetworkedList( val old = elements.set(index, element) backlog.add(currentVersion() to Entry(index, element)) purgeBacklog() - listeners.forEach { it.run() } + listeners.run() return old } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedMap.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedMap.kt index 066edf90..45e3ed90 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedMap.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/NetworkedMap.kt @@ -97,6 +97,11 @@ class NetworkedMap( return Listener(ListenableMap.RunnableAdapter(listener)) } + @Deprecated("Internal API") + override fun listen(listener: Runnable): Listenable.L { + return Listener(ListenableMap.RunnableAdapter(listener)) + } + private val dumbCodec by lazy { StreamCodec.Map(keyCodec.second, valueCodec.second, ::HashMap) } @@ -316,7 +321,7 @@ class NetworkedMap( } override fun hasChangedSince(version: Long): Boolean { - return backlog.any { it.first >= version } + return backlog.isNotEmpty() && backlog.last().first >= version } override fun readBlankDelta(interpolationDelay: Double) {}