From 839e7cc503b7e2d460e09a828ff22625fffcf7c2 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Sat, 12 Aug 2023 01:13:42 +0700 Subject: [PATCH] Allow to remove subscribers while iterating them --- .../dbotthepony/mc/otm/core/ISubscripable.kt | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/core/ISubscripable.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/core/ISubscripable.kt index ff17bd269..2dabdfb28 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/core/ISubscripable.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/core/ISubscripable.kt @@ -2,6 +2,7 @@ package ru.dbotthepony.mc.otm.core import it.unimi.dsi.fastutil.booleans.BooleanConsumer import it.unimi.dsi.fastutil.floats.FloatConsumer +import it.unimi.dsi.fastutil.objects.ReferenceArraySet import it.unimi.dsi.fastutil.objects.ReferenceLinkedOpenHashSet import java.util.function.Consumer import java.util.function.DoubleConsumer @@ -23,22 +24,28 @@ interface ISubscriptable { class Impl : ISubscriptable, Consumer { private inner class L(val callback: Consumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: Consumer): ISubscriptable.L { return L(listener) } override fun accept(t: V) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } } @@ -67,22 +74,28 @@ interface IFloatSubcripable : ISubscriptable { class Impl : IFloatSubcripable, Consumer, FloatConsumer { private inner class L(val callback: FloatConsumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: FloatConsumer): ISubscriptable.L { return L(listener) } override fun accept(t: Float) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } } @@ -98,22 +111,28 @@ interface IDoubleSubcripable : ISubscriptable { class Impl : IDoubleSubcripable, Consumer, DoubleConsumer { private inner class L(val callback: DoubleConsumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: DoubleConsumer): ISubscriptable.L { return L(listener) } override fun accept(t: Double) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } } @@ -129,22 +148,28 @@ interface IIntSubcripable : ISubscriptable { class Impl : IIntSubcripable, Consumer, IntConsumer { private inner class L(val callback: IntConsumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: IntConsumer): ISubscriptable.L { return L(listener) } override fun accept(t: Int) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } } @@ -160,22 +185,28 @@ interface ILongSubcripable : ISubscriptable { class Impl : ILongSubcripable, Consumer, LongConsumer { private inner class L(val callback: LongConsumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: LongConsumer): ISubscriptable.L { return L(listener) } override fun accept(t: Long) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } } @@ -191,22 +222,28 @@ interface IBooleanSubscriptable : ISubscriptable { class Impl : IBooleanSubscriptable, Consumer, BooleanConsumer { private inner class L(val callback: BooleanConsumer) : ISubscriptable.L { + private var isRemoved = false + init { subscribers.add(this) } override fun remove() { - subscribers.remove(this) + isRemoved = true + queue.add(this) } } private val subscribers = ReferenceLinkedOpenHashSet(0) + private val queue = ReferenceArraySet(0) override fun addListener(listener: BooleanConsumer): ISubscriptable.L { return L(listener) } override fun accept(t: Boolean) { + queue.forEach { subscribers.remove(it) } + queue.clear() subscribers.forEach { it.callback.accept(t) } } }