some changes for SynchedDelegates (will be discarded anyway)
This commit is contained in:
parent
3c797b9131
commit
164ba29add
@ -2,6 +2,7 @@
|
||||
|
||||
package ru.dbotthepony.kommons.io
|
||||
|
||||
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
|
||||
import it.unimi.dsi.fastutil.objects.Reference2ObjectFunction
|
||||
import it.unimi.dsi.fastutil.objects.Reference2ObjectOpenHashMap
|
||||
import it.unimi.dsi.fastutil.objects.ReferenceArraySet
|
||||
@ -12,7 +13,6 @@ import ru.dbotthepony.kommons.util.ListenableDelegate
|
||||
import ru.dbotthepony.kommons.util.DelegateGetter
|
||||
import ru.dbotthepony.kommons.util.DelegateSetter
|
||||
import ru.dbotthepony.kommons.util.Listenable
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.DataInputStream
|
||||
import java.io.DataOutputStream
|
||||
import java.io.InputStream
|
||||
@ -25,7 +25,7 @@ import java.util.function.Supplier
|
||||
* Universal, one-to-many value synchronizer, allowing to synchronize values from server to client
|
||||
* anywhere, where input/output streams are supported
|
||||
*/
|
||||
@Suppress("unused", "BlockingMethodInNonBlockingContext")
|
||||
@Suppress("unused")
|
||||
class SynchedDelegates(private val callback: Runnable, private val alwaysCallCallback: Boolean) {
|
||||
constructor() : this(Runnable {}, false)
|
||||
constructor(callback: Runnable) : this(callback, false)
|
||||
@ -38,16 +38,13 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
// но если поля нет в пакете, то всё окей
|
||||
private val fields = ArrayList<AbstractValue?>(0)
|
||||
private val observers = ArrayList<Observer>(0)
|
||||
|
||||
private val endpoints = ArrayList<WeakReference<Endpoint>>(1)
|
||||
private var lastEndpointCleanup = System.nanoTime()
|
||||
private var nextFieldID = 0
|
||||
|
||||
val hasObservers: Boolean get() = observers.isNotEmpty()
|
||||
|
||||
var isEmpty: Boolean = true
|
||||
private set
|
||||
|
||||
val isNotEmpty: Boolean get() = !isEmpty
|
||||
|
||||
var isDirty: Boolean = false
|
||||
private set(value) {
|
||||
if (value != field) {
|
||||
@ -63,16 +60,24 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
}
|
||||
}
|
||||
|
||||
private val boundEndpoints = Collections.synchronizedMap(WeakHashMap<Any, Endpoint>())
|
||||
|
||||
fun computeEndpointFor(obj: Any): Endpoint {
|
||||
return boundEndpoints.computeIfAbsent(obj) { Endpoint() }
|
||||
}
|
||||
|
||||
fun removeEndpointFor(obj: Any): Endpoint? {
|
||||
return boundEndpoints.remove(obj)
|
||||
}
|
||||
|
||||
fun endpointFor(obj: Any): Endpoint? {
|
||||
return boundEndpoints[obj]
|
||||
}
|
||||
|
||||
fun markClean() {
|
||||
isDirty = false
|
||||
}
|
||||
|
||||
private var endpointsMaxCapacity = 1
|
||||
private val endpoints = ArrayList<WeakReference<Endpoint>>(1)
|
||||
val defaultEndpoint = Endpoint()
|
||||
|
||||
private var lastEndpointCleanup = System.nanoTime()
|
||||
|
||||
private fun notifyEndpoints(dirtyField: AbstractValue) {
|
||||
isDirty = true
|
||||
|
||||
@ -84,21 +89,17 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
private inline fun forEachEndpoint(execute: (Endpoint) -> Unit) {
|
||||
synchronized(endpoints) {
|
||||
endpoints.forValidRefs { execute.invoke(it) }
|
||||
|
||||
if (endpoints.size < endpointsMaxCapacity / 2) {
|
||||
endpoints.trimToSize()
|
||||
endpointsMaxCapacity = endpoints.size
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val defaultEndpoint = Endpoint()
|
||||
|
||||
inner class Endpoint {
|
||||
init {
|
||||
synchronized(endpoints) {
|
||||
endpoints.add(WeakReference(this))
|
||||
endpointsMaxCapacity = endpointsMaxCapacity.coerceAtLeast(endpoints.size)
|
||||
|
||||
if (System.nanoTime() - lastEndpointCleanup <= 60) {
|
||||
if (System.nanoTime() - lastEndpointCleanup >= 60) {
|
||||
lastEndpointCleanup = System.nanoTime()
|
||||
|
||||
val iterator = endpoints.listIterator()
|
||||
@ -108,11 +109,6 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
iterator.remove()
|
||||
}
|
||||
}
|
||||
|
||||
if (endpoints.size < endpointsMaxCapacity / 2) {
|
||||
endpoints.trimToSize()
|
||||
endpointsMaxCapacity = endpoints.size
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -123,21 +119,22 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
private val mapBacklogs = Reference2ObjectOpenHashMap<Map<*, *>, LinkedList<Pair<Any?, (DataOutputStream) -> Unit>>>()
|
||||
private val setBacklogs = Reference2ObjectOpenHashMap<Set<*>, LinkedList<Pair<Any?, (DataOutputStream) -> Unit>>>()
|
||||
|
||||
var unused: Boolean = false
|
||||
var isDisabled: Boolean = false
|
||||
private set
|
||||
|
||||
fun markUnused() {
|
||||
require(this === defaultEndpoint) { "This is not a default endpoint" }
|
||||
if (unused) return
|
||||
unused = true
|
||||
fun disable() {
|
||||
if (isDisabled) return
|
||||
isDisabled = true
|
||||
mapBacklogs.clear()
|
||||
dirty.clear()
|
||||
|
||||
val iterator = endpoints.listIterator()
|
||||
synchronized(endpoints) {
|
||||
val iterator = endpoints.listIterator()
|
||||
|
||||
for (value in iterator) {
|
||||
if (value.get() === this) {
|
||||
iterator.remove()
|
||||
for (value in iterator) {
|
||||
if (value.get() === this || value.get() == null) {
|
||||
iterator.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -147,16 +144,15 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
}
|
||||
|
||||
fun markDirty() {
|
||||
if (isDisabled) return
|
||||
|
||||
for (field in fields) {
|
||||
field?.markDirty(this)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun addDirty(field: AbstractValue) {
|
||||
if (unused) {
|
||||
return
|
||||
}
|
||||
|
||||
if (isDisabled) return
|
||||
dirty.add(field)
|
||||
}
|
||||
|
||||
@ -165,9 +161,7 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
}
|
||||
|
||||
internal fun <K, V> getMapBacklog(map: Map<K, V>): LinkedList<Pair<Any?, (DataOutputStream) -> Unit>> {
|
||||
if (unused) {
|
||||
return LinkedList()
|
||||
}
|
||||
if (isDisabled) return LinkedList()
|
||||
|
||||
return mapBacklogs.computeIfAbsent(map, Reference2ObjectFunction {
|
||||
LinkedList()
|
||||
@ -179,9 +173,7 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
}
|
||||
|
||||
internal fun <V> getSetBacklog(set: Set<V>): LinkedList<Pair<Any?, (DataOutputStream) -> Unit>> {
|
||||
if (unused) {
|
||||
return LinkedList()
|
||||
}
|
||||
if (isDisabled) return LinkedList()
|
||||
|
||||
return setBacklogs.computeIfAbsent(set, Reference2ObjectFunction {
|
||||
LinkedList()
|
||||
@ -192,12 +184,11 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
setBacklogs.remove(set)
|
||||
}
|
||||
|
||||
fun collectNetworkPayload(): ByteArrayOutputStream? {
|
||||
if (unused || dirty.isEmpty()) {
|
||||
fun collectData(): FastByteArrayOutputStream? {
|
||||
if (isDisabled || dirty.isEmpty())
|
||||
return null
|
||||
}
|
||||
|
||||
val stream = ByteArrayOutputStream()
|
||||
val stream = FastByteArrayOutputStream()
|
||||
val dataStream = DataOutputStream(stream)
|
||||
|
||||
for (field in dirty) {
|
||||
@ -212,20 +203,6 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
}
|
||||
}
|
||||
|
||||
private val boundEndpoints = WeakHashMap<Any, Endpoint>()
|
||||
|
||||
fun computeEndpointFor(obj: Any): Endpoint {
|
||||
return boundEndpoints.computeIfAbsent(obj) { Endpoint() }
|
||||
}
|
||||
|
||||
fun removeEndpointFor(obj: Any): Endpoint? {
|
||||
return boundEndpoints.remove(obj)
|
||||
}
|
||||
|
||||
fun endpointFor(obj: Any): Endpoint? {
|
||||
return boundEndpoints[obj]
|
||||
}
|
||||
|
||||
abstract inner class AbstractValue : Observer {
|
||||
val id: Int
|
||||
|
||||
@ -912,10 +889,10 @@ class SynchedDelegates(private val callback: Runnable, private val alwaysCallCal
|
||||
/**
|
||||
* [defaultEndpoint]#collectNetworkPayload
|
||||
*/
|
||||
fun collectNetworkPayload(): ByteArrayOutputStream? {
|
||||
check(!defaultEndpoint.unused) { "Default endpoint is not used" }
|
||||
fun collectData(): FastByteArrayOutputStream? {
|
||||
check(!defaultEndpoint.isDisabled) { "Default endpoint is not used" }
|
||||
observe()
|
||||
val values = defaultEndpoint.collectNetworkPayload()
|
||||
val values = defaultEndpoint.collectData()
|
||||
markClean()
|
||||
return values
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user