Remove streamcodec and delegate syncher

This commit is contained in:
DBotThePony 2024-08-10 19:19:58 +07:00
parent 415267586d
commit 7609a23dd6
Signed by: DBot
GPG Key ID: DCC23B5715498507
4 changed files with 1 additions and 1282 deletions

View File

@ -4,7 +4,7 @@ kotlin.code.style=official
specifyKotlinAsDependency=false
projectGroup=ru.dbotthepony.kommons
projectVersion=3.0.2
projectVersion=3.1.0
guavaDepVersion=33.0.0
gsonDepVersion=2.8.9

View File

@ -1,846 +0,0 @@
package ru.dbotthepony.kommons.io
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap
import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet
import ru.dbotthepony.kommons.collect.ListenableMap
import ru.dbotthepony.kommons.collect.ListenableSet
import ru.dbotthepony.kommons.util.Delegate
import ru.dbotthepony.kommons.util.DelegateGetter
import ru.dbotthepony.kommons.util.DelegateSetter
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kommons.util.Listenable
import ru.dbotthepony.kommons.util.ListenableDelegate
import ru.dbotthepony.kommons.util.Observer
import ru.dbotthepony.kommons.util.ValueObserver
import java.io.Closeable
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.InputStream
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.function.BooleanSupplier
import java.util.function.Consumer
import java.util.function.DoubleSupplier
import java.util.function.IntSupplier
import java.util.function.LongSupplier
import java.util.function.Supplier
import kotlin.collections.ArrayList
import kotlin.concurrent.withLock
/**
* Universal, one-to-many delegate/value synchronizer.
*
* Values and delegates can be attached using [add], or by constructing subclassed directly.
* Delta changes are tracked by [DelegateSyncher.Remote] instances.
*
* In general, this class is not meant to be _structurally_ concurrently mutated by different threads,
* to avoid structure corruption a lock is employed.
*
* Attached delegates can be safely mutated by multiple threads concurrently
* (attached [ListenableDelegate] can call [Listenable.addListener] callback from different threads).
*/
@Suppress("UNCHECKED_CAST", "UNUSED")
class DelegateSyncher : Observer {
private val lock = ReentrantLock()
private val slots = ArrayList<AbstractSlot?>()
private val gaps = IntAVLTreeSet()
private val observers = ArrayList<AbstractSlot>()
private val remotes = ArrayList<Remote>()
private var isRemote = false
override fun observe(): Boolean {
var any = false
observers.forEach { any = it.observe() || any }
return any
}
fun read(stream: DataInputStream) {
isRemote = true
var readID = stream.readVarInt()
while (readID > 0) {
val slot = slots.getOrNull(readID - 1) ?: throw IndexOutOfBoundsException("Unknown networked slot ${readID - 1}!")
slot.read(stream)
readID = stream.readVarInt()
}
}
fun read(stream: InputStream) {
if (stream is DataInputStream)
return read(stream)
read(DataInputStream(stream))
}
abstract inner class AbstractSlot : Closeable, Observer, Comparable<AbstractSlot> {
val id: Int
protected val isRemoved = AtomicBoolean()
fun remove() {
if (isRemoved.compareAndSet(false, true)) {
lock.withLock {
slots[id] = null
gaps.add(id)
remoteSlots.forEach {
it.remove()
}
remoteSlots.clear()
remove0()
}
}
}
final override fun close() {
return remove()
}
final override fun compareTo(other: AbstractSlot): Int {
return id.compareTo(other.id)
}
protected abstract fun remove0()
internal abstract fun read(stream: DataInputStream)
internal abstract fun write(stream: DataOutputStream)
internal val remoteSlots = CopyOnWriteArrayList<Remote.RemoteSlot>()
protected fun markDirty() {
if (!isRemote && !isRemoved.get()) {
remoteSlots.forEach {
it.markDirty()
}
}
}
init {
lock.withLock {
if (gaps.isNotEmpty()) {
val gap = gaps.firstInt()
gaps.remove(gap)
id = gap
check(slots[id] == null) { "Expected slot $id to be empty" }
slots[id] = this
} else {
id = slots.size
slots.add(this)
}
remotes.forEach {
it.addSlot(this)
}
}
}
}
inner class Slot<V>(val delegate: ListenableDelegate<V>, val codec: StreamCodec<V>) : AbstractSlot(), ListenableDelegate<V> {
constructor(delegate: Supplier<V>, codec: StreamCodec<V>) : this(ListenableDelegate.CustomShadow(delegate, codec::compare, codec::copy), codec)
private val l = delegate.addListener(Consumer {
markDirty()
listeners.accept(it)
})
init {
if (delegate is ListenableDelegate.AbstractShadow) {
observers.add(this)
}
}
private val listeners = Listenable.Impl<V>()
override fun remove0() {
l.remove()
listeners.clear()
}
override fun read(stream: DataInputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
delegate.accept(codec.read(stream))
}
override fun write(stream: DataOutputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
codec.write(stream, delegate.get())
}
override fun get(): V {
check(!isRemoved.get()) { "This network slot was removed" }
return delegate.get()
}
override fun accept(t: V) {
check(!isRemoved.get()) { "This network slot was removed" }
delegate.accept(t)
}
override fun addListener(listener: Consumer<V>): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return listeners.addListener(listener)
}
override fun observe(): Boolean {
check(!isRemoved.get()) { "This network slot was removed" }
if (delegate is ListenableDelegate.AbstractShadow) {
return delegate.observe()
}
return false
}
}
inner class ObservedSlot<V>(val delegate: Delegate<V>, val codec: StreamCodec<V>) : AbstractSlot(), ListenableDelegate<V>, ValueObserver<V> {
private val listeners = Listenable.Impl<V>()
private var observed: Any? = Mark
private val observeLock = ReentrantLock()
init {
observers.add(this)
}
override fun remove0() {
listeners.clear()
}
override fun read(stream: DataInputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
val read = codec.read(stream)
observed = codec.copy(read)
delegate.accept(read)
}
override fun write(stream: DataOutputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
codec.write(stream, delegate.get())
}
override fun observe(): Boolean {
val get = delegate.get()
if (observed === Mark || !codec.compare(get, observed as V)) {
observeLock.withLock {
if (observed === Mark || !codec.compare(get, observed as V)) {
observed = codec.copy(get)
markDirty()
listeners.accept(get)
}
return true
}
}
return false
}
override fun getAndObserve(): Pair<V, Boolean> {
val get = delegate.get()
if (observed === Mark || !codec.compare(get, observed as V)) {
observeLock.withLock {
if (observed === Mark || !codec.compare(get, observed as V)) {
observed = codec.copy(get)
markDirty()
listeners.accept(get)
}
return get to true
}
}
return get to false
}
override fun get(): V {
check(!isRemoved.get()) { "This network slot was removed" }
return delegate.get()
}
override fun accept(t: V) {
check(!isRemoved.get()) { "This network slot was removed" }
delegate.accept(t)
}
override fun addListener(listener: Consumer<V>): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return listeners.addListener(listener)
}
}
private object Mark
internal data class SetAction<V>(val value: KOptional<V>, val action: Int)
inner class SetSlot<V>(val delegate: ListenableSet<V>, val codec: StreamCodec<V>) : AbstractSlot() {
private val listener = object : ListenableSet.SetListener<V> {
override fun onClear() {
remoteSlots.forEach {
(it as Remote.RemoteSetSlot<V>).changelist.clear()
synchronized(it.changelistLock) {
it.changelist.add(SetAction(KOptional(), CLEAR))
}
listeners.forEach {
it.listener.onClear()
}
it.markDirty()
}
}
override fun onValueAdded(element: V) {
remoteSlots.forEach {
it as Remote.RemoteSetSlot<V>
synchronized(it.changelistLock) {
it.changelist.removeIf { it.value.valueEquals(element) }
it.changelist.add(SetAction(KOptional(codec.copy(element)), ADD))
}
listeners.forEach {
it.listener.onValueAdded(element)
}
it.markDirty()
}
}
override fun onValueRemoved(element: V) {
remoteSlots.forEach {
it as Remote.RemoteSetSlot<V>
synchronized(it.changelistLock) {
it.changelist.removeIf { it.value.valueEquals(element) }
it.changelist.add(SetAction(KOptional(codec.copy(element)), REMOVE))
}
listeners.forEach {
it.listener.onValueRemoved(element)
}
it.markDirty()
}
}
}
private val l = delegate.addListener(listener)
private val listeners = CopyOnWriteArrayList<Listener>()
override fun remove0() {
l.remove()
listeners.clear()
}
private inner class Listener(val listener: ListenableSet.SetListener<V>): Listenable.L {
private var isRemoved = false
init {
listeners.add(this)
}
override fun remove() {
if (!isRemoved) {
isRemoved = true
listeners.remove(this)
}
}
}
fun addListener(listener: ListenableSet.SetListener<V>): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return Listener(listener)
}
fun addListener(listener: Runnable): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return Listener(ListenableSet.RunnableAdapter(listener))
}
override fun read(stream: DataInputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
var action = stream.read()
while (true) {
when (action) {
ADD -> delegate.add(codec.read(stream))
REMOVE -> delegate.remove(codec.read(stream))
CLEAR -> delegate.clear()
else -> break
}
action = stream.read()
}
}
override fun write(stream: DataOutputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
throw RuntimeException("unreachable code")
}
override fun observe(): Boolean {
check(!isRemoved.get()) { "This network slot was removed" }
return false
}
}
internal data class MapAction<K, V>(val key: KOptional<K>, val value: KOptional<V>, val action: Int)
inner class MapSlot<K, V>(val delegate: ListenableMap<K, V>, val keyCodec: StreamCodec<K>, val valueCodec: StreamCodec<V>) : AbstractSlot() {
private val listener = object : ListenableMap.MapListener<K, V> {
override fun onClear() {
remoteSlots.forEach {
it as Remote.RemoteMapSlot<K, V>
synchronized(it.changelistLock) {
it.changelist.clear()
it.changelist.add(MapAction(KOptional(), KOptional(), CLEAR))
}
listeners.forEach {
it.listener.onClear()
}
it.markDirty()
}
}
override fun onValueAdded(key: K, value: V) {
remoteSlots.forEach {
it as Remote.RemoteMapSlot<K, V>
synchronized(it.changelistLock) {
it.changelist.removeIf { it.key.valueEquals(key) }
it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(valueCodec.copy(value)), ADD))
}
listeners.forEach {
it.listener.onValueAdded(key, value)
}
it.markDirty()
}
}
override fun onValueRemoved(key: K, value: V) {
remoteSlots.forEach {
it as Remote.RemoteMapSlot<K, V>
synchronized(it.changelistLock) {
it.changelist.removeIf { it.key.valueEquals(key) }
it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(), REMOVE))
}
listeners.forEach {
it.listener.onValueRemoved(key, value)
}
it.markDirty()
}
}
}
private val l = delegate.addListener(listener)
private val listeners = CopyOnWriteArrayList<Listener>()
override fun remove0() {
l.remove()
listeners.clear()
}
private inner class Listener(val listener: ListenableMap.MapListener<K, V>): Listenable.L {
private var isRemoved = false
init {
listeners.add(this)
}
override fun remove() {
if (!isRemoved) {
isRemoved = true
listeners.remove(this)
}
}
}
fun addListener(listener: ListenableMap.MapListener<K, V>): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return Listener(listener)
}
fun addListener(listener: Runnable): Listenable.L {
check(!isRemoved.get()) { "This network slot was removed" }
return Listener(ListenableMap.RunnableAdapter(listener))
}
override fun read(stream: DataInputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
var action = stream.read()
while (true) {
when (action) {
ADD -> delegate.put(keyCodec.read(stream), valueCodec.read(stream))
REMOVE -> delegate.remove(keyCodec.read(stream))
CLEAR -> delegate.clear()
else -> break
}
action = stream.read()
}
}
override fun write(stream: DataOutputStream) {
check(!isRemoved.get()) { "This network slot was removed" }
throw RuntimeException("unreachable code")
}
override fun observe(): Boolean {
check(!isRemoved.get()) { "This network slot was removed" }
return false
}
}
fun <V> add(value: V, codec: StreamCodec<V>, setter: DelegateSetter<V> = DelegateSetter.passthrough(), getter: DelegateGetter<V> = DelegateGetter.passthrough()): Slot<V> {
return Slot(ListenableDelegate.maskSmart(value, getter, setter), codec)
}
fun <V> add(delegate: ListenableDelegate<V>, codec: StreamCodec<V>): Slot<V> {
return Slot(delegate, codec)
}
fun <V> add(delegate: Delegate<V>, codec: StreamCodec<V>): ObservedSlot<V> {
return ObservedSlot(delegate, codec)
}
fun <V> add(delegate: Supplier<V>, codec: StreamCodec<V>): Slot<V> {
return computed(delegate, codec)
}
fun <E> add(delegate: ListenableSet<E>, codec: StreamCodec<E>): SetSlot<E> {
return SetSlot(delegate, codec)
}
fun <K, V> add(delegate: ListenableMap<K, V>, keyCodec: StreamCodec<K>, valueCodec: StreamCodec<V>): MapSlot<K, V> {
return MapSlot(delegate, keyCodec, valueCodec)
}
fun computedByte(delegate: Supplier<Byte>) = Slot(ListenableDelegate.Shadow(delegate), ByteValueCodec)
fun computedShort(delegate: Supplier<Short>) = Slot(ListenableDelegate.Shadow(delegate), ShortValueCodec)
fun computedChar(delegate: Supplier<Char>) = Slot(ListenableDelegate.Shadow(delegate), CharValueCodec)
fun computedInt(delegate: Supplier<Int>) = Slot(ListenableDelegate.Shadow(delegate), VarIntValueCodec)
fun computedLong(delegate: Supplier<Long>) = Slot(ListenableDelegate.Shadow(delegate), VarLongValueCodec)
fun computedFloat(delegate: Supplier<Float>) = Slot(ListenableDelegate.Shadow(delegate), FloatValueCodec)
fun computedDouble(delegate: Supplier<Double>) = Slot(ListenableDelegate.Shadow(delegate), DoubleValueCodec)
fun computedBoolean(delegate: Supplier<Boolean>) = Slot(ListenableDelegate.Shadow(delegate), BooleanValueCodec)
fun computedString(delegate: Supplier<String>) = Slot(ListenableDelegate.Shadow(delegate), BinaryStringCodec)
fun computedUUID(delegate: Supplier<UUID>) = Slot(ListenableDelegate.Shadow(delegate), UUIDValueCodec)
fun <V> computed(delegate: Supplier<V>, codec: StreamCodec<V>) = Slot(ListenableDelegate.CustomShadow(delegate, codec::compare, codec::copy), codec)
fun computedInt(delegate: IntSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsInt), VarIntValueCodec)
fun computedLong(delegate: LongSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsLong), VarLongValueCodec)
fun computedDouble(delegate: DoubleSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsDouble), DoubleValueCodec)
fun computedBoolean(delegate: BooleanSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsBoolean), BooleanValueCodec)
@JvmName("vbyte")
@JvmOverloads
fun byte(value: Byte = 0, setter: DelegateSetter<Byte> = DelegateSetter.passthrough(), getter: DelegateGetter<Byte> = DelegateGetter.passthrough()): Slot<Byte> {
return add(ListenableDelegate.maskSmart(value, getter, setter), ByteValueCodec)
}
@JvmName("vshort")
@JvmOverloads
fun short(value: Short = 0, setter: DelegateSetter<Short> = DelegateSetter.passthrough(), getter: DelegateGetter<Short> = DelegateGetter.passthrough()): Slot<Short> {
return add(ListenableDelegate.maskSmart(value, getter, setter), ShortValueCodec)
}
@JvmName("vchar")
@JvmOverloads
fun char(value: Char, setter: DelegateSetter<Char> = DelegateSetter.passthrough(), getter: DelegateGetter<Char> = DelegateGetter.passthrough()): Slot<Char> {
return add(ListenableDelegate.maskSmart(value, getter, setter), CharValueCodec)
}
@JvmName("vint")
@JvmOverloads
fun int(value: Int = 0, setter: DelegateSetter<Int> = DelegateSetter.passthrough(), getter: DelegateGetter<Int> = DelegateGetter.passthrough()): Slot<Int> {
return add(ListenableDelegate.maskSmart(value, getter, setter), VarIntValueCodec)
}
@JvmName("vlong")
@JvmOverloads
fun long(value: Long = 0L, setter: DelegateSetter<Long> = DelegateSetter.passthrough(), getter: DelegateGetter<Long> = DelegateGetter.passthrough()): Slot<Long> {
return add(ListenableDelegate.maskSmart(value, getter, setter), VarLongValueCodec)
}
@JvmName("vfloat")
@JvmOverloads
fun float(value: Float = 0f, setter: DelegateSetter<Float> = DelegateSetter.passthrough(), getter: DelegateGetter<Float> = DelegateGetter.passthrough()): Slot<Float> {
return add(ListenableDelegate.maskSmart(value, getter, setter), FloatValueCodec)
}
@JvmName("vdouble")
@JvmOverloads
fun double(value: Double = 0.0, setter: DelegateSetter<Double> = DelegateSetter.passthrough(), getter: DelegateGetter<Double> = DelegateGetter.passthrough()): Slot<Double> {
return add(ListenableDelegate.maskSmart(value, getter, setter), DoubleValueCodec)
}
@JvmName("vboolean")
@JvmOverloads
fun boolean(value: Boolean = false, setter: DelegateSetter<Boolean> = DelegateSetter.passthrough(), getter: DelegateGetter<Boolean> = DelegateGetter.passthrough()): Slot<Boolean> {
return add(ListenableDelegate.maskSmart(value, getter, setter), BooleanValueCodec)
}
@JvmOverloads
fun string(value: String, setter: DelegateSetter<String> = DelegateSetter.passthrough(), getter: DelegateGetter<String> = DelegateGetter.passthrough()): Slot<String> {
return add(ListenableDelegate.maskSmart(value, getter, setter), BinaryStringCodec)
}
@JvmOverloads
fun uuid(value: UUID, setter: DelegateSetter<UUID> = DelegateSetter.passthrough(), getter: DelegateGetter<UUID> = DelegateGetter.passthrough()): Slot<UUID> {
return add(ListenableDelegate.maskSmart(value, getter, setter), UUIDValueCodec)
}
@JvmOverloads
fun <E> set(codec: StreamCodec<E>, backing: MutableSet<E> = ObjectOpenHashSet()): SetSlot<E> {
return add(ListenableSet(backing), codec)
}
@JvmOverloads
fun <K, V> map(keyCodec: StreamCodec<K>, valueCodec: StreamCodec<V>, backing: MutableMap<K, V> = Object2ObjectOpenHashMap()): MapSlot<K, V> {
return add(ListenableMap(backing), keyCodec, valueCodec)
}
@JvmOverloads
fun <E : Enum<E>> enum(value: E, setter: DelegateSetter<E> = DelegateSetter.passthrough(), getter: DelegateGetter<E> = DelegateGetter.passthrough()): Slot<E> {
return add(ListenableDelegate.maskSmart(value, getter, setter), StreamCodec.Enum(value::class.java))
}
/**
* Remotes must be [close]d when no longer in use, otherwise they will
* leak memory and decrease performance of syncher.
*
* To get changes to be networked to remote client, [write] should be called.
*/
inner class Remote : Closeable {
@Volatile
private var isRemoved = false
internal val dirty = ConcurrentLinkedQueue<RemoteSlot>()
private val remoteSlots = CopyOnWriteArrayList<RemoteSlot>()
internal open inner class RemoteSlot(val parent: AbstractSlot) : Comparable<RemoteSlot> {
private val isDirty = AtomicBoolean()
final override fun compareTo(other: RemoteSlot): Int {
return parent.compareTo(other.parent)
}
init {
remoteSlots.add(this)
dirty.add(this)
}
fun markDirty() {
if (!isRemoved && isDirty.compareAndSet(false, true)) {
dirty.add(this)
}
}
fun markClean() {
isDirty.set(false)
}
open fun invalidate() {
markDirty()
}
open fun remove() {
isDirty.set(true)
remoteSlots.remove(this)
dirty.remove(this)
}
open fun write(stream: DataOutputStream) {
parent.write(stream)
}
override fun hashCode(): Int {
return parent.hashCode()
}
override fun equals(other: Any?): Boolean {
return this === other || other is RemoteSlot && other.parent == parent
}
}
internal inner class RemoteSetSlot<V>(parent: SetSlot<V>) : RemoteSlot(parent) {
val changelist = ArrayList<SetAction<V>>()
val changelistLock = Any()
override fun remove() {
super.remove()
changelist.clear()
}
override fun invalidate() {
synchronized(changelistLock) {
changelist.clear()
changelist.add(SetAction(KOptional(), CLEAR))
parent as SetSlot<V>
for (v in parent.delegate) {
changelist.add(SetAction(KOptional(parent.codec.copy(v)), ADD))
}
}
super.invalidate()
}
override fun write(stream: DataOutputStream) {
synchronized(changelistLock) {
for (change in changelist) {
stream.write(change.action)
change.value.ifPresent { (parent as SetSlot<V>).codec.write(stream, it) }
}
changelist.clear()
}
stream.write(0)
}
}
internal inner class RemoteMapSlot<K, V>(parent: MapSlot<K, V>) : RemoteSlot(parent) {
val changelist = ArrayList<MapAction<K, V>>()
val changelistLock = Any()
override fun remove() {
super.remove()
changelist.clear()
}
override fun invalidate() {
synchronized(changelistLock) {
changelist.clear()
changelist.add(MapAction(KOptional(), KOptional(), CLEAR))
parent as MapSlot<K, V>
for ((k, v) in parent.delegate) {
changelist.add(MapAction(KOptional(parent.keyCodec.copy(k)), KOptional(parent.valueCodec.copy(v)), ADD))
}
}
super.invalidate()
}
override fun write(stream: DataOutputStream) {
synchronized(changelistLock) {
for (change in changelist) {
stream.write(change.action)
change.key.ifPresent { (parent as MapSlot<K, V>).keyCodec.write(stream, it) }
change.value.ifPresent { (parent as MapSlot<K, V>).valueCodec.write(stream, it) }
}
changelist.clear()
}
stream.write(0)
}
}
init {
lock.withLock {
slots.forEach {
if (it != null) {
addSlot(it)
}
}
remotes.add(this)
}
}
internal fun addSlot(slot: AbstractSlot) {
if (slot is SetSlot<*>) {
slot.remoteSlots.add(RemoteSetSlot(slot))
} else if (slot is MapSlot<*, *>) {
slot.remoteSlots.add(RemoteMapSlot(slot))
} else {
slot.remoteSlots.add(RemoteSlot(slot))
}
}
/**
* Returns null of this remote is clean.
*
* [DelegateSyncher.observe] is not called automatically for performance
* reasons, you must call it manually.
*/
fun write(): FastByteArrayOutputStream? {
if (dirty.isNotEmpty()) {
val data = FastByteArrayOutputStream()
val stream = DataOutputStream(data)
val sorted = ObjectAVLTreeSet<RemoteSlot>()
var next = dirty.poll()
while (next != null) {
sorted.add(next)
next.markClean()
next = dirty.poll()
}
for (v in sorted) {
stream.writeVarInt(v.parent.id + 1)
v.write(stream)
}
stream.write(0)
return data
}
return null
}
/**
* Marks all networked slots dirty
*/
fun invalidate() {
remoteSlots.forEach { it.invalidate() }
}
override fun close() {
if (!isRemoved) {
lock.withLock {
if (!isRemoved) {
remoteSlots.forEach {
it.remove()
it.parent.remoteSlots.remove(it)
}
remotes.remove(this)
}
}
dirty.clear()
}
}
}
companion object {
private const val END = 0
private const val CLEAR = 1
private const val ADD = 2
private const val REMOVE = 3
}
}

View File

@ -1,318 +0,0 @@
package ru.dbotthepony.kommons.io
import ru.dbotthepony.kommons.math.RGBAColor
import ru.dbotthepony.kommons.util.KOptional
import java.io.DataInputStream
import java.io.DataOutputStream
import java.util.*
import kotlin.reflect.KClass
/**
* Represents value which can be encoded onto or decoded from stream.
*/
interface StreamCodec<V> {
fun read(stream: DataInputStream): V
fun write(stream: DataOutputStream, value: V)
/**
* Defensive copy
*/
fun copy(value: V): V
/**
* Optional custom equality check
*/
fun compare(a: V, b: V): Boolean {
return a == b
}
class Impl<V>(
private val reader: (stream: DataInputStream) -> V,
private val writer: (stream: DataOutputStream, value: V) -> Unit,
private val copier: ((value: V) -> V) = { it },
private val comparator: ((a: V, b: V) -> Boolean) = { a, b -> a == b }
) : StreamCodec<V> {
constructor(
reader: (stream: DataInputStream) -> V,
payloadSize: Long,
writer: (stream: DataOutputStream, value: V) -> Unit,
copier: ((value: V) -> V) = { it },
comparator: ((a: V, b: V) -> Boolean) = { a, b -> a == b }
) : this({ stream -> reader.invoke(stream) }, writer, copier, comparator)
override fun read(stream: DataInputStream): V {
return reader.invoke(stream)
}
override fun write(stream: DataOutputStream, value: V) {
writer.invoke(stream, value)
}
override fun copy(value: V): V {
return copier.invoke(value)
}
override fun compare(a: V, b: V): Boolean {
return comparator.invoke(a, b)
}
}
class Nullable<V>(val parent: StreamCodec<V>) : StreamCodec<V?> {
override fun read(stream: DataInputStream): V? {
return if (stream.read() == 0) null else parent.read(stream)
}
override fun write(stream: DataOutputStream, value: V?) {
if (value === null)
stream.write(0)
else {
stream.write(1)
parent.write(stream, value)
}
}
override fun copy(value: V?): V? {
return if (value === null) null else parent.copy(value)
}
override fun compare(a: V?, b: V?): Boolean {
if (a === null && b === null) return true
if (a === null || b === null) return false
return parent.compare(a, b)
}
}
class Collection<E, C : MutableCollection<E>>(val elementCodec: StreamCodec<E>, val collectionFactory: (Int) -> C) : StreamCodec<C> {
override fun read(stream: DataInputStream): C {
val size = stream.readVarInt()
if (size <= 0) {
return collectionFactory.invoke(0)
}
val collection = collectionFactory.invoke(size)
for (i in 0 until size) {
collection.add(elementCodec.read(stream))
}
return collection
}
override fun write(stream: DataOutputStream, value: C) {
stream.writeVarInt(value.size)
value.forEach { elementCodec.write(stream, it) }
}
override fun copy(value: C): C {
val new = collectionFactory.invoke(value.size)
value.forEach { new.add(elementCodec.copy(it)) }
return new
}
}
class Map<K, V, C : MutableMap<K, V>>(val keyCodec: StreamCodec<K>, val valueCodec: StreamCodec<V>, val factory: (Int) -> C): StreamCodec<C> {
override fun read(stream: DataInputStream): C {
return stream.readMap(keyCodec::read, valueCodec::read, factory)
}
override fun write(stream: DataOutputStream, value: C) {
stream.writeMap(value, keyCodec::write, valueCodec::write)
}
override fun copy(value: C): C {
val new = factory.invoke(value.size)
for ((k, v) in value.entries) {
new[keyCodec.copy(k)] = valueCodec.copy(v)
}
return new
}
}
class Enum<V : kotlin.Enum<V>>(clazz: Class<out V>) : StreamCodec<V> {
val clazz = searchClass(clazz)
val values: List<V> = listOf(*this.clazz.enumConstants!!)
val valuesMap = values.associateBy { it.name }
override fun read(stream: DataInputStream): V {
val id = stream.readVarInt()
return values.getOrNull(id) ?: throw NoSuchElementException("No such enum with index $id")
}
override fun write(stream: DataOutputStream, value: V) {
stream.writeVarInt(value.ordinal)
}
override fun copy(value: V): V {
return value
}
override fun compare(a: V, b: V): Boolean {
return a === b
}
companion object {
/**
* FIXME: enums with abstract methods which get compiled to subclasses, whose DO NOT expose "parent's" enum constants array
*
* is there an already existing solution?
*/
fun <V : kotlin.Enum<V>> searchClass(clazz: Class<out V>): Class<out V> {
var search: Class<*> = clazz
while (search.enumConstants == null && search.superclass != null) {
search = search.superclass
}
if (search.enumConstants == null) {
throw ClassCastException("$clazz does not represent an enum or enum subclass")
}
return search as Class<out V>
}
}
}
class KOptional<V>(val codec: StreamCodec<V>) : StreamCodec<ru.dbotthepony.kommons.util.KOptional<V>> {
override fun read(stream: DataInputStream): ru.dbotthepony.kommons.util.KOptional<V> {
return stream.readKOptional(codec::read)
}
override fun write(stream: DataOutputStream, value: ru.dbotthepony.kommons.util.KOptional<V>) {
stream.writeKOptional(value, codec::write)
}
override fun copy(value: ru.dbotthepony.kommons.util.KOptional<V>): ru.dbotthepony.kommons.util.KOptional<V> {
return value.map(codec::copy)
}
}
class Optional<V : Any>(val codec: StreamCodec<V>) : StreamCodec<java.util.Optional<V>> {
override fun read(stream: DataInputStream): java.util.Optional<V> {
return stream.readOptional(codec::read)
}
override fun write(stream: DataOutputStream, value: java.util.Optional<V>) {
stream.writeOptional(value, codec::write)
}
override fun copy(value: java.util.Optional<V>): java.util.Optional<V> {
return value.map(codec::copy)
}
}
class Mapping<T, R>(private val codec: StreamCodec<T>, private val from: T.() -> R, private val to: R.() -> T, private val copy: R.() -> R = { this }) : StreamCodec<R> {
override fun read(stream: DataInputStream): R {
return from(codec.read(stream))
}
override fun write(stream: DataOutputStream, value: R) {
codec.write(stream, to(value))
}
override fun copy(value: R): R {
return copy.invoke(value)
}
}
class Pair<L, R>(private val left: StreamCodec<L>, private val right: StreamCodec<R>) : StreamCodec<kotlin.Pair<L, R>> {
override fun read(stream: DataInputStream): kotlin.Pair<L, R> {
val left = left.read(stream)
val right = right.read(stream)
return left to right
}
override fun write(stream: DataOutputStream, value: kotlin.Pair<L, R>) {
left.write(stream, value.first)
right.write(stream, value.second)
}
override fun copy(value: kotlin.Pair<L, R>): kotlin.Pair<L, R> {
val left = left.copy(value.first)
val right = right.copy(value.second)
if (left !== this.left && right !== this.right) {
return left to right
} else {
return value
}
}
}
}
fun <T, R> StreamCodec<T>.map(from: T.() -> R, to: R.() -> T): StreamCodec<R> {
return StreamCodec.Mapping(this, from, to)
}
fun <T, R> StreamCodec<T>.map(from: T.() -> R, to: R.() -> T, copy: R.() -> R): StreamCodec<R> {
return StreamCodec.Mapping(this, from, to, copy)
}
fun <T : Any> StreamCodec<T>.optional(): StreamCodec<Optional<T>> = StreamCodec.Optional(this)
fun <T> StreamCodec<T>.koptional(): StreamCodec<KOptional<T>> = StreamCodec.KOptional(this)
fun <T> StreamCodec<T>.nullable(): StreamCodec<T?> = StreamCodec.Nullable(this)
val NullValueCodec = StreamCodec.Impl({ _ -> null }, { _, _ -> })
val BooleanValueCodec = StreamCodec.Impl(DataInputStream::readBoolean, 1L, DataOutputStream::writeBoolean)
val ByteValueCodec = StreamCodec.Impl(DataInputStream::readByte, 1L, { s, v -> s.writeByte(v.toInt()) })
val ShortValueCodec = StreamCodec.Impl(DataInputStream::readShort, 2L, { s, v -> s.writeShort(v.toInt()) })
val CharValueCodec = StreamCodec.Impl(DataInputStream::readChar, 2L, { s, v -> s.writeShort(v.code) })
val IntValueCodec = StreamCodec.Impl(DataInputStream::readInt, 4L, DataOutputStream::writeInt)
val LongValueCodec = StreamCodec.Impl(DataInputStream::readLong, 8L, DataOutputStream::writeLong)
val FloatValueCodec = StreamCodec.Impl(DataInputStream::readFloat, 4L, DataOutputStream::writeFloat)
val DoubleValueCodec = StreamCodec.Impl(DataInputStream::readDouble, 8L, DataOutputStream::writeDouble)
val BigDecimalValueCodec = StreamCodec.Impl(DataInputStream::readBigDecimal, DataOutputStream::writeBigDecimal)
val UUIDValueCodec = StreamCodec.Impl({ s -> UUID(s.readLong(), s.readLong()) }, { s, v -> s.writeLong(v.mostSignificantBits); s.writeLong(v.leastSignificantBits) })
val VarIntValueCodec = StreamCodec.Impl(DataInputStream::readSignedVarInt, DataOutputStream::writeSignedVarInt)
val VarLongValueCodec = StreamCodec.Impl(DataInputStream::readSignedVarLong, DataOutputStream::writeSignedVarLong)
val BinaryStringCodec = StreamCodec.Impl(DataInputStream::readBinaryString, DataOutputStream::writeBinaryString)
val UnsignedVarLongCodec = StreamCodec.Impl(DataInputStream::readVarLong, DataOutputStream::writeVarLong)
val UnsignedVarIntCodec = StreamCodec.Impl(DataInputStream::readVarInt, DataOutputStream::writeVarInt)
val ByteArrayCodec = StreamCodec.Impl(DataInputStream::readByteArray, DataOutputStream::writeByteArray)
val RGBCodec = StreamCodec.Impl(
{ s -> RGBAColor(s.readFloat(), s.readFloat(), s.readFloat()) },
{ s, v -> s.writeFloat(v.red); s.writeFloat(v.green); s.writeFloat(v.blue) })
val RGBACodec = StreamCodec.Impl(
{ s -> RGBAColor(s.readFloat(), s.readFloat(), s.readFloat(), s.readFloat()) },
{ s, v -> s.writeFloat(v.red); s.writeFloat(v.green); s.writeFloat(v.blue); s.writeFloat(v.alpha) })
val OptionalBooleanValueCodec = StreamCodec.Optional(BooleanValueCodec)
val OptionalByteValueCodec = StreamCodec.Optional(ByteValueCodec)
val OptionalShortValueCodec = StreamCodec.Optional(ShortValueCodec)
val OptionalCharValueCodec = StreamCodec.Optional(CharValueCodec)
val OptionalIntValueCodec = StreamCodec.Optional(IntValueCodec)
val OptionalLongValueCodec = StreamCodec.Optional(LongValueCodec)
val OptionalFloatValueCodec = StreamCodec.Optional(FloatValueCodec)
val OptionalDoubleValueCodec = StreamCodec.Optional(DoubleValueCodec)
val OptionalBigDecimalValueCodec = StreamCodec.Optional(BigDecimalValueCodec)
val OptionalUUIDValueCodec = StreamCodec.Optional(UUIDValueCodec)
val OptionalVarIntValueCodec = StreamCodec.Optional(VarIntValueCodec)
val OptionalVarLongValueCodec = StreamCodec.Optional(VarLongValueCodec)
val OptionalBinaryStringCodec = StreamCodec.Optional(BinaryStringCodec)
val OptionalRGBCodec = StreamCodec.Optional(RGBCodec)
val OptionalRGBACodec = StreamCodec.Optional(RGBACodec)
val KOptionalBooleanValueCodec = StreamCodec.KOptional(BooleanValueCodec)
val KOptionalByteValueCodec = StreamCodec.KOptional(ByteValueCodec)
val KOptionalShortValueCodec = StreamCodec.KOptional(ShortValueCodec)
val KOptionalCharValueCodec = StreamCodec.KOptional(CharValueCodec)
val KOptionalIntValueCodec = StreamCodec.KOptional(IntValueCodec)
val KOptionalLongValueCodec = StreamCodec.KOptional(LongValueCodec)
val KOptionalFloatValueCodec = StreamCodec.KOptional(FloatValueCodec)
val KOptionalDoubleValueCodec = StreamCodec.KOptional(DoubleValueCodec)
val KOptionalBigDecimalValueCodec = StreamCodec.KOptional(BigDecimalValueCodec)
val KOptionalUUIDValueCodec = StreamCodec.KOptional(UUIDValueCodec)
val KOptionalVarIntValueCodec = StreamCodec.KOptional(VarIntValueCodec)
val KOptionalVarLongValueCodec = StreamCodec.KOptional(VarLongValueCodec)
val KOptionalBinaryStringCodec = StreamCodec.KOptional(BinaryStringCodec)
val KOptionalRGBCodec = StreamCodec.KOptional(RGBCodec)
val KOptionalRGBACodec = StreamCodec.KOptional(RGBACodec)
fun <E : Enum<E>> Class<E>.codec() = StreamCodec.Enum(this)
fun <E : Enum<E>> KClass<E>.codec() = StreamCodec.Enum(this.java)

View File

@ -1,117 +0,0 @@
package ru.dbotthepony.kommons.test
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import ru.dbotthepony.kommons.io.DelegateSyncher
import ru.dbotthepony.kommons.util.Delegate
import kotlin.test.assertEquals
object DelegateSyncherTests {
@Test
@DisplayName("Delegate Syncher basic test")
fun test() {
val a = DelegateSyncher()
val b = DelegateSyncher()
a.int(0).delegate.accept(4)
val remote = a.Remote()
val payload = remote.write()!!
val v = b.int(0)
b.read(FastByteArrayInputStream(payload.array, 0, payload.length))
assertEquals(4, v.delegate.get())
}
@Test
@DisplayName("Delegate Syncher multiple delegates")
fun multiple() {
val a = DelegateSyncher()
val b = DelegateSyncher()
val ints = ArrayList<Delegate<Int>>()
val longs = ArrayList<Delegate<Long>>()
for (i in 0 .. 10) {
a.int().delegate.accept(i)
a.long().delegate.accept(i.toLong())
ints.add(b.int())
longs.add(b.long())
}
val remote = a.Remote()
val payload = remote.write()!!
b.read(FastByteArrayInputStream(payload.array, 0, payload.length))
for (i in 0 .. 10) {
assertEquals(i, ints[i].get())
assertEquals(i.toLong(), longs[i].get())
}
}
@Test
@DisplayName("Delegate Syncher network half of delegates")
fun half() {
val a = DelegateSyncher()
val b = DelegateSyncher()
val ints = ArrayList<Delegate<Int>>()
val longs = ArrayList<Delegate<Long>>()
for (i in 0 .. 10) {
if (i % 2 == 0) {
a.int()
a.long()
} else {
a.int().delegate.accept(i)
a.long().delegate.accept(i.toLong())
}
ints.add(b.int())
longs.add(b.long())
}
val remote = a.Remote()
val payload = remote.write()!!
b.read(FastByteArrayInputStream(payload.array, 0, payload.length))
for (i in 0 .. 10) {
if (i % 2 == 0) {
assertEquals(0, ints[i].get())
assertEquals(0L, longs[i].get())
} else {
assertEquals(i, ints[i].get())
assertEquals(i.toLong(), longs[i].get())
}
}
}
@Test
@DisplayName("Delegate Syncher >127 delegates")
fun moreThanByte() {
val a = DelegateSyncher()
val b = DelegateSyncher()
val ints = ArrayList<Delegate<Int>>()
val longs = ArrayList<Delegate<Long>>()
for (i in 0 .. 400) {
a.int().delegate.accept(i)
a.long().delegate.accept(i.toLong())
ints.add(b.int())
longs.add(b.long())
}
val remote = a.Remote()
val payload = remote.write()!!
b.read(FastByteArrayInputStream(payload.array, 0, payload.length))
for (i in 0 .. 400) {
assertEquals(i, ints[i].get())
assertEquals(i.toLong(), longs[i].get())
}
}
}