From 7609a23dd6a594ac07f44c41251d059d75028515 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Sat, 10 Aug 2024 19:19:58 +0700 Subject: [PATCH] Remove streamcodec and delegate syncher --- gradle.properties | 2 +- .../dbotthepony/kommons/io/DelegateSyncher.kt | 846 ------------------ .../ru/dbotthepony/kommons/io/StreamCodec.kt | 318 ------- .../kommons/test/DelegateSyncherTests.kt | 117 --- 4 files changed, 1 insertion(+), 1282 deletions(-) delete mode 100644 src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt delete mode 100644 src/main/kotlin/ru/dbotthepony/kommons/io/StreamCodec.kt delete mode 100644 src/test/kotlin/ru/dbotthepony/kommons/test/DelegateSyncherTests.kt diff --git a/gradle.properties b/gradle.properties index 64c5756..a04962d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=3.0.2 +projectVersion=3.1.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt deleted file mode 100644 index fbcf4b2..0000000 --- a/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt +++ /dev/null @@ -1,846 +0,0 @@ -package ru.dbotthepony.kommons.io - -import it.unimi.dsi.fastutil.ints.IntAVLTreeSet -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap -import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet -import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet -import ru.dbotthepony.kommons.collect.ListenableMap -import ru.dbotthepony.kommons.collect.ListenableSet -import ru.dbotthepony.kommons.util.Delegate -import ru.dbotthepony.kommons.util.DelegateGetter -import ru.dbotthepony.kommons.util.DelegateSetter -import ru.dbotthepony.kommons.util.KOptional -import ru.dbotthepony.kommons.util.Listenable -import ru.dbotthepony.kommons.util.ListenableDelegate -import ru.dbotthepony.kommons.util.Observer -import ru.dbotthepony.kommons.util.ValueObserver -import java.io.Closeable -import java.io.DataInputStream -import java.io.DataOutputStream -import java.io.InputStream -import java.util.* -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.ReentrantLock -import java.util.function.BooleanSupplier -import java.util.function.Consumer -import java.util.function.DoubleSupplier -import java.util.function.IntSupplier -import java.util.function.LongSupplier -import java.util.function.Supplier -import kotlin.collections.ArrayList -import kotlin.concurrent.withLock - -/** - * Universal, one-to-many delegate/value synchronizer. - * - * Values and delegates can be attached using [add], or by constructing subclassed directly. - * Delta changes are tracked by [DelegateSyncher.Remote] instances. - * - * In general, this class is not meant to be _structurally_ concurrently mutated by different threads, - * to avoid structure corruption a lock is employed. - * - * Attached delegates can be safely mutated by multiple threads concurrently - * (attached [ListenableDelegate] can call [Listenable.addListener] callback from different threads). - */ -@Suppress("UNCHECKED_CAST", "UNUSED") -class DelegateSyncher : Observer { - private val lock = ReentrantLock() - private val slots = ArrayList() - private val gaps = IntAVLTreeSet() - private val observers = ArrayList() - private val remotes = ArrayList() - private var isRemote = false - - override fun observe(): Boolean { - var any = false - observers.forEach { any = it.observe() || any } - return any - } - - fun read(stream: DataInputStream) { - isRemote = true - - var readID = stream.readVarInt() - - while (readID > 0) { - val slot = slots.getOrNull(readID - 1) ?: throw IndexOutOfBoundsException("Unknown networked slot ${readID - 1}!") - slot.read(stream) - readID = stream.readVarInt() - } - } - - fun read(stream: InputStream) { - if (stream is DataInputStream) - return read(stream) - - read(DataInputStream(stream)) - } - - abstract inner class AbstractSlot : Closeable, Observer, Comparable { - val id: Int - - protected val isRemoved = AtomicBoolean() - - fun remove() { - if (isRemoved.compareAndSet(false, true)) { - lock.withLock { - slots[id] = null - gaps.add(id) - - remoteSlots.forEach { - it.remove() - } - - remoteSlots.clear() - remove0() - } - } - } - - final override fun close() { - return remove() - } - - final override fun compareTo(other: AbstractSlot): Int { - return id.compareTo(other.id) - } - - protected abstract fun remove0() - internal abstract fun read(stream: DataInputStream) - internal abstract fun write(stream: DataOutputStream) - - internal val remoteSlots = CopyOnWriteArrayList() - - protected fun markDirty() { - if (!isRemote && !isRemoved.get()) { - remoteSlots.forEach { - it.markDirty() - } - } - } - - init { - lock.withLock { - if (gaps.isNotEmpty()) { - val gap = gaps.firstInt() - gaps.remove(gap) - id = gap - check(slots[id] == null) { "Expected slot $id to be empty" } - slots[id] = this - } else { - id = slots.size - slots.add(this) - } - - remotes.forEach { - it.addSlot(this) - } - } - } - } - - inner class Slot(val delegate: ListenableDelegate, val codec: StreamCodec) : AbstractSlot(), ListenableDelegate { - constructor(delegate: Supplier, codec: StreamCodec) : this(ListenableDelegate.CustomShadow(delegate, codec::compare, codec::copy), codec) - - private val l = delegate.addListener(Consumer { - markDirty() - listeners.accept(it) - }) - - init { - if (delegate is ListenableDelegate.AbstractShadow) { - observers.add(this) - } - } - - private val listeners = Listenable.Impl() - - override fun remove0() { - l.remove() - listeners.clear() - } - - override fun read(stream: DataInputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - delegate.accept(codec.read(stream)) - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - codec.write(stream, delegate.get()) - } - - override fun get(): V { - check(!isRemoved.get()) { "This network slot was removed" } - return delegate.get() - } - - override fun accept(t: V) { - check(!isRemoved.get()) { "This network slot was removed" } - delegate.accept(t) - } - - override fun addListener(listener: Consumer): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return listeners.addListener(listener) - } - - override fun observe(): Boolean { - check(!isRemoved.get()) { "This network slot was removed" } - - if (delegate is ListenableDelegate.AbstractShadow) { - return delegate.observe() - } - - return false - } - } - - inner class ObservedSlot(val delegate: Delegate, val codec: StreamCodec) : AbstractSlot(), ListenableDelegate, ValueObserver { - private val listeners = Listenable.Impl() - private var observed: Any? = Mark - private val observeLock = ReentrantLock() - - init { - observers.add(this) - } - - override fun remove0() { - listeners.clear() - } - - override fun read(stream: DataInputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - val read = codec.read(stream) - observed = codec.copy(read) - delegate.accept(read) - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - codec.write(stream, delegate.get()) - } - - override fun observe(): Boolean { - val get = delegate.get() - - if (observed === Mark || !codec.compare(get, observed as V)) { - observeLock.withLock { - if (observed === Mark || !codec.compare(get, observed as V)) { - observed = codec.copy(get) - markDirty() - listeners.accept(get) - } - - return true - } - } - - return false - } - - override fun getAndObserve(): Pair { - val get = delegate.get() - - if (observed === Mark || !codec.compare(get, observed as V)) { - observeLock.withLock { - if (observed === Mark || !codec.compare(get, observed as V)) { - observed = codec.copy(get) - markDirty() - listeners.accept(get) - } - - return get to true - } - } - - return get to false - } - - override fun get(): V { - check(!isRemoved.get()) { "This network slot was removed" } - return delegate.get() - } - - override fun accept(t: V) { - check(!isRemoved.get()) { "This network slot was removed" } - delegate.accept(t) - } - - override fun addListener(listener: Consumer): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return listeners.addListener(listener) - } - } - - private object Mark - - internal data class SetAction(val value: KOptional, val action: Int) - - inner class SetSlot(val delegate: ListenableSet, val codec: StreamCodec) : AbstractSlot() { - private val listener = object : ListenableSet.SetListener { - override fun onClear() { - remoteSlots.forEach { - (it as Remote.RemoteSetSlot).changelist.clear() - - synchronized(it.changelistLock) { - it.changelist.add(SetAction(KOptional(), CLEAR)) - } - - listeners.forEach { - it.listener.onClear() - } - - it.markDirty() - } - } - - override fun onValueAdded(element: V) { - remoteSlots.forEach { - it as Remote.RemoteSetSlot - - synchronized(it.changelistLock) { - it.changelist.removeIf { it.value.valueEquals(element) } - it.changelist.add(SetAction(KOptional(codec.copy(element)), ADD)) - } - - listeners.forEach { - it.listener.onValueAdded(element) - } - - it.markDirty() - } - } - - override fun onValueRemoved(element: V) { - remoteSlots.forEach { - it as Remote.RemoteSetSlot - - synchronized(it.changelistLock) { - it.changelist.removeIf { it.value.valueEquals(element) } - it.changelist.add(SetAction(KOptional(codec.copy(element)), REMOVE)) - } - - listeners.forEach { - it.listener.onValueRemoved(element) - } - - it.markDirty() - } - } - } - - private val l = delegate.addListener(listener) - private val listeners = CopyOnWriteArrayList() - - override fun remove0() { - l.remove() - listeners.clear() - } - - private inner class Listener(val listener: ListenableSet.SetListener): Listenable.L { - private var isRemoved = false - - init { - listeners.add(this) - } - - override fun remove() { - if (!isRemoved) { - isRemoved = true - listeners.remove(this) - } - } - } - - fun addListener(listener: ListenableSet.SetListener): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return Listener(listener) - } - - fun addListener(listener: Runnable): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return Listener(ListenableSet.RunnableAdapter(listener)) - } - - override fun read(stream: DataInputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - var action = stream.read() - - while (true) { - when (action) { - ADD -> delegate.add(codec.read(stream)) - REMOVE -> delegate.remove(codec.read(stream)) - CLEAR -> delegate.clear() - else -> break - } - - action = stream.read() - } - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - throw RuntimeException("unreachable code") - } - - override fun observe(): Boolean { - check(!isRemoved.get()) { "This network slot was removed" } - return false - } - } - - internal data class MapAction(val key: KOptional, val value: KOptional, val action: Int) - - inner class MapSlot(val delegate: ListenableMap, val keyCodec: StreamCodec, val valueCodec: StreamCodec) : AbstractSlot() { - private val listener = object : ListenableMap.MapListener { - override fun onClear() { - remoteSlots.forEach { - it as Remote.RemoteMapSlot - - synchronized(it.changelistLock) { - it.changelist.clear() - it.changelist.add(MapAction(KOptional(), KOptional(), CLEAR)) - } - - listeners.forEach { - it.listener.onClear() - } - - it.markDirty() - } - } - - override fun onValueAdded(key: K, value: V) { - remoteSlots.forEach { - it as Remote.RemoteMapSlot - - synchronized(it.changelistLock) { - it.changelist.removeIf { it.key.valueEquals(key) } - it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(valueCodec.copy(value)), ADD)) - } - - listeners.forEach { - it.listener.onValueAdded(key, value) - } - - it.markDirty() - } - } - - override fun onValueRemoved(key: K, value: V) { - remoteSlots.forEach { - it as Remote.RemoteMapSlot - - synchronized(it.changelistLock) { - it.changelist.removeIf { it.key.valueEquals(key) } - it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(), REMOVE)) - } - - listeners.forEach { - it.listener.onValueRemoved(key, value) - } - - it.markDirty() - } - } - } - - private val l = delegate.addListener(listener) - private val listeners = CopyOnWriteArrayList() - - override fun remove0() { - l.remove() - listeners.clear() - } - - private inner class Listener(val listener: ListenableMap.MapListener): Listenable.L { - private var isRemoved = false - - init { - listeners.add(this) - } - - override fun remove() { - if (!isRemoved) { - isRemoved = true - listeners.remove(this) - } - } - } - - fun addListener(listener: ListenableMap.MapListener): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return Listener(listener) - } - - fun addListener(listener: Runnable): Listenable.L { - check(!isRemoved.get()) { "This network slot was removed" } - return Listener(ListenableMap.RunnableAdapter(listener)) - } - - override fun read(stream: DataInputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - var action = stream.read() - - while (true) { - when (action) { - ADD -> delegate.put(keyCodec.read(stream), valueCodec.read(stream)) - REMOVE -> delegate.remove(keyCodec.read(stream)) - CLEAR -> delegate.clear() - else -> break - } - - action = stream.read() - } - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved.get()) { "This network slot was removed" } - throw RuntimeException("unreachable code") - } - - override fun observe(): Boolean { - check(!isRemoved.get()) { "This network slot was removed" } - return false - } - } - - fun add(value: V, codec: StreamCodec, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return Slot(ListenableDelegate.maskSmart(value, getter, setter), codec) - } - - fun add(delegate: ListenableDelegate, codec: StreamCodec): Slot { - return Slot(delegate, codec) - } - - fun add(delegate: Delegate, codec: StreamCodec): ObservedSlot { - return ObservedSlot(delegate, codec) - } - - fun add(delegate: Supplier, codec: StreamCodec): Slot { - return computed(delegate, codec) - } - - fun add(delegate: ListenableSet, codec: StreamCodec): SetSlot { - return SetSlot(delegate, codec) - } - - fun add(delegate: ListenableMap, keyCodec: StreamCodec, valueCodec: StreamCodec): MapSlot { - return MapSlot(delegate, keyCodec, valueCodec) - } - - fun computedByte(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), ByteValueCodec) - fun computedShort(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), ShortValueCodec) - fun computedChar(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), CharValueCodec) - fun computedInt(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), VarIntValueCodec) - fun computedLong(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), VarLongValueCodec) - fun computedFloat(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), FloatValueCodec) - fun computedDouble(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), DoubleValueCodec) - fun computedBoolean(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), BooleanValueCodec) - fun computedString(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), BinaryStringCodec) - fun computedUUID(delegate: Supplier) = Slot(ListenableDelegate.Shadow(delegate), UUIDValueCodec) - fun computed(delegate: Supplier, codec: StreamCodec) = Slot(ListenableDelegate.CustomShadow(delegate, codec::compare, codec::copy), codec) - - fun computedInt(delegate: IntSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsInt), VarIntValueCodec) - fun computedLong(delegate: LongSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsLong), VarLongValueCodec) - fun computedDouble(delegate: DoubleSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsDouble), DoubleValueCodec) - fun computedBoolean(delegate: BooleanSupplier) = Slot(ListenableDelegate.Shadow(delegate::getAsBoolean), BooleanValueCodec) - - @JvmName("vbyte") - @JvmOverloads - fun byte(value: Byte = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), ByteValueCodec) - } - - @JvmName("vshort") - @JvmOverloads - fun short(value: Short = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), ShortValueCodec) - } - - @JvmName("vchar") - @JvmOverloads - fun char(value: Char, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), CharValueCodec) - } - - @JvmName("vint") - @JvmOverloads - fun int(value: Int = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), VarIntValueCodec) - } - - @JvmName("vlong") - @JvmOverloads - fun long(value: Long = 0L, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), VarLongValueCodec) - } - - @JvmName("vfloat") - @JvmOverloads - fun float(value: Float = 0f, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), FloatValueCodec) - } - - @JvmName("vdouble") - @JvmOverloads - fun double(value: Double = 0.0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), DoubleValueCodec) - } - - @JvmName("vboolean") - @JvmOverloads - fun boolean(value: Boolean = false, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), BooleanValueCodec) - } - - @JvmOverloads - fun string(value: String, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), BinaryStringCodec) - } - - @JvmOverloads - fun uuid(value: UUID, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), UUIDValueCodec) - } - - @JvmOverloads - fun set(codec: StreamCodec, backing: MutableSet = ObjectOpenHashSet()): SetSlot { - return add(ListenableSet(backing), codec) - } - - @JvmOverloads - fun map(keyCodec: StreamCodec, valueCodec: StreamCodec, backing: MutableMap = Object2ObjectOpenHashMap()): MapSlot { - return add(ListenableMap(backing), keyCodec, valueCodec) - } - - @JvmOverloads - fun > enum(value: E, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { - return add(ListenableDelegate.maskSmart(value, getter, setter), StreamCodec.Enum(value::class.java)) - } - - /** - * Remotes must be [close]d when no longer in use, otherwise they will - * leak memory and decrease performance of syncher. - * - * To get changes to be networked to remote client, [write] should be called. - */ - inner class Remote : Closeable { - @Volatile - private var isRemoved = false - internal val dirty = ConcurrentLinkedQueue() - private val remoteSlots = CopyOnWriteArrayList() - - internal open inner class RemoteSlot(val parent: AbstractSlot) : Comparable { - private val isDirty = AtomicBoolean() - - final override fun compareTo(other: RemoteSlot): Int { - return parent.compareTo(other.parent) - } - - init { - remoteSlots.add(this) - dirty.add(this) - } - - fun markDirty() { - if (!isRemoved && isDirty.compareAndSet(false, true)) { - dirty.add(this) - } - } - - fun markClean() { - isDirty.set(false) - } - - open fun invalidate() { - markDirty() - } - - open fun remove() { - isDirty.set(true) - remoteSlots.remove(this) - dirty.remove(this) - } - - open fun write(stream: DataOutputStream) { - parent.write(stream) - } - - override fun hashCode(): Int { - return parent.hashCode() - } - - override fun equals(other: Any?): Boolean { - return this === other || other is RemoteSlot && other.parent == parent - } - } - - internal inner class RemoteSetSlot(parent: SetSlot) : RemoteSlot(parent) { - val changelist = ArrayList>() - val changelistLock = Any() - - override fun remove() { - super.remove() - changelist.clear() - } - - override fun invalidate() { - synchronized(changelistLock) { - changelist.clear() - changelist.add(SetAction(KOptional(), CLEAR)) - - parent as SetSlot - - for (v in parent.delegate) { - changelist.add(SetAction(KOptional(parent.codec.copy(v)), ADD)) - } - } - - super.invalidate() - } - - override fun write(stream: DataOutputStream) { - synchronized(changelistLock) { - for (change in changelist) { - stream.write(change.action) - change.value.ifPresent { (parent as SetSlot).codec.write(stream, it) } - } - - changelist.clear() - } - - stream.write(0) - } - } - - internal inner class RemoteMapSlot(parent: MapSlot) : RemoteSlot(parent) { - val changelist = ArrayList>() - val changelistLock = Any() - - override fun remove() { - super.remove() - changelist.clear() - } - - override fun invalidate() { - synchronized(changelistLock) { - changelist.clear() - changelist.add(MapAction(KOptional(), KOptional(), CLEAR)) - - parent as MapSlot - - for ((k, v) in parent.delegate) { - changelist.add(MapAction(KOptional(parent.keyCodec.copy(k)), KOptional(parent.valueCodec.copy(v)), ADD)) - } - } - - super.invalidate() - } - - override fun write(stream: DataOutputStream) { - synchronized(changelistLock) { - for (change in changelist) { - stream.write(change.action) - change.key.ifPresent { (parent as MapSlot).keyCodec.write(stream, it) } - change.value.ifPresent { (parent as MapSlot).valueCodec.write(stream, it) } - } - - changelist.clear() - } - - stream.write(0) - } - } - - init { - lock.withLock { - slots.forEach { - if (it != null) { - addSlot(it) - } - } - - remotes.add(this) - } - } - - internal fun addSlot(slot: AbstractSlot) { - if (slot is SetSlot<*>) { - slot.remoteSlots.add(RemoteSetSlot(slot)) - } else if (slot is MapSlot<*, *>) { - slot.remoteSlots.add(RemoteMapSlot(slot)) - } else { - slot.remoteSlots.add(RemoteSlot(slot)) - } - } - - /** - * Returns null of this remote is clean. - * - * [DelegateSyncher.observe] is not called automatically for performance - * reasons, you must call it manually. - */ - fun write(): FastByteArrayOutputStream? { - if (dirty.isNotEmpty()) { - val data = FastByteArrayOutputStream() - val stream = DataOutputStream(data) - - val sorted = ObjectAVLTreeSet() - var next = dirty.poll() - - while (next != null) { - sorted.add(next) - next.markClean() - next = dirty.poll() - } - - for (v in sorted) { - stream.writeVarInt(v.parent.id + 1) - v.write(stream) - } - - stream.write(0) - - return data - } - - return null - } - - /** - * Marks all networked slots dirty - */ - fun invalidate() { - remoteSlots.forEach { it.invalidate() } - } - - override fun close() { - if (!isRemoved) { - lock.withLock { - if (!isRemoved) { - remoteSlots.forEach { - it.remove() - it.parent.remoteSlots.remove(it) - } - - remotes.remove(this) - } - } - - dirty.clear() - } - } - } - - companion object { - private const val END = 0 - private const val CLEAR = 1 - private const val ADD = 2 - private const val REMOVE = 3 - } -} diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/StreamCodec.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/StreamCodec.kt deleted file mode 100644 index c28e927..0000000 --- a/src/main/kotlin/ru/dbotthepony/kommons/io/StreamCodec.kt +++ /dev/null @@ -1,318 +0,0 @@ -package ru.dbotthepony.kommons.io - -import ru.dbotthepony.kommons.math.RGBAColor -import ru.dbotthepony.kommons.util.KOptional -import java.io.DataInputStream -import java.io.DataOutputStream -import java.util.* -import kotlin.reflect.KClass - -/** - * Represents value which can be encoded onto or decoded from stream. - */ -interface StreamCodec { - fun read(stream: DataInputStream): V - fun write(stream: DataOutputStream, value: V) - - /** - * Defensive copy - */ - fun copy(value: V): V - - /** - * Optional custom equality check - */ - fun compare(a: V, b: V): Boolean { - return a == b - } - - class Impl( - private val reader: (stream: DataInputStream) -> V, - private val writer: (stream: DataOutputStream, value: V) -> Unit, - private val copier: ((value: V) -> V) = { it }, - private val comparator: ((a: V, b: V) -> Boolean) = { a, b -> a == b } - ) : StreamCodec { - constructor( - reader: (stream: DataInputStream) -> V, - payloadSize: Long, - writer: (stream: DataOutputStream, value: V) -> Unit, - copier: ((value: V) -> V) = { it }, - comparator: ((a: V, b: V) -> Boolean) = { a, b -> a == b } - ) : this({ stream -> reader.invoke(stream) }, writer, copier, comparator) - - override fun read(stream: DataInputStream): V { - return reader.invoke(stream) - } - - override fun write(stream: DataOutputStream, value: V) { - writer.invoke(stream, value) - } - - override fun copy(value: V): V { - return copier.invoke(value) - } - - override fun compare(a: V, b: V): Boolean { - return comparator.invoke(a, b) - } - } - - class Nullable(val parent: StreamCodec) : StreamCodec { - override fun read(stream: DataInputStream): V? { - return if (stream.read() == 0) null else parent.read(stream) - } - - override fun write(stream: DataOutputStream, value: V?) { - if (value === null) - stream.write(0) - else { - stream.write(1) - parent.write(stream, value) - } - } - - override fun copy(value: V?): V? { - return if (value === null) null else parent.copy(value) - } - - override fun compare(a: V?, b: V?): Boolean { - if (a === null && b === null) return true - if (a === null || b === null) return false - return parent.compare(a, b) - } - } - - class Collection>(val elementCodec: StreamCodec, val collectionFactory: (Int) -> C) : StreamCodec { - override fun read(stream: DataInputStream): C { - val size = stream.readVarInt() - - if (size <= 0) { - return collectionFactory.invoke(0) - } - - val collection = collectionFactory.invoke(size) - - for (i in 0 until size) { - collection.add(elementCodec.read(stream)) - } - - return collection - } - - override fun write(stream: DataOutputStream, value: C) { - stream.writeVarInt(value.size) - value.forEach { elementCodec.write(stream, it) } - } - - override fun copy(value: C): C { - val new = collectionFactory.invoke(value.size) - value.forEach { new.add(elementCodec.copy(it)) } - return new - } - } - - class Map>(val keyCodec: StreamCodec, val valueCodec: StreamCodec, val factory: (Int) -> C): StreamCodec { - override fun read(stream: DataInputStream): C { - return stream.readMap(keyCodec::read, valueCodec::read, factory) - } - - override fun write(stream: DataOutputStream, value: C) { - stream.writeMap(value, keyCodec::write, valueCodec::write) - } - - override fun copy(value: C): C { - val new = factory.invoke(value.size) - - for ((k, v) in value.entries) { - new[keyCodec.copy(k)] = valueCodec.copy(v) - } - - return new - } - } - - class Enum>(clazz: Class) : StreamCodec { - val clazz = searchClass(clazz) - val values: List = listOf(*this.clazz.enumConstants!!) - val valuesMap = values.associateBy { it.name } - - override fun read(stream: DataInputStream): V { - val id = stream.readVarInt() - return values.getOrNull(id) ?: throw NoSuchElementException("No such enum with index $id") - } - - override fun write(stream: DataOutputStream, value: V) { - stream.writeVarInt(value.ordinal) - } - - override fun copy(value: V): V { - return value - } - - override fun compare(a: V, b: V): Boolean { - return a === b - } - - companion object { - /** - * FIXME: enums with abstract methods which get compiled to subclasses, whose DO NOT expose "parent's" enum constants array - * - * is there an already existing solution? - */ - fun > searchClass(clazz: Class): Class { - var search: Class<*> = clazz - - while (search.enumConstants == null && search.superclass != null) { - search = search.superclass - } - - if (search.enumConstants == null) { - throw ClassCastException("$clazz does not represent an enum or enum subclass") - } - - return search as Class - } - } - } - - class KOptional(val codec: StreamCodec) : StreamCodec> { - override fun read(stream: DataInputStream): ru.dbotthepony.kommons.util.KOptional { - return stream.readKOptional(codec::read) - } - - override fun write(stream: DataOutputStream, value: ru.dbotthepony.kommons.util.KOptional) { - stream.writeKOptional(value, codec::write) - } - - override fun copy(value: ru.dbotthepony.kommons.util.KOptional): ru.dbotthepony.kommons.util.KOptional { - return value.map(codec::copy) - } - } - - class Optional(val codec: StreamCodec) : StreamCodec> { - override fun read(stream: DataInputStream): java.util.Optional { - return stream.readOptional(codec::read) - } - - override fun write(stream: DataOutputStream, value: java.util.Optional) { - stream.writeOptional(value, codec::write) - } - - override fun copy(value: java.util.Optional): java.util.Optional { - return value.map(codec::copy) - } - } - - class Mapping(private val codec: StreamCodec, private val from: T.() -> R, private val to: R.() -> T, private val copy: R.() -> R = { this }) : StreamCodec { - override fun read(stream: DataInputStream): R { - return from(codec.read(stream)) - } - - override fun write(stream: DataOutputStream, value: R) { - codec.write(stream, to(value)) - } - - override fun copy(value: R): R { - return copy.invoke(value) - } - } - - class Pair(private val left: StreamCodec, private val right: StreamCodec) : StreamCodec> { - override fun read(stream: DataInputStream): kotlin.Pair { - val left = left.read(stream) - val right = right.read(stream) - return left to right - } - - override fun write(stream: DataOutputStream, value: kotlin.Pair) { - left.write(stream, value.first) - right.write(stream, value.second) - } - - override fun copy(value: kotlin.Pair): kotlin.Pair { - val left = left.copy(value.first) - val right = right.copy(value.second) - - if (left !== this.left && right !== this.right) { - return left to right - } else { - return value - } - } - } -} - -fun StreamCodec.map(from: T.() -> R, to: R.() -> T): StreamCodec { - return StreamCodec.Mapping(this, from, to) -} - -fun StreamCodec.map(from: T.() -> R, to: R.() -> T, copy: R.() -> R): StreamCodec { - return StreamCodec.Mapping(this, from, to, copy) -} - -fun StreamCodec.optional(): StreamCodec> = StreamCodec.Optional(this) -fun StreamCodec.koptional(): StreamCodec> = StreamCodec.KOptional(this) -fun StreamCodec.nullable(): StreamCodec = StreamCodec.Nullable(this) - -val NullValueCodec = StreamCodec.Impl({ _ -> null }, { _, _ -> }) -val BooleanValueCodec = StreamCodec.Impl(DataInputStream::readBoolean, 1L, DataOutputStream::writeBoolean) -val ByteValueCodec = StreamCodec.Impl(DataInputStream::readByte, 1L, { s, v -> s.writeByte(v.toInt()) }) -val ShortValueCodec = StreamCodec.Impl(DataInputStream::readShort, 2L, { s, v -> s.writeShort(v.toInt()) }) -val CharValueCodec = StreamCodec.Impl(DataInputStream::readChar, 2L, { s, v -> s.writeShort(v.code) }) -val IntValueCodec = StreamCodec.Impl(DataInputStream::readInt, 4L, DataOutputStream::writeInt) -val LongValueCodec = StreamCodec.Impl(DataInputStream::readLong, 8L, DataOutputStream::writeLong) -val FloatValueCodec = StreamCodec.Impl(DataInputStream::readFloat, 4L, DataOutputStream::writeFloat) -val DoubleValueCodec = StreamCodec.Impl(DataInputStream::readDouble, 8L, DataOutputStream::writeDouble) -val BigDecimalValueCodec = StreamCodec.Impl(DataInputStream::readBigDecimal, DataOutputStream::writeBigDecimal) -val UUIDValueCodec = StreamCodec.Impl({ s -> UUID(s.readLong(), s.readLong()) }, { s, v -> s.writeLong(v.mostSignificantBits); s.writeLong(v.leastSignificantBits) }) -val VarIntValueCodec = StreamCodec.Impl(DataInputStream::readSignedVarInt, DataOutputStream::writeSignedVarInt) -val VarLongValueCodec = StreamCodec.Impl(DataInputStream::readSignedVarLong, DataOutputStream::writeSignedVarLong) -val BinaryStringCodec = StreamCodec.Impl(DataInputStream::readBinaryString, DataOutputStream::writeBinaryString) - -val UnsignedVarLongCodec = StreamCodec.Impl(DataInputStream::readVarLong, DataOutputStream::writeVarLong) -val UnsignedVarIntCodec = StreamCodec.Impl(DataInputStream::readVarInt, DataOutputStream::writeVarInt) - -val ByteArrayCodec = StreamCodec.Impl(DataInputStream::readByteArray, DataOutputStream::writeByteArray) - -val RGBCodec = StreamCodec.Impl( - { s -> RGBAColor(s.readFloat(), s.readFloat(), s.readFloat()) }, - { s, v -> s.writeFloat(v.red); s.writeFloat(v.green); s.writeFloat(v.blue) }) - -val RGBACodec = StreamCodec.Impl( - { s -> RGBAColor(s.readFloat(), s.readFloat(), s.readFloat(), s.readFloat()) }, - { s, v -> s.writeFloat(v.red); s.writeFloat(v.green); s.writeFloat(v.blue); s.writeFloat(v.alpha) }) - -val OptionalBooleanValueCodec = StreamCodec.Optional(BooleanValueCodec) -val OptionalByteValueCodec = StreamCodec.Optional(ByteValueCodec) -val OptionalShortValueCodec = StreamCodec.Optional(ShortValueCodec) -val OptionalCharValueCodec = StreamCodec.Optional(CharValueCodec) -val OptionalIntValueCodec = StreamCodec.Optional(IntValueCodec) -val OptionalLongValueCodec = StreamCodec.Optional(LongValueCodec) -val OptionalFloatValueCodec = StreamCodec.Optional(FloatValueCodec) -val OptionalDoubleValueCodec = StreamCodec.Optional(DoubleValueCodec) -val OptionalBigDecimalValueCodec = StreamCodec.Optional(BigDecimalValueCodec) -val OptionalUUIDValueCodec = StreamCodec.Optional(UUIDValueCodec) -val OptionalVarIntValueCodec = StreamCodec.Optional(VarIntValueCodec) -val OptionalVarLongValueCodec = StreamCodec.Optional(VarLongValueCodec) -val OptionalBinaryStringCodec = StreamCodec.Optional(BinaryStringCodec) -val OptionalRGBCodec = StreamCodec.Optional(RGBCodec) -val OptionalRGBACodec = StreamCodec.Optional(RGBACodec) - -val KOptionalBooleanValueCodec = StreamCodec.KOptional(BooleanValueCodec) -val KOptionalByteValueCodec = StreamCodec.KOptional(ByteValueCodec) -val KOptionalShortValueCodec = StreamCodec.KOptional(ShortValueCodec) -val KOptionalCharValueCodec = StreamCodec.KOptional(CharValueCodec) -val KOptionalIntValueCodec = StreamCodec.KOptional(IntValueCodec) -val KOptionalLongValueCodec = StreamCodec.KOptional(LongValueCodec) -val KOptionalFloatValueCodec = StreamCodec.KOptional(FloatValueCodec) -val KOptionalDoubleValueCodec = StreamCodec.KOptional(DoubleValueCodec) -val KOptionalBigDecimalValueCodec = StreamCodec.KOptional(BigDecimalValueCodec) -val KOptionalUUIDValueCodec = StreamCodec.KOptional(UUIDValueCodec) -val KOptionalVarIntValueCodec = StreamCodec.KOptional(VarIntValueCodec) -val KOptionalVarLongValueCodec = StreamCodec.KOptional(VarLongValueCodec) -val KOptionalBinaryStringCodec = StreamCodec.KOptional(BinaryStringCodec) -val KOptionalRGBCodec = StreamCodec.KOptional(RGBCodec) -val KOptionalRGBACodec = StreamCodec.KOptional(RGBACodec) - -fun > Class.codec() = StreamCodec.Enum(this) -fun > KClass.codec() = StreamCodec.Enum(this.java) diff --git a/src/test/kotlin/ru/dbotthepony/kommons/test/DelegateSyncherTests.kt b/src/test/kotlin/ru/dbotthepony/kommons/test/DelegateSyncherTests.kt deleted file mode 100644 index db3e662..0000000 --- a/src/test/kotlin/ru/dbotthepony/kommons/test/DelegateSyncherTests.kt +++ /dev/null @@ -1,117 +0,0 @@ -package ru.dbotthepony.kommons.test - -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import ru.dbotthepony.kommons.io.DelegateSyncher -import ru.dbotthepony.kommons.util.Delegate -import kotlin.test.assertEquals - -object DelegateSyncherTests { - @Test - @DisplayName("Delegate Syncher basic test") - fun test() { - val a = DelegateSyncher() - val b = DelegateSyncher() - - a.int(0).delegate.accept(4) - val remote = a.Remote() - val payload = remote.write()!! - - val v = b.int(0) - b.read(FastByteArrayInputStream(payload.array, 0, payload.length)) - - assertEquals(4, v.delegate.get()) - } - - @Test - @DisplayName("Delegate Syncher multiple delegates") - fun multiple() { - val a = DelegateSyncher() - val b = DelegateSyncher() - val ints = ArrayList>() - val longs = ArrayList>() - - for (i in 0 .. 10) { - a.int().delegate.accept(i) - a.long().delegate.accept(i.toLong()) - - ints.add(b.int()) - longs.add(b.long()) - } - - val remote = a.Remote() - val payload = remote.write()!! - - b.read(FastByteArrayInputStream(payload.array, 0, payload.length)) - - for (i in 0 .. 10) { - assertEquals(i, ints[i].get()) - assertEquals(i.toLong(), longs[i].get()) - } - } - - @Test - @DisplayName("Delegate Syncher network half of delegates") - fun half() { - val a = DelegateSyncher() - val b = DelegateSyncher() - val ints = ArrayList>() - val longs = ArrayList>() - - for (i in 0 .. 10) { - if (i % 2 == 0) { - a.int() - a.long() - } else { - a.int().delegate.accept(i) - a.long().delegate.accept(i.toLong()) - } - - ints.add(b.int()) - longs.add(b.long()) - } - - val remote = a.Remote() - val payload = remote.write()!! - - b.read(FastByteArrayInputStream(payload.array, 0, payload.length)) - - for (i in 0 .. 10) { - if (i % 2 == 0) { - assertEquals(0, ints[i].get()) - assertEquals(0L, longs[i].get()) - } else { - assertEquals(i, ints[i].get()) - assertEquals(i.toLong(), longs[i].get()) - } - } - } - - @Test - @DisplayName("Delegate Syncher >127 delegates") - fun moreThanByte() { - val a = DelegateSyncher() - val b = DelegateSyncher() - val ints = ArrayList>() - val longs = ArrayList>() - - for (i in 0 .. 400) { - a.int().delegate.accept(i) - a.long().delegate.accept(i.toLong()) - - ints.add(b.int()) - longs.add(b.long()) - } - - val remote = a.Remote() - val payload = remote.write()!! - - b.read(FastByteArrayInputStream(payload.array, 0, payload.length)) - - for (i in 0 .. 400) { - assertEquals(i, ints[i].get()) - assertEquals(i.toLong(), longs[i].get()) - } - } -} \ No newline at end of file