diff --git a/gradle.properties b/gradle.properties index bdb1a04..e195eff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=2.7.10 +projectVersion=2.8.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt index 0a18f3c..83b5525 100644 --- a/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt +++ b/src/main/kotlin/ru/dbotthepony/kommons/io/DelegateSyncher.kt @@ -11,9 +11,12 @@ import ru.dbotthepony.kommons.math.Decimal import ru.dbotthepony.kommons.util.Delegate import ru.dbotthepony.kommons.util.DelegateGetter import ru.dbotthepony.kommons.util.DelegateSetter +import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kommons.util.Listenable import ru.dbotthepony.kommons.util.ListenableDelegate import ru.dbotthepony.kommons.util.Observer +import ru.dbotthepony.kommons.util.ValueObserver +import java.io.Closeable import java.io.DataInputStream import java.io.DataOutputStream import java.io.InputStream @@ -24,43 +27,28 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.function.Consumer import java.util.function.Supplier -import kotlin.ConcurrentModificationException import kotlin.collections.ArrayList import kotlin.concurrent.withLock /** - * Synches attached delegates by serializing delta state on one end and deserializing on another. + * Universal, one-to-many delegate/value synchronizer. * - * Precautions when using this class: - * 1. Entire class is **not** thread safe, with the exception of attached delegates triggering [Listenable.addListener] listeners - * (this includes calling [observe] from multiple threads, but why would you do this). - * 2. [addListener] listeners are expected to be called once when [isDirty] turns true **on best effort**. - * If syncher is running in highly congested environment (multiple threads are mutating attached delegates) - * listeners may be called more than once IF [constructPayload] call is ongoing. - * 3. [addListener] listeners on both [DelegateSyncher] and [DelegateSyncher.Slot]s are called on the same thread - * where delegate mutation/observation/etc occurred. If you need to mutate [DelegateSyncher] internal structure (add/remove - * slots) you **MUST** synchronize mutations by yourself if listener in question is expected to be called from multiple threads. - * 4. When using [DelegateSyncher] in single-threaded context, all operations are legal at any point in time, including [DelegateSyncher.Slot] - * addition/removal during [observe], [readPayload] or [addListener] callback on either [DelegateSyncher] or [DelegateSyncher.Slot]. - * 5. Calling [DelegateSyncher.AbstractSlot.remove] does not shift any subsequent slots down, leaving all slot IDs intact. - * Gaps are filled when new slots are created. + * Values and delegates can be attached using [add], or by constructing subclassed directly. + * Delta changes are tracked by [DelegateSyncher.Remote] instances. + * + * In general, this class is not meant to be concurrently mutated by different threads, + * to avoid structure corruption a lock is employed. + * + * Attached delegates can be safely mutated by multiple threads concurrently + * (attached [ListenableDelegate] can call [Listenable.addListener] callback concurrently). */ -class DelegateSyncher : Listenable, Observer { +class DelegateSyncher : Observer { + private val lock = ReentrantLock() private val slots = ArrayList() - private val observers = CopyOnWriteArrayList() - private val dirtySlots = ConcurrentLinkedQueue() - private val dirtySubs = Listenable.Void() private val gaps = IntAVLTreeSet() - - @Volatile - private var changeset = 0 - - var isRemote = false - private set - - private val isDirtyF = AtomicBoolean() - val isDirty: Boolean - get() = isDirtyF.get() + private val observers = ArrayList() + private val remotes = ArrayList() + private var isRemote = false override fun observe(): Boolean { var any = false @@ -68,485 +56,362 @@ class DelegateSyncher : Listenable, Observer { return any } - override fun addListener(listener: Runnable): Listenable.L { - return dirtySubs.addListener(listener) + fun read(stream: DataInputStream) { + isRemote = true } - override fun addListener(listener: Consumer): Listenable.L { - return dirtySubs.addListener(listener) + fun read(stream: InputStream) { + if (stream is DataInputStream) + return read(stream) + + read(DataInputStream(stream)) } - fun constructPayload(): FastByteArrayOutputStream? { - if (!isDirtyF.get()) return null - - val slots = ObjectAVLTreeSet() - - do { - var poll = dirtySlots.poll() - - while (poll != null) { - slots.add(poll) - poll = dirtySlots.poll() - } - } while (isDirtyF.compareAndSet(true, false)) - - val stream = FastByteArrayOutputStream() - val dataStream = DataOutputStream(stream) - - for (slot in slots) { - dataStream.writeVarInt(slot.id + 1) - slot.write(dataStream) - } - - dataStream.writeVarInt(0) - return stream - } - - fun readPayload(stream: DataInputStream) { - var nextId = stream.readVarInt() - - while (nextId > 0) { - val slot = slots - nextId = stream.readVarInt() - } - } - - fun readPayload(stream: InputStream) { - readPayload(DataInputStream(stream)) - } - - abstract inner class AbstractSlot : Observer, Comparable { - protected val isDirtySlotF = AtomicBoolean(true) - - val isDirty: Boolean - get() = isDirtySlotF.get() - - var isRemoved = false - private set - + abstract inner class AbstractSlot : Closeable, Observer, Comparable { val id: Int - init { - val changeset = ++this@DelegateSyncher.changeset + protected val isRemoved = AtomicBoolean() - if (gaps.isNotEmpty()) { - val gap = gaps.firstInt() - gaps.remove(gap) - id = gap - check(slots[id] == null) { "Expected slot $id to be empty" } - slots[id] = this - } else { - id = slots.size - slots.add(this) + fun remove() { + if (isRemoved.compareAndSet(false, true)) { + lock.withLock { + slots[id] = null + gaps.add(id) + + remoteSlots.forEach { + it.remove() + } + + remoteSlots.clear() + remove0() + } } + } - dirtySlots.add(this) - - if (changeset != ++this@DelegateSyncher.changeset) { - throw ConcurrentModificationException("Manipulating DelegateSyncher across multiple threads") - } - - if (isDirtyF.compareAndSet(false, true)) { - dirtySubs.run() - } + final override fun close() { + return remove() } final override fun compareTo(other: AbstractSlot): Int { return id.compareTo(other.id) } - protected abstract fun actuallyRemove() + protected abstract fun remove0() + internal abstract fun read(stream: DataInputStream) + internal abstract fun write(stream: DataOutputStream) + + internal val remoteSlots = CopyOnWriteArrayList() protected fun markDirty() { - check(!isRemoved) { "This network slot was removed" } - - if (!isRemote && isDirtySlotF.compareAndSet(false, true)) { - dirtySlots.add(this) - - if (!isDirtyF.get() && !dirtySlots.isEmpty() && isDirtyF.compareAndSet(false, true) && !dirtySlots.isEmpty()) { - dirtySubs.run() + if (!isRemote && !isRemoved.get()) { + remoteSlots.forEach { + it.markDirty() } } } - fun remove() { - if (!isRemoved) { - val changeset = ++this@DelegateSyncher.changeset - - isRemoved = true - slots[id] = null - observers.remove(this) - - if (id == slots.size - 1) { - slots.removeAt(id) + init { + lock.withLock { + if (gaps.isNotEmpty()) { + val gap = gaps.firstInt() + gaps.remove(gap) + id = gap + check(slots[id] == null) { "Expected slot $id to be empty" } + slots[id] = this } else { - gaps.add(id) + id = slots.size + slots.add(this) } - if (isDirtySlotF.get()) { - dirtySlots.remove(this) - this@DelegateSyncher.isDirtyF.set(dirtySlots.isNotEmpty()) - } - - actuallyRemove() - - if (changeset != ++this@DelegateSyncher.changeset) { - throw ConcurrentModificationException("Manipulating DelegateSyncher across multiple threads") + remotes.forEach { + it.addSlot(this) } } } - - abstract fun write(stream: DataOutputStream) - abstract fun read(stream: DataInputStream) } - inner class Slot(private val parent: ListenableDelegate, private val codec: StreamCodec) : AbstractSlot(), ListenableDelegate { - private val subs = Listenable.Impl() - - private val parentSub = parent.addListener(Consumer { + inner class Slot(val delegate: ListenableDelegate, val codec: StreamCodec) : AbstractSlot(), ListenableDelegate { + private val l = delegate.addListener(Runnable { markDirty() - subs.accept(it) }) + init { + if (delegate is ListenableDelegate.AbstractShadow) { + observers.add(this) + } + } + + private val listeners = Listenable.Impl() + + override fun remove0() { + l.remove() + listeners.clear() + } + + override fun read(stream: DataInputStream) { + check(!isRemoved.get()) { "This network slot was removed" } + delegate.accept(codec.read(stream)) + } + + override fun write(stream: DataOutputStream) { + check(!isRemoved.get()) { "This network slot was removed" } + codec.write(stream, delegate.get()) + } + override fun get(): V { - check(!isRemoved) { "This network slot was removed" } - return parent.get() + check(!isRemoved.get()) { "This network slot was removed" } + return delegate.get() } override fun accept(t: V) { - check(!isRemoved) { "This network slot was removed" } - parent.accept(t) + check(!isRemoved.get()) { "This network slot was removed" } + delegate.accept(t) } override fun addListener(listener: Consumer): Listenable.L { - check(!isRemoved) { "This network slot was removed" } - return subs.addListener(listener) - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved) { "This network slot was removed" } - codec.write(stream, parent.get()) - isDirtySlotF.set(false) - } - - override fun read(stream: DataInputStream) { - check(!isRemoved) { "This network slot was removed" } - parent.accept(codec.read(stream)) + check(!isRemoved.get()) { "This network slot was removed" } + return listeners.addListener(listener) } override fun observe(): Boolean { - check(!isRemoved) { "This network slot was removed" } - return false - } + check(!isRemoved.get()) { "This network slot was removed" } - override fun actuallyRemove() { - subs.clear() - parentSub.remove() + if (delegate is ListenableDelegate.AbstractShadow) { + return delegate.observe() + } + + return false } } - inner class ObservedSlot(private val parent: Delegate, private val codec: StreamCodec) : AbstractSlot(), ListenableDelegate { - private var lastValue: Any? = Mark - private val localLock = ReentrantLock() // avoid deadlock when code inside subscribers cause another value observation - private val subs = Listenable.Impl() + inner class ObservedSlot(val delegate: Delegate, val codec: StreamCodec) : AbstractSlot(), ListenableDelegate, ValueObserver { + private val listeners = Listenable.Impl() + private var observed: Any? = Mark + private val observeLock = ReentrantLock() - override fun actuallyRemove() { - subs.clear() - lastValue = Mark + init { + observers.add(this) } - override fun write(stream: DataOutputStream) { - check(!isRemoved) { "This network slot was removed" } - codec.write(stream, parent.get()) + override fun remove0() { + listeners.clear() } override fun read(stream: DataInputStream) { - check(!isRemoved) { "This network slot was removed" } + check(!isRemoved.get()) { "This network slot was removed" } val read = codec.read(stream) - parent.accept(read) + observed = codec.copy(read) + delegate.accept(read) + } - localLock.withLock { - lastValue = read - subs.accept(read) - } + override fun write(stream: DataOutputStream) { + check(!isRemoved.get()) { "This network slot was removed" } + codec.write(stream, delegate.get()) } override fun observe(): Boolean { - check(!isRemoved) { "This network slot was removed" } - if (isRemote) return false + val get = delegate.get() - val get = get() - - if (lastValue === Mark || !codec.compare(get, lastValue as V)) { - localLock.withLock { - if (lastValue === Mark || !codec.compare(get, lastValue as V)) { + if (observed === Mark || !codec.compare(get, observed as V)) { + observeLock.withLock { + if (observed === Mark || !codec.compare(get, observed as V)) { + observed = codec.copy(get) markDirty() - lastValue = codec.copy(get) - subs.accept(get) - return true + listeners.accept(get) } + + return true } } return false } - override fun get(): V { - check(!isRemoved) { "This network slot was removed" } - val get = get() + override fun getAndObserve(): Pair { + val get = delegate.get() - if (!isRemote) { - if (lastValue === Mark || !codec.compare(get, lastValue as V)) { - localLock.withLock { - if (lastValue === Mark || !codec.compare(get, lastValue as V)) { - markDirty() - lastValue = codec.copy(get) - subs.accept(get) - } + if (observed === Mark || !codec.compare(get, observed as V)) { + observeLock.withLock { + if (observed === Mark || !codec.compare(get, observed as V)) { + observed = codec.copy(get) + markDirty() + listeners.accept(get) } + + return get to true } } - return get - } - - override fun accept(t: V) { - check(!isRemoved) { "This network slot was removed" } - - parent.accept(t) - - localLock.withLock { - lastValue = codec.copy(t) - subs.accept(t) - } - } - - override fun addListener(listener: Consumer): Listenable.L { - check(!isRemoved) { "This network slot was removed" } - return subs.addListener(listener) - } - } - - inner class ComputedSlot(parent: Supplier, private val codec: StreamCodec) : AbstractSlot(), ListenableDelegate { - private val shadow = ListenableDelegate.CustomShadow(parent, codec::compare, codec::copy) - - override fun write(stream: DataOutputStream) { - check(!isRemoved) { "This network slot was removed" } - codec.write(stream, shadow.get()) - isDirtySlotF.set(false) - } - - override fun read(stream: DataInputStream) { - check(!isRemoved) { "This network slot was removed" } - shadow.accept(codec.read(stream)) - } - - override fun observe(): Boolean { - check(!isRemoved) { "This network slot was removed" } - if (shadow.observe()) { - markDirty() - return true - } - - return false + return get to false } override fun get(): V { - check(!isRemoved) { "This network slot was removed" } - return shadow.get() + check(!isRemoved.get()) { "This network slot was removed" } + return delegate.get() } override fun accept(t: V) { - check(!isRemoved) { "This network slot was removed" } - shadow.accept(t) + check(!isRemoved.get()) { "This network slot was removed" } + delegate.accept(t) } override fun addListener(listener: Consumer): Listenable.L { - check(!isRemoved) { "This network slot was removed" } - return shadow.addListener(listener) - } - - override fun actuallyRemove() { - shadow.clearListeners() + check(!isRemoved.get()) { "This network slot was removed" } + return listeners.addListener(listener) } } - inner class SetSlot(val value: ListenableSet, private val codec: StreamCodec) : AbstractSlot(), ListenableSet.SetListener, Listenable> { - private val sub = value.addListener(this) - private val subs = Listenable.Impl>() - private val backlog = ArrayList Unit>() + private object Mark - override fun onClear() { - check(!isRemoved) { "This network slot was removed" } - synchronized(backlog) { - markDirty() + internal data class SetAction(val value: KOptional, val action: Int) - backlog.clear() - backlog.add { write(CLEAR) } + inner class SetSlot(val delegate: ListenableSet, val codec: StreamCodec) : AbstractSlot() { + private val listener = object : ListenableSet.SetListener { + override fun onClear() { + remoteSlots.forEach { + (it as Remote.RemoteSetSlot).changelist.clear() + + synchronized(it.changelistLock) { + it.changelist.add(SetAction(KOptional(), CLEAR)) + } + + it.markDirty() + } } - } - override fun onValueAdded(element: E) { - check(!isRemoved) { "This network slot was removed" } - val copy = codec.copy(element) + override fun onValueAdded(element: V) { + remoteSlots.forEach { + it as Remote.RemoteSetSlot - synchronized(backlog) { - markDirty() + synchronized(it.changelistLock) { + it.changelist.removeIf { it.value.valueEquals(element) } + it.changelist.add(SetAction(KOptional(codec.copy(element)), ADD)) + } - backlog.add { - write(ADD) - codec.write(this, copy) + it.markDirty() + } + } + + override fun onValueRemoved(element: V) { + remoteSlots.forEach { + it as Remote.RemoteSetSlot + + synchronized(it.changelistLock) { + it.changelist.removeIf { it.value.valueEquals(element) } + it.changelist.add(SetAction(KOptional(codec.copy(element)), REMOVE)) + } + + it.markDirty() } } } - override fun onValueRemoved(element: E) { - check(!isRemoved) { "This network slot was removed" } - val copy = codec.copy(element) + private val l = delegate.addListener(listener) - synchronized(backlog) { - markDirty() - - backlog.add { - write(REMOVE) - codec.write(this, copy) - } - } - } - - override fun actuallyRemove() { - backlog.clear() - sub.remove() - subs.clear() - } - - override fun write(stream: DataOutputStream) { - check(!isRemoved) { "This network slot was removed" } - synchronized(backlog) { - isDirtySlotF.set(false) - backlog.forEach { it(stream) } - backlog.clear() - } - - stream.write(END) + override fun remove0() { + l.remove() } override fun read(stream: DataInputStream) { - check(!isRemoved) { "This network slot was removed" } + check(!isRemoved.get()) { "This network slot was removed" } var action = stream.read() while (true) { when (action) { - ADD -> value.add(codec.read(stream)) - REMOVE -> value.remove(codec.read(stream)) - CLEAR -> value.clear() + ADD -> delegate.add(codec.read(stream)) + REMOVE -> delegate.remove(codec.read(stream)) + CLEAR -> delegate.clear() else -> break } action = stream.read() } - - subs.accept(value) - } - - override fun observe(): Boolean { - check(!isRemoved) { "This network slot was removed" } - return false - } - - override fun addListener(listener: Consumer>): Listenable.L { - check(!isRemoved) { "This network slot was removed" } - return subs.addListener(listener) - } - } - - inner class MapSlot(val value: ListenableMap, private val keyCodec: StreamCodec, private val valueCodec: StreamCodec) : AbstractSlot(), Listenable>, ListenableMap.MapListener { - private val sub = value.addListener(this) - private val subs = Listenable.Impl>() - private val backlog = ArrayList Unit>() - - override fun onClear() { - check(!isRemoved) { "This network slot was removed" } - synchronized(backlog) { - markDirty() - - backlog.clear() - backlog.add { write(CLEAR) } - } - } - - override fun onValueAdded(key: K, value: V) { - check(!isRemoved) { "This network slot was removed" } - val keyCopy = keyCodec.copy(key) - val valueCopy = valueCodec.copy(value) - - synchronized(backlog) { - markDirty() - - backlog.add { - write(ADD) - keyCodec.write(this, keyCopy) - valueCodec.write(this, valueCopy) - } - } - } - - override fun onValueRemoved(key: K, value: V) { - check(!isRemoved) { "This network slot was removed" } - val keyCopy = keyCodec.copy(key) - - synchronized(backlog) { - markDirty() - - backlog.add { - write(REMOVE) - keyCodec.write(this, keyCopy) - } - } - } - - override fun actuallyRemove() { - backlog.clear() - sub.remove() - subs.clear() } override fun write(stream: DataOutputStream) { - check(!isRemoved) { "This network slot was removed" } - synchronized(backlog) { - isDirtySlotF.set(false) - backlog.forEach { it(stream) } - backlog.clear() + check(!isRemoved.get()) { "This network slot was removed" } + throw RuntimeException("unreachable code") + } + + override fun observe(): Boolean { + check(!isRemoved.get()) { "This network slot was removed" } + return false + } + } + + internal data class MapAction(val key: KOptional, val value: KOptional, val action: Int) + + inner class MapSlot(val delegate: ListenableMap, val keyCodec: StreamCodec, val valueCodec: StreamCodec) : AbstractSlot() { + private val listener = object : ListenableMap.MapListener { + override fun onClear() { + remoteSlots.forEach { + it as Remote.RemoteMapSlot + + synchronized(it.changelistLock) { + it.changelist.clear() + it.changelist.add(MapAction(KOptional(), KOptional(), CLEAR)) + } + + it.markDirty() + } } - stream.write(END) + override fun onValueAdded(key: K, value: V) { + remoteSlots.forEach { + it as Remote.RemoteMapSlot + + synchronized(it.changelistLock) { + it.changelist.removeIf { it.key.valueEquals(key) } + it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(valueCodec.copy(value)), ADD)) + } + + it.markDirty() + } + } + + override fun onValueRemoved(key: K, value: V) { + remoteSlots.forEach { + it as Remote.RemoteMapSlot + + synchronized(it.changelistLock) { + it.changelist.removeIf { it.key.valueEquals(key) } + it.changelist.add(MapAction(KOptional(keyCodec.copy(key)), KOptional(valueCodec.copy(value)), REMOVE)) + } + + it.markDirty() + } + } + } + + private val l = delegate.addListener(listener) + + override fun remove0() { + l.remove() } override fun read(stream: DataInputStream) { - check(!isRemoved) { "This network slot was removed" } + check(!isRemoved.get()) { "This network slot was removed" } var action = stream.read() while (true) { when (action) { - ADD -> value.put(keyCodec.read(stream), valueCodec.read(stream)) - REMOVE -> value.remove(keyCodec.read(stream)) - CLEAR -> value.clear() + ADD -> delegate.put(keyCodec.read(stream), valueCodec.read(stream)) + REMOVE -> delegate.remove(keyCodec.read(stream)) + CLEAR -> delegate.clear() else -> break } action = stream.read() } - - subs.accept(value) } - override fun addListener(listener: Consumer>): Listenable.L { - check(!isRemoved) { "This network slot was removed" } - return subs.addListener(listener) + override fun write(stream: DataOutputStream) { + check(!isRemoved.get()) { "This network slot was removed" } + throw RuntimeException("unreachable code") } override fun observe(): Boolean { - check(!isRemoved) { "This network slot was removed" } + check(!isRemoved.get()) { "This network slot was removed" } return false } } @@ -559,8 +424,8 @@ class DelegateSyncher : Listenable, Observer { return ObservedSlot(delegate, codec) } - fun add(delegate: Supplier, codec: StreamCodec): ComputedSlot { - return ComputedSlot(delegate, codec) + fun add(delegate: Supplier, codec: StreamCodec): Slot { + return Slot(ListenableDelegate.CustomShadow(delegate, codec::compare, codec::copy), codec) } fun add(delegate: ListenableSet, codec: StreamCodec): SetSlot { @@ -573,201 +438,289 @@ class DelegateSyncher : Listenable, Observer { @JvmName("vbyte") @JvmOverloads - fun byte(value: Byte = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun byte(value: Byte = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), ByteValueCodec) } @JvmName("vshort") @JvmOverloads - fun short(value: Short = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun short(value: Short = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), ShortValueCodec) } @JvmName("vchar") @JvmOverloads - fun char(value: Char, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun char(value: Char, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), CharValueCodec) } @JvmName("vint") @JvmOverloads - fun int(value: Int = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun int(value: Int = 0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), VarIntValueCodec) } @JvmName("vlong") @JvmOverloads - fun long(value: Long = 0L, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun long(value: Long = 0L, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), VarLongValueCodec) } @JvmName("vfloat") @JvmOverloads - fun float(value: Float = 0f, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun float(value: Float = 0f, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), FloatValueCodec) } @JvmName("vdouble") @JvmOverloads - fun double(value: Double = 0.0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun double(value: Double = 0.0, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), DoubleValueCodec) } @JvmName("vboolean") @JvmOverloads - fun boolean(value: Boolean = false, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun boolean(value: Boolean = false, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), BooleanValueCodec) } @JvmOverloads - fun string(value: String, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun string(value: String, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), BinaryStringCodec) } @JvmOverloads - fun uuid(value: UUID, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun uuid(value: UUID, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), UUIDValueCodec) } @JvmOverloads - fun decimal(value: Decimal = Decimal.ZERO, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): Slot { + fun decimal(value: Decimal = Decimal.ZERO, setter: DelegateSetter = DelegateSetter.passthrough(), getter: DelegateGetter = DelegateGetter.passthrough()): Slot { return add(ListenableDelegate.maskSmart(value, getter, setter), DecimalValueCodec) } - private object Mark + fun set(codec: StreamCodec, backing: MutableSet = ObjectOpenHashSet()): SetSlot { + return add(ListenableSet(backing), codec) + } + + fun map(keyCodec: StreamCodec, valueCodec: StreamCodec, backing: MutableMap = Object2ObjectOpenHashMap()): MapSlot { + return add(ListenableMap(backing), keyCodec, valueCodec) + } /** - * Helper class which allows quick creation of [DelegateSyncher] from attached delegates. + * Remotes must be [close]d when no longer in use, otherwise they will + * leak memory and decrease performance of syncher. + * + * To get changes to be networked to remote client, [write] should be called. */ - class Group { - private val constructors = ArrayList Unit>() + inner class Remote : Closeable { + private val isRemoved = AtomicBoolean() + internal val dirty = ConcurrentLinkedQueue() + private val remoteSlots = CopyOnWriteArrayList() - fun , V> add(delegate: D, codec: StreamCodec): D { - constructors.add { - add(delegate, codec) + internal open inner class RemoteSlot(val parent: AbstractSlot) : Comparable { + private val isDirty = AtomicBoolean() + + final override fun compareTo(other: RemoteSlot): Int { + return parent.compareTo(other.parent) } - return delegate - } - - fun , V> add(delegate: D, codec: StreamCodec): D { - constructors.add { - add(delegate, codec) + init { + remoteSlots.add(this) + dirty.add(this) } - return delegate - } - - fun , E> add(delegate: D, codec: StreamCodec): D { - constructors.add { - add(delegate, codec) + fun markDirty() { + if (!isRemoved.get() && isDirty.compareAndSet(false, true)) { + dirty.add(this) + } } - return delegate - } - - fun , K, V> add(delegate: D, keyCodec: StreamCodec, valueCodec: StreamCodec): D { - constructors.add { - add(delegate, keyCodec, valueCodec) + fun markClean() { + isDirty.set(false) } - return delegate + open fun invalidate() { + markDirty() + } + + open fun remove() { + isDirty.set(true) + remoteSlots.remove(this) + dirty.remove(this) + } + + open fun write(stream: DataOutputStream) { + parent.write(stream) + } + + override fun hashCode(): Int { + return parent.hashCode() + } + + override fun equals(other: Any?): Boolean { + return this === other || other is RemoteSlot && other.parent == parent + } } - @JvmName("vbyte") - @JvmOverloads - fun byte(value: Byte = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), ByteValueCodec) + internal inner class RemoteSetSlot(parent: SetSlot) : RemoteSlot(parent) { + val changelist = ArrayList>() + val changelistLock = Any() + + override fun remove() { + super.remove() + changelist.clear() + } + + override fun invalidate() { + synchronized(changelistLock) { + changelist.clear() + changelist.add(SetAction(KOptional(), CLEAR)) + + parent as SetSlot + + for (v in parent.delegate) { + changelist.add(SetAction(KOptional(parent.codec.copy(v)), ADD)) + } + } + + super.invalidate() + } + + override fun write(stream: DataOutputStream) { + synchronized(changelistLock) { + for (change in changelist) { + stream.write(change.action) + change.value.ifPresent { (parent as SetSlot).codec.write(stream, it) } + } + + changelist.clear() + } + + stream.write(0) + } } - @JvmName("vshort") - @JvmOverloads - fun short(value: Short = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), ShortValueCodec) + internal inner class RemoteMapSlot(parent: MapSlot) : RemoteSlot(parent) { + val changelist = ArrayList>() + val changelistLock = Any() + + override fun remove() { + super.remove() + changelist.clear() + } + + override fun invalidate() { + synchronized(changelistLock) { + changelist.clear() + changelist.add(MapAction(KOptional(), KOptional(), CLEAR)) + + parent as MapSlot + + for ((k, v) in parent.delegate) { + changelist.add(MapAction(KOptional(parent.keyCodec.copy(k)), KOptional(parent.valueCodec.copy(v)), ADD)) + } + } + + super.invalidate() + } + + override fun write(stream: DataOutputStream) { + synchronized(changelistLock) { + for (change in changelist) { + stream.write(change.action) + change.key.ifPresent { (parent as MapSlot).keyCodec.write(stream, it) } + change.value.ifPresent { (parent as MapSlot).valueCodec.write(stream, it) } + } + + changelist.clear() + } + + stream.write(0) + } } - @JvmName("vchar") - @JvmOverloads - fun char(value: Char, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), CharValueCodec) + init { + lock.withLock { + slots.forEach { + if (it != null) { + addSlot(it) + } + } + + remotes.add(this) + } } - @JvmName("vint") - @JvmOverloads - fun int(value: Int = 0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), VarIntValueCodec) + internal fun addSlot(slot: AbstractSlot) { + if (slot is SetSlot<*>) { + slot.remoteSlots.add(RemoteSetSlot(slot)) + } else if (slot is MapSlot<*, *>) { + slot.remoteSlots.add(RemoteMapSlot(slot)) + } else { + slot.remoteSlots.add(RemoteSlot(slot)) + } } - @JvmName("vlong") - @JvmOverloads - fun long(value: Long = 0L, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), VarLongValueCodec) + /** + * Returns null of this remote is clean. + */ + fun write(): FastByteArrayOutputStream? { + this@DelegateSyncher.observe() + + if (dirty.isNotEmpty()) { + val data = FastByteArrayOutputStream() + val stream = DataOutputStream(data) + + val sorted = ObjectAVLTreeSet() + var next = dirty.poll() + + while (next != null) { + sorted.add(next) + next.markClean() + next = dirty.poll() + } + + for (v in sorted) { + stream.writeVarInt(v.parent.id) + v.write(stream) + } + + stream.write(0) + + return data + } + + return null } - @JvmName("vfloat") - @JvmOverloads - fun float(value: Float = 0f, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), FloatValueCodec) + /** + * Marks all networked slots dirty + */ + fun invalidate() { + remoteSlots.forEach { it.invalidate() } } - @JvmName("vdouble") - @JvmOverloads - fun double(value: Double = 0.0, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), DoubleValueCodec) - } + override fun close() { + if (isRemoved.compareAndSet(false, true)) { + lock.withLock { + remoteSlots.forEach { + it.remove() + it.parent.remoteSlots.remove(it) + } - @JvmName("vboolean") - @JvmOverloads - fun boolean(value: Boolean = false, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), BooleanValueCodec) - } + remotes.remove(this) + } - @JvmOverloads - fun string(value: String, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), BinaryStringCodec) - } - - @JvmOverloads - fun uuid(value: UUID, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), UUIDValueCodec) - } - - @JvmOverloads - fun decimal(value: Decimal = Decimal.ZERO, getter: DelegateGetter = DelegateGetter.passthrough(), setter: DelegateSetter = DelegateSetter.passthrough()): ListenableDelegate { - return add(ListenableDelegate.maskSmart(value, getter, setter), DecimalValueCodec) - } - - fun set(codec: StreamCodec, backing: MutableSet = ObjectOpenHashSet()): ListenableSet { - val delegate = ListenableSet(backing) - constructors.add { add(delegate, codec) } - return delegate - } - - fun map(keyCodec: StreamCodec, valueCodec: StreamCodec, backing: MutableMap = Object2ObjectOpenHashMap()): ListenableMap { - val delegate = ListenableMap(backing) - constructors.add { add(delegate, keyCodec, valueCodec) } - return delegate - } - - fun add(other: Group): Group { - constructors.addAll(other.constructors) - return this - } - - fun create(): DelegateSyncher { - return addInto(DelegateSyncher()) - } - - fun addInto(syncher: DelegateSyncher): DelegateSyncher { - constructors.forEach { it(syncher) } - return syncher + dirty.clear() + } } } companion object { - const val END = 0 - const val CLEAR = 1 - const val ADD = 2 - const val REMOVE = 3 + private const val END = 0 + private const val CLEAR = 1 + private const val ADD = 2 + private const val REMOVE = 3 } } diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/KOptional.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/KOptional.kt index 24882cb..61a292d 100644 --- a/src/main/kotlin/ru/dbotthepony/kommons/util/KOptional.kt +++ b/src/main/kotlin/ru/dbotthepony/kommons/util/KOptional.kt @@ -83,6 +83,14 @@ class KOptional private constructor(private val _value: T, val isPresent: Boo } } + fun valueEquals(other: Any?): Boolean { + if (isPresent) { + return _value == other + } + + return false + } + override fun equals(other: Any?): Boolean { return this === other || other is KOptional<*> && isPresent == other.isPresent && _value == other._value } diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/ValueObserver.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/ValueObserver.kt index c533d0a..26e1cb1 100644 --- a/src/main/kotlin/ru/dbotthepony/kommons/util/ValueObserver.kt +++ b/src/main/kotlin/ru/dbotthepony/kommons/util/ValueObserver.kt @@ -15,7 +15,7 @@ interface ValueObserver : Listenable, Supplier, Observer { * * This implementation is not synchronized, calls to [get]/[observe]/[getAndObserve] from multiple threads * will result in undefined behavior; - * For thread-safe implementation, use [SynchronizedObserved]. + * For thread-safe implementation, use [SynchronizedImpl]. */ class Impl(private val provider: Supplier) : ValueObserver { private val subs = Listenable.Impl()