diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/SynchedDelegates.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/SynchedDelegates.kt index 9e4ff4c..aaeb942 100644 --- a/src/main/kotlin/ru/dbotthepony/kommons/io/SynchedDelegates.kt +++ b/src/main/kotlin/ru/dbotthepony/kommons/io/SynchedDelegates.kt @@ -2,6 +2,7 @@ package ru.dbotthepony.kommons.io +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.objects.Reference2ObjectFunction import it.unimi.dsi.fastutil.objects.Reference2ObjectOpenHashMap import it.unimi.dsi.fastutil.objects.ReferenceArraySet @@ -12,7 +13,6 @@ import ru.dbotthepony.kommons.util.ListenableDelegate import ru.dbotthepony.kommons.util.DelegateGetter import ru.dbotthepony.kommons.util.DelegateSetter import ru.dbotthepony.kommons.util.Listenable -import java.io.ByteArrayOutputStream import java.io.DataInputStream import java.io.DataOutputStream import java.io.InputStream @@ -25,7 +25,7 @@ import java.util.function.Supplier * Universal, one-to-many value synchronizer, allowing to synchronize values from server to client * anywhere, where input/output streams are supported */ -@Suppress("unused", "BlockingMethodInNonBlockingContext") +@Suppress("unused") class SynchedDelegates(private val callback: Runnable, private val alwaysCallCallback: Boolean) { constructor() : this(Runnable {}, false) constructor(callback: Runnable) : this(callback, false) @@ -38,16 +38,13 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal // но если поля нет в пакете, то всё окей private val fields = ArrayList(0) private val observers = ArrayList(0) - + private val endpoints = ArrayList>(1) + private var lastEndpointCleanup = System.nanoTime() private var nextFieldID = 0 - val hasObservers: Boolean get() = observers.isNotEmpty() - var isEmpty: Boolean = true private set - val isNotEmpty: Boolean get() = !isEmpty - var isDirty: Boolean = false private set(value) { if (value != field) { @@ -63,16 +60,24 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal } } + private val boundEndpoints = Collections.synchronizedMap(WeakHashMap()) + + fun computeEndpointFor(obj: Any): Endpoint { + return boundEndpoints.computeIfAbsent(obj) { Endpoint() } + } + + fun removeEndpointFor(obj: Any): Endpoint? { + return boundEndpoints.remove(obj) + } + + fun endpointFor(obj: Any): Endpoint? { + return boundEndpoints[obj] + } + fun markClean() { isDirty = false } - private var endpointsMaxCapacity = 1 - private val endpoints = ArrayList>(1) - val defaultEndpoint = Endpoint() - - private var lastEndpointCleanup = System.nanoTime() - private fun notifyEndpoints(dirtyField: AbstractValue) { isDirty = true @@ -84,21 +89,17 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal private inline fun forEachEndpoint(execute: (Endpoint) -> Unit) { synchronized(endpoints) { endpoints.forValidRefs { execute.invoke(it) } - - if (endpoints.size < endpointsMaxCapacity / 2) { - endpoints.trimToSize() - endpointsMaxCapacity = endpoints.size - } } } + val defaultEndpoint = Endpoint() + inner class Endpoint { init { synchronized(endpoints) { endpoints.add(WeakReference(this)) - endpointsMaxCapacity = endpointsMaxCapacity.coerceAtLeast(endpoints.size) - if (System.nanoTime() - lastEndpointCleanup <= 60) { + if (System.nanoTime() - lastEndpointCleanup >= 60) { lastEndpointCleanup = System.nanoTime() val iterator = endpoints.listIterator() @@ -108,11 +109,6 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal iterator.remove() } } - - if (endpoints.size < endpointsMaxCapacity / 2) { - endpoints.trimToSize() - endpointsMaxCapacity = endpoints.size - } } } } @@ -123,21 +119,22 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal private val mapBacklogs = Reference2ObjectOpenHashMap, LinkedList Unit>>>() private val setBacklogs = Reference2ObjectOpenHashMap, LinkedList Unit>>>() - var unused: Boolean = false + var isDisabled: Boolean = false private set - fun markUnused() { - require(this === defaultEndpoint) { "This is not a default endpoint" } - if (unused) return - unused = true + fun disable() { + if (isDisabled) return + isDisabled = true mapBacklogs.clear() dirty.clear() - val iterator = endpoints.listIterator() + synchronized(endpoints) { + val iterator = endpoints.listIterator() - for (value in iterator) { - if (value.get() === this) { - iterator.remove() + for (value in iterator) { + if (value.get() === this || value.get() == null) { + iterator.remove() + } } } } @@ -147,16 +144,15 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal } fun markDirty() { + if (isDisabled) return + for (field in fields) { field?.markDirty(this) } } internal fun addDirty(field: AbstractValue) { - if (unused) { - return - } - + if (isDisabled) return dirty.add(field) } @@ -165,9 +161,7 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal } internal fun getMapBacklog(map: Map): LinkedList Unit>> { - if (unused) { - return LinkedList() - } + if (isDisabled) return LinkedList() return mapBacklogs.computeIfAbsent(map, Reference2ObjectFunction { LinkedList() @@ -179,9 +173,7 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal } internal fun getSetBacklog(set: Set): LinkedList Unit>> { - if (unused) { - return LinkedList() - } + if (isDisabled) return LinkedList() return setBacklogs.computeIfAbsent(set, Reference2ObjectFunction { LinkedList() @@ -192,12 +184,11 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal setBacklogs.remove(set) } - fun collectNetworkPayload(): ByteArrayOutputStream? { - if (unused || dirty.isEmpty()) { + fun collectData(): FastByteArrayOutputStream? { + if (isDisabled || dirty.isEmpty()) return null - } - val stream = ByteArrayOutputStream() + val stream = FastByteArrayOutputStream() val dataStream = DataOutputStream(stream) for (field in dirty) { @@ -212,20 +203,6 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal } } - private val boundEndpoints = WeakHashMap() - - fun computeEndpointFor(obj: Any): Endpoint { - return boundEndpoints.computeIfAbsent(obj) { Endpoint() } - } - - fun removeEndpointFor(obj: Any): Endpoint? { - return boundEndpoints.remove(obj) - } - - fun endpointFor(obj: Any): Endpoint? { - return boundEndpoints[obj] - } - abstract inner class AbstractValue : Observer { val id: Int @@ -912,10 +889,10 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal /** * [defaultEndpoint]#collectNetworkPayload */ - fun collectNetworkPayload(): ByteArrayOutputStream? { - check(!defaultEndpoint.unused) { "Default endpoint is not used" } + fun collectData(): FastByteArrayOutputStream? { + check(!defaultEndpoint.isDisabled) { "Default endpoint is not used" } observe() - val values = defaultEndpoint.collectNetworkPayload() + val values = defaultEndpoint.collectData() markClean() return values }