Add fast path for hasChangedSince()

This commit is contained in:
DBotThePony 2024-04-25 21:44:26 +07:00
parent b6ff323e5a
commit be6277bb33
Signed by: DBot
GPG Key ID: DCC23B5715498507
5 changed files with 98 additions and 51 deletions

View File

@ -1,5 +1,6 @@
package ru.dbotthepony.kstarbound.network.syncher package ru.dbotthepony.kstarbound.network.syncher
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet import it.unimi.dsi.fastutil.ints.IntAVLTreeSet
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
@ -7,6 +8,7 @@ import ru.dbotthepony.kommons.io.readByteArray
import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeByteArray import ru.dbotthepony.kommons.io.writeByteArray
import ru.dbotthepony.kommons.io.writeVarInt import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.Listenable
import ru.dbotthepony.kstarbound.collect.IdMap import ru.dbotthepony.kstarbound.collect.IdMap
import java.io.DataInputStream import java.io.DataInputStream
import java.io.DataOutputStream import java.io.DataOutputStream
@ -65,7 +67,11 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
constructor(id: Int) : this(Action.REMOVE, id, null, null) constructor(id: Int) : this(Action.REMOVE, id, null, null)
} }
private fun setupElement(element: NetworkedElement) { private var listenedVersion = 0L
private val nonListenableElements = ArrayList<NetworkedElement>()
private val listenersOnElements = Int2ObjectAVLTreeMap<Listenable.L>()
private fun setupElement(element: NetworkedElement, id: Int) {
val versionCounter = versionCounter val versionCounter = versionCounter
if (versionCounter != null) if (versionCounter != null)
@ -75,12 +81,20 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
element.enableInterpolation(extrapolation) element.enableInterpolation(extrapolation)
else else
element.disableInterpolation() element.disableInterpolation()
val listener = element.listen(Runnable { listenedVersion = currentVersion() })
if (listener == null) {
nonListenableElements.add(element)
} else {
listenersOnElements[id] = listener
}
} }
fun add(element: T): Int { fun add(element: T): Int {
check(element !in elementsInternal.values) { "Already containing $element" } check(element !in elementsInternal.values) { "Already containing $element" }
setupElement(accessor(element))
val id = elementsInternal.add(element) val id = elementsInternal.add(element)
setupElement(accessor(element), id)
backlog.add(currentVersion() to Entry(id, accessor(element))) backlog.add(currentVersion() to Entry(id, accessor(element)))
purgeBacklog() purgeBacklog()
return id return id
@ -97,13 +111,17 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
} }
override fun hasChangedSince(version: Long): Boolean { override fun hasChangedSince(version: Long): Boolean {
return version == 0L || backlog.any { it.first >= version } || elementsInternal.values.any { accessor(it).hasChangedSince(version) } return version == 0L ||
listenedVersion >= version ||
backlog.isNotEmpty() && backlog.last().first >= version ||
nonListenableElements.any { it.hasChangedSince(version) }
} }
override fun specifyVersioner(versionCounter: LongSupplier) { override fun specifyVersioner(versionCounter: LongSupplier) {
super.specifyVersioner(versionCounter) super.specifyVersioner(versionCounter)
backlog.clear() backlog.clear()
backlog.add(currentVersion() to clearAction) backlog.add(currentVersion() to clearAction)
listenedVersion = 0L
for ((id, element) in elementsInternal) { for ((id, element) in elementsInternal) {
accessor(element).specifyVersioner(versionCounter) accessor(element).specifyVersioner(versionCounter)
@ -118,6 +136,10 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
backlog.clear() backlog.clear()
elementsInternal.clear() elementsInternal.clear()
nonListenableElements.clear()
listenersOnElements.values.forEach { it.remove() }
listenersOnElements.clear()
listenedVersion = 0L
backlog.add(currentVersion() to clearAction) backlog.add(currentVersion() to clearAction)
@ -132,7 +154,7 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
} }
val element = factory() val element = factory()
setupElement(accessor(element)) setupElement(accessor(element), index)
elementsInternal[index] = element elementsInternal[index] = element
accessor(element).readInitial(stream, isLegacy) accessor(element).readInitial(stream, isLegacy)
backlog.add(currentVersion() to Entry(index, accessor(element))) backlog.add(currentVersion() to Entry(index, accessor(element)))
@ -170,6 +192,8 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
when (Action.entries[data.readUnsignedByte()]) { when (Action.entries[data.readUnsignedByte()]) {
Action.CLEAR -> { Action.CLEAR -> {
elementsInternal.clear() elementsInternal.clear()
listenersOnElements.values.forEach { it.remove() }
nonListenableElements.clear()
backlog.add(currentVersion() to clearAction) backlog.add(currentVersion() to clearAction)
} }
@ -177,15 +201,20 @@ class NetworkedDynamicGroup<T : Any>(private val factory: () -> T, private val a
// inconsistent usage of VarInt and int32_t when networking is driving me insane // inconsistent usage of VarInt and int32_t when networking is driving me insane
val id = if (isLegacy) data.readInt() else data.readVarInt() val id = if (isLegacy) data.readInt() else data.readVarInt()
backlog.add(currentVersion() to Entry(id)) backlog.add(currentVersion() to Entry(id))
elementsInternal.remove(id) listenersOnElements.remove(id)?.remove()
val element = elementsInternal.remove(id)
if (element != null) {
nonListenableElements.remove(accessor(element))
}
} }
Action.ADD -> { Action.ADD -> {
// inconsistent usage of VarInt and int32_t when networking is driving me insane // inconsistent usage of VarInt and int32_t when networking is driving me insane
val id = if (isLegacy) data.readInt() else data.readVarInt() val id = if (isLegacy) data.readInt() else data.readVarInt()
val element = factory() val element = factory()
setupElement(accessor(element))
check(id !in elementsInternal) { "Already has networked element $id" } check(id !in elementsInternal) { "Already has networked element $id" }
setupElement(accessor(element), id)
elementsInternal[id] = element elementsInternal[id] = element
if (isLegacy) { if (isLegacy) {

View File

@ -1,5 +1,6 @@
package ru.dbotthepony.kstarbound.network.syncher package ru.dbotthepony.kstarbound.network.syncher
import ru.dbotthepony.kommons.util.Listenable
import java.io.DataInputStream import java.io.DataInputStream
import java.io.DataOutputStream import java.io.DataOutputStream
import java.util.function.LongSupplier import java.util.function.LongSupplier
@ -81,6 +82,11 @@ abstract class NetworkedElement {
return this.version >= version return this.version >= version
} }
@Deprecated("Internal API")
open fun listen(listener: Runnable): Listenable.L? {
return null
}
var versionCounter: LongSupplier? = null var versionCounter: LongSupplier? = null
protected set protected set

View File

@ -2,6 +2,7 @@ package ru.dbotthepony.kstarbound.network.syncher
import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeVarInt import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.Listenable
import java.io.DataInputStream import java.io.DataInputStream
import java.io.DataOutputStream import java.io.DataOutputStream
import java.util.function.Consumer import java.util.function.Consumer
@ -14,34 +15,33 @@ class NetworkedGroup() : NetworkedElement() {
} }
// element -> propagateInterpolation // element -> propagateInterpolation
private val elements = ArrayList<Pair<NetworkedElement, Boolean>>() private data class Element(val element: NetworkedElement, val propagateInterpolation: Boolean, val listener: Listenable.L?)
private val elements = ArrayList<Element>()
private val nonListenableElements = ArrayList<NetworkedElement>()
private var listenedVersion = 0L
var isInterpolating = false var isInterpolating = false
private set private set
var extrapolation = 0.0 var extrapolation = 0.0
private set private set
fun clear() {
elements.clear()
}
override fun toString(): String { override fun toString(): String {
if (elements.isEmpty()) if (elements.isEmpty())
return "NetworkedGroup[]" return "NetworkedGroup[]"
return "NetworkedGroup[\n${elements.joinToString("\n") { "\t" + it.first.toString() }}]" return "NetworkedGroup[\n${elements.joinToString("\n") { "\t" + it.element.toString() }}]"
} }
override fun specifyVersioner(versionCounter: LongSupplier) { override fun specifyVersioner(versionCounter: LongSupplier) {
super.specifyVersioner(versionCounter) super.specifyVersioner(versionCounter)
elements.forEach { it.first.specifyVersioner(versionCounter) } elements.forEach { it.element.specifyVersioner(versionCounter) }
} }
override fun enableInterpolation(extrapolation: Double) { override fun enableInterpolation(extrapolation: Double) {
if (!isInterpolating || extrapolation != this.extrapolation) { if (!isInterpolating || extrapolation != this.extrapolation) {
isInterpolating = true isInterpolating = true
this.extrapolation = extrapolation this.extrapolation = extrapolation
elements.forEach { it.first.enableInterpolation(extrapolation) } elements.forEach { it.element.enableInterpolation(extrapolation) }
} }
} }
@ -49,21 +49,15 @@ class NetworkedGroup() : NetworkedElement() {
if (isInterpolating) { if (isInterpolating) {
isInterpolating = false isInterpolating = false
extrapolation = 0.0 extrapolation = 0.0
elements.forEach { it.first.disableInterpolation() } elements.forEach { it.element.disableInterpolation() }
} }
} }
override fun tickInterpolation(delta: Double) { override fun tickInterpolation(delta: Double) {
elements.forEach { it.first.tickInterpolation(delta) } elements.forEach { it.element.tickInterpolation(delta) }
} }
private var trap: Consumer<DataInputStream>? = null private fun setupElement(element: NetworkedElement, propagateInterpolation: Boolean): Listenable.L? {
fun putTrap(trap: Consumer<DataInputStream>) {
this.trap = trap
}
private fun setupElement(element: NetworkedElement, propagateInterpolation: Boolean) {
if (propagateInterpolation) { if (propagateInterpolation) {
if (isInterpolating) if (isInterpolating)
element.enableInterpolation(extrapolation) element.enableInterpolation(extrapolation)
@ -74,37 +68,44 @@ class NetworkedGroup() : NetworkedElement() {
if (versionCounter != null) { if (versionCounter != null) {
element.specifyVersioner(versionCounter!!) element.specifyVersioner(versionCounter!!)
} }
val listenable = element.listen(Runnable { listenedVersion = currentVersion() })
if (listenable == null) {
nonListenableElements.add(element)
}
return listenable
} }
fun <E : NetworkedElement> add(element: E, propagateInterpolation: Boolean = true): E { fun <E : NetworkedElement> add(element: E, propagateInterpolation: Boolean = true): E {
require(elements.none { it.first === element }) { "Already has element $element in $this" } require(elements.none { it.element === element }) { "Already has element $element in $this" }
elements.add(element to propagateInterpolation) elements.add(Element(element, propagateInterpolation, setupElement(element, propagateInterpolation)))
setupElement(element, propagateInterpolation)
return element return element
} }
fun replace(find: NetworkedElement, replace: NetworkedElement) { fun replace(find: NetworkedElement, replace: NetworkedElement) {
if (find === replace) return if (find === replace) return
val index = elements.indexOfFirst { it.first === find } val index = elements.indexOfFirst { it.element === find }
check(index != -1) { "Unable to find $find in $this" } check(index != -1) { "Unable to find $find in $this" }
setupElement(replace, elements[index].second) nonListenableElements.remove(elements[index].element)
elements[index] = replace to elements[index].second elements[index].listener?.remove()
elements[index] = Element(replace, elements[index].propagateInterpolation, setupElement(replace, elements[index].propagateInterpolation))
} }
override fun readInitial(data: DataInputStream, isLegacy: Boolean) { override fun readInitial(data: DataInputStream, isLegacy: Boolean) {
trap?.accept(data) elements.forEach { it.element.readInitial(data, isLegacy) }
elements.forEach { it.first.readInitial(data, isLegacy) }
} }
override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) { override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) {
elements.forEach { it.first.writeInitial(data, isLegacy) } elements.forEach { it.element.writeInitial(data, isLegacy) }
} }
override fun readDelta(data: DataInputStream, interpolationDelay: Double, isLegacy: Boolean) { override fun readDelta(data: DataInputStream, interpolationDelay: Double, isLegacy: Boolean) {
check(elements.isNotEmpty()) { "No networked elements in this group" } check(elements.isNotEmpty()) { "No networked elements in this group" }
if (elements.size == 1) { if (elements.size == 1) {
elements[0].first.readDelta(data, interpolationDelay, isLegacy) elements[0].element.readDelta(data, interpolationDelay, isLegacy)
} else { } else {
var nextIndex = data.readVarInt() var nextIndex = data.readVarInt()
@ -118,9 +119,9 @@ class NetworkedGroup() : NetworkedElement() {
for ((i, element) in elements.withIndex()) { for ((i, element) in elements.withIndex()) {
if (nextIndex == 0 || i < nextIndex - 1) { if (nextIndex == 0 || i < nextIndex - 1) {
element.first.readBlankDelta(interpolationDelay) element.element.readBlankDelta(interpolationDelay)
} else if (i == nextIndex - 1) { } else if (i == nextIndex - 1) {
element.first.readDelta(data, interpolationDelay, isLegacy) element.element.readDelta(data, interpolationDelay, isLegacy)
nextIndex = data.readVarInt() nextIndex = data.readVarInt()
} else { } else {
throw IllegalStateException("Networked element group indexes were written out of order") throw IllegalStateException("Networked element group indexes were written out of order")
@ -129,7 +130,7 @@ class NetworkedGroup() : NetworkedElement() {
} else { } else {
while (nextIndex != 0) { while (nextIndex != 0) {
val element = elements.getOrNull(nextIndex - 1) ?: throw NoSuchElementException("Unknown networked element with index ${nextIndex - 1}!") val element = elements.getOrNull(nextIndex - 1) ?: throw NoSuchElementException("Unknown networked element with index ${nextIndex - 1}!")
element.first.readDelta(data, interpolationDelay, isLegacy) element.element.readDelta(data, interpolationDelay, isLegacy)
nextIndex = data.readVarInt() nextIndex = data.readVarInt()
} }
} }
@ -138,7 +139,7 @@ class NetworkedGroup() : NetworkedElement() {
override fun readBlankDelta(interpolationDelay: Double) { override fun readBlankDelta(interpolationDelay: Double) {
if (isInterpolating) { if (isInterpolating) {
elements.forEach { it.first.readBlankDelta(interpolationDelay) } elements.forEach { it.element.readBlankDelta(interpolationDelay) }
} }
} }
@ -151,16 +152,16 @@ class NetworkedGroup() : NetworkedElement() {
if (elements.size == 1) { if (elements.size == 1) {
// one element - pass through // one element - pass through
elements[0].first.writeDelta(data, remoteVersion, isLegacy) elements[0].element.writeDelta(data, remoteVersion, isLegacy)
} else { } else {
// otherwise, sequentially scan elements for updates // otherwise, sequentially scan elements for updates
// and if element needs updating, write element's index as variable length integer; // and if element needs updating, write element's index as variable length integer;
// then write element itself // then write element itself
for ((i, element) in elements.withIndex()) { for ((i, element) in elements.withIndex()) {
if (element.first.hasChangedSince(remoteVersion)) { if (element.element.hasChangedSince(remoteVersion)) {
data.writeVarInt(i + 1) data.writeVarInt(i + 1)
element.first.writeDelta(data, remoteVersion, isLegacy) element.element.writeDelta(data, remoteVersion, isLegacy)
} }
} }
@ -169,10 +170,10 @@ class NetworkedGroup() : NetworkedElement() {
} }
override fun hasChangedSince(version: Long): Boolean { override fun hasChangedSince(version: Long): Boolean {
return elements.any { it.first.hasChangedSince(version) } return listenedVersion >= version || nonListenableElements.any { it.hasChangedSince(version) }
} }
override fun bumpVersion() { override fun bumpVersion() {
elements.forEach { it.first.bumpVersion() } elements.forEach { it.element.bumpVersion() }
} }
} }

View File

@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.io.readByteArray
import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeVarInt import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kommons.util.Listenable
import ru.dbotthepony.kstarbound.collect.RandomListIterator import ru.dbotthepony.kstarbound.collect.RandomListIterator
import ru.dbotthepony.kstarbound.collect.RandomSubList import ru.dbotthepony.kstarbound.collect.RandomSubList
import ru.dbotthepony.kstarbound.io.writeByteArray import ru.dbotthepony.kstarbound.io.writeByteArray
@ -47,10 +48,15 @@ class NetworkedList<E>(
private val clearEntry = Entry<E>(Type.CLEAR, 0, KOptional()) private val clearEntry = Entry<E>(Type.CLEAR, 0, KOptional())
private var isInterpolating = false private var isInterpolating = false
private val listeners = CopyOnWriteArrayList<Runnable>() private val listeners = Listenable.Void()
fun addListener(listener: Runnable) { fun addListener(listener: Runnable) {
listeners.add(listener) listeners.addListener(listener)
}
@Deprecated("Internal API")
override fun listen(listener: Runnable): Listenable.L {
return listeners.addListener(listener)
} }
/** /**
@ -107,7 +113,7 @@ class NetworkedList<E>(
} }
purgeBacklog() purgeBacklog()
listeners.forEach { it.run() } listeners.run()
} }
override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) { override fun writeInitial(data: DataOutputStream, isLegacy: Boolean) {
@ -157,7 +163,7 @@ class NetworkedList<E>(
} }
purgeBacklog() purgeBacklog()
listeners.forEach { it.run() } listeners.run()
} else { } else {
readInitial(data, false) readInitial(data, false)
} }
@ -248,7 +254,7 @@ class NetworkedList<E>(
elements.add(index, element) elements.add(index, element)
backlog.add(currentVersion() to Entry(index, element)) backlog.add(currentVersion() to Entry(index, element))
purgeBacklog() purgeBacklog()
listeners.forEach { it.run() } listeners.run()
} }
override fun addAll(index: Int, elements: Collection<E>): Boolean { override fun addAll(index: Int, elements: Collection<E>): Boolean {
@ -265,7 +271,7 @@ class NetworkedList<E>(
backlog.clear() backlog.clear()
backlog.add(currentVersion() to clearEntry) backlog.add(currentVersion() to clearEntry)
elements.clear() elements.clear()
listeners.forEach { it.run() } listeners.run()
} }
override fun listIterator(): MutableListIterator<E> { override fun listIterator(): MutableListIterator<E> {
@ -296,7 +302,7 @@ class NetworkedList<E>(
val element = elements.removeAt(index) val element = elements.removeAt(index)
backlog.add(currentVersion() to Entry(index)) backlog.add(currentVersion() to Entry(index))
purgeBacklog() purgeBacklog()
listeners.forEach { it.run() } listeners.run()
return element return element
} }
@ -318,7 +324,7 @@ class NetworkedList<E>(
val old = elements.set(index, element) val old = elements.set(index, element)
backlog.add(currentVersion() to Entry(index, element)) backlog.add(currentVersion() to Entry(index, element))
purgeBacklog() purgeBacklog()
listeners.forEach { it.run() } listeners.run()
return old return old
} }

View File

@ -97,6 +97,11 @@ class NetworkedMap<K, V>(
return Listener(ListenableMap.RunnableAdapter(listener)) return Listener(ListenableMap.RunnableAdapter(listener))
} }
@Deprecated("Internal API")
override fun listen(listener: Runnable): Listenable.L {
return Listener(ListenableMap.RunnableAdapter(listener))
}
private val dumbCodec by lazy { private val dumbCodec by lazy {
StreamCodec.Map(keyCodec.second, valueCodec.second, ::HashMap) StreamCodec.Map(keyCodec.second, valueCodec.second, ::HashMap)
} }
@ -316,7 +321,7 @@ class NetworkedMap<K, V>(
} }
override fun hasChangedSince(version: Long): Boolean { override fun hasChangedSince(version: Long): Boolean {
return backlog.any { it.first >= version } return backlog.isNotEmpty() && backlog.last().first >= version
} }
override fun readBlankDelta(interpolationDelay: Double) {} override fun readBlankDelta(interpolationDelay: Double) {}