From efa659a755943e777f267e94db75cce7ff938806 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Fri, 5 Apr 2024 10:05:38 +0700 Subject: [PATCH] Remove mailbox executor service since it has several design flaws --- gradle.properties | 2 +- .../kommons/util/MailboxExecutorService.kt | 447 ------------------ 2 files changed, 1 insertion(+), 448 deletions(-) delete mode 100644 src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt diff --git a/gradle.properties b/gradle.properties index 9ec7ced..4d0e3d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=2.11.1 +projectVersion=2.12.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt deleted file mode 100644 index a0a36f9..0000000 --- a/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt +++ /dev/null @@ -1,447 +0,0 @@ -package ru.dbotthepony.kommons.util - -import java.lang.Thread.UncaughtExceptionHandler -import java.util.LinkedList -import java.util.concurrent.Callable -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.Delayed -import java.util.concurrent.ExecutionException -import java.util.concurrent.Future -import java.util.concurrent.FutureTask -import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.LockSupport -import java.util.concurrent.locks.ReentrantLock -import java.util.function.Consumer -import kotlin.concurrent.withLock - -private fun > LinkedList.enqueue(value: E) { - if (isEmpty()) { - add(value) - } else if (first >= value) { - addFirst(value) - } else if (last <= value) { - addLast(value) - } else { - val iterator = listIterator() - - while (iterator.hasNext()) { - val i = iterator.next() - - if (i >= value) { - iterator.previous() - iterator.add(value) - break - } - } - } -} - -/** - * [ScheduledExecutorService] which act as a mailbox, [executeQueuedTasks] must be called from main thread. - * - * [submit], [execute], etc can be called on any thread. If any of enqueueing methods are called on the same thread - * as where [executeQueuedTasks] was called, executes provided lambda immediately and returns completed future. - */ -class MailboxExecutorService(thread: Thread = Thread.currentThread()) : ScheduledExecutorService { - @Volatile - var thread: Thread = thread - private set - - private val futureQueue = ConcurrentLinkedQueue>() - - private val timers = LinkedList>() - private val repeatableTimers = LinkedList() - - @Volatile - private var isShutdown = false - @Volatile - private var isTerminated = false - - private val timeOrigin = JVMTimeSource() - - var exceptionHandler: Consumer? = null - - private inner class Timer(task: Callable, val executeAt: Long) : FutureTask(task), ScheduledFuture { - override fun compareTo(other: Delayed): Int { - return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) - } - - override fun getDelay(unit: TimeUnit): Long { - return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos - } - } - - private data class CompletedFuture(private val value: T) : Future { - override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return false - } - - override fun isCancelled(): Boolean { - return false - } - - override fun isDone(): Boolean { - return true - } - - override fun get(): T { - return value - } - - override fun get(timeout: Long, unit: TimeUnit): T { - return value - } - - companion object { - val VOID = CompletedFuture(Unit) - } - } - - private inner class RepeatableTimer( - task: Runnable, - initialDelay: Long, - val period: Long, - val fixedDelay: Boolean, - ): FutureTask({ task.run() }), ScheduledFuture { - var next = initialDelay - private set - - public override fun runAndReset(): Boolean { - if (fixedDelay) { - next += period - return super.runAndReset() - } else { - try { - return super.runAndReset() - } finally { - next += period - } - } - } - - override fun compareTo(other: Delayed): Int { - return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) - } - - override fun getDelay(unit: TimeUnit): Long { - return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos - } - } - - fun isSameThread(): Boolean { - return Thread.currentThread() === thread - } - - fun executeQueuedTasks() { - thread = Thread.currentThread() - - if (isShutdown) { - if (!isTerminated) { - isTerminated = true - - futureQueue.forEach { - it.cancel(false) - } - - futureQueue.clear() - timers.clear() - repeatableTimers.clear() - - return - } - } - - var next = futureQueue.poll() - - while (next != null) { - if (isTerminated) return - next.run() - Thread.interrupted() - - try { - next.get() - } catch (err: ExecutionException) { - exceptionHandler?.accept(err) - } - - next = futureQueue.poll() - } - - while (!timers.isEmpty()) { - if (isTerminated) return - val first = timers.first - - if (first.isCancelled) { - timers.removeFirst() - } else if (first.executeAt <= timeOrigin.nanos) { - first.run() - Thread.interrupted() - - try { - first.get() - } catch (err: ExecutionException) { - exceptionHandler?.accept(err) - } - - timers.removeFirst() - } else { - break - } - } - - if (repeatableTimers.isNotEmpty()) { - val executed = LinkedList() - - while (repeatableTimers.isNotEmpty()) { - if (isTerminated) return - val first = repeatableTimers.first - - if (first.isDone) { - repeatableTimers.removeFirst() - } else if (first.next <= timeOrigin.nanos) { - if (first.runAndReset()) { - executed.add(first) - } - - repeatableTimers.removeFirst() - } else { - break - } - } - - executed.forEach { repeatableTimers.enqueue(it) } - } - } - - override fun execute(command: Runnable) { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (isSameThread()) { - command.run() - } else { - futureQueue.add(FutureTask(command, Unit)) - LockSupport.unpark(thread) - } - } - - override fun shutdown() { - isShutdown = true - } - - override fun shutdownNow(): List { - if (isTerminated) return listOf() - isShutdown = true - isTerminated = true - - val result = ArrayList() - - futureQueue.forEach { - it.cancel(false) - result.add(it) - } - - futureQueue.clear() - - timers.forEach { it.cancel(false) } - repeatableTimers.forEach { it.cancel(false) } - - timers.clear() - repeatableTimers.clear() - - return result - } - - override fun isShutdown(): Boolean { - return isShutdown - } - - override fun isTerminated(): Boolean { - return isTerminated - } - - override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { - throw UnsupportedOperationException() - } - - override fun submit(task: Callable): Future { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - if (isSameThread()) return CompletedFuture(task.call()) - return FutureTask(task).also { futureQueue.add(it); LockSupport.unpark(thread) } - } - - override fun submit(task: Runnable, result: T): Future { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - if (isSameThread()) { task.run(); return CompletedFuture(result) } - return FutureTask { task.run(); result }.also { futureQueue.add(it); LockSupport.unpark(thread) } - } - - override fun submit(task: Runnable): Future<*> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - if (isSameThread()) { task.run(); return CompletedFuture.VOID } - return FutureTask { task.run() }.also { futureQueue.add(it); LockSupport.unpark(thread) } - } - - override fun invokeAll(tasks: Collection>): List> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (isSameThread()) { - return tasks.map { CompletedFuture(it.call()) } - } else { - return tasks.map { submit(it) }.onEach { it.get() } - } - } - - override fun invokeAll( - tasks: Collection>, - timeout: Long, - unit: TimeUnit - ): List> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (isSameThread()) { - return tasks.map { CompletedFuture(it.call()) } - } else { - return tasks.map { submit(it) }.onEach { it.get(timeout, unit) } - } - } - - override fun invokeAny(tasks: Collection>): T { - if (tasks.isEmpty()) - throw NoSuchElementException("Provided task list is empty") - - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (isSameThread()) { - return tasks.first().call() - } else { - return submit(tasks.first()).get() - } - } - - override fun invokeAny(tasks: Collection>, timeout: Long, unit: TimeUnit): T { - if (tasks.isEmpty()) - throw NoSuchElementException("Provided task list is empty") - - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (isSameThread()) { - return tasks.first().call() - } else { - return submit(tasks.first()).get(timeout, unit) - } - } - - fun join(future: Future): V { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - if (!isSameThread()) - return future.get() - - while (!future.isDone) { - executeQueuedTasks() - LockSupport.parkNanos(1_000_000L) - } - - return future.get() - } - - override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) - - if (isSameThread() && delay <= 0L) { - timer.run() - Thread.interrupted() - } else if (isSameThread()) { - timers.enqueue(timer) - } else { - execute { - if (timer.isCancelled) { - // do nothing - } else if (timer.executeAt <= timeOrigin.nanos) { - timer.run() - Thread.interrupted() - } else { - timers.enqueue(timer) - } - } - } - - return timer - } - - override fun schedule(callable: Callable, delay: Long, unit: TimeUnit): ScheduledFuture { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) - - if (isSameThread() && delay <= 0L) { - timer.run() - Thread.interrupted() - } else if (isSameThread()) { - timers.enqueue(timer) - } else { - execute { - if (timer.isCancelled) { - // do nothing - } else if (timer.executeAt <= timeOrigin.nanos) { - timer.run() - Thread.interrupted() - } else { - timers.enqueue(timer) - } - } - } - - return timer - } - - override fun scheduleAtFixedRate( - command: Runnable, - initialDelay: Long, - period: Long, - unit: TimeUnit - ): ScheduledFuture<*> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - return RepeatableTimer( - command, - timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), - TimeUnit.NANOSECONDS.convert(period, unit), true) - .also { - execute { - if (it.isCancelled) { - // do nothing - } else { - repeatableTimers.enqueue(it) - } - } - } - } - - override fun scheduleWithFixedDelay( - command: Runnable, - initialDelay: Long, - delay: Long, - unit: TimeUnit - ): ScheduledFuture<*> { - if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") - - return RepeatableTimer( - command, - timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), - TimeUnit.NANOSECONDS.convert(delay, unit), false) - .also { - execute { - if (it.isCancelled) { - // do nothing - } else { - repeatableTimers.enqueue(it) - } - } - } - } -} \ No newline at end of file