Remove mailbox executor service since it has several design flaws

This commit is contained in:
DBotThePony 2024-04-05 10:05:38 +07:00
parent b0d4a1d0fe
commit efa659a755
Signed by: DBot
GPG Key ID: DCC23B5715498507
2 changed files with 1 additions and 448 deletions

View File

@ -4,7 +4,7 @@ kotlin.code.style=official
specifyKotlinAsDependency=false
projectGroup=ru.dbotthepony.kommons
projectVersion=2.11.1
projectVersion=2.12.0
guavaDepVersion=33.0.0
gsonDepVersion=2.8.9

View File

@ -1,447 +0,0 @@
package ru.dbotthepony.kommons.util
import java.lang.Thread.UncaughtExceptionHandler
import java.util.LinkedList
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
import java.util.concurrent.locks.ReentrantLock
import java.util.function.Consumer
import kotlin.concurrent.withLock
private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) {
if (isEmpty()) {
add(value)
} else if (first >= value) {
addFirst(value)
} else if (last <= value) {
addLast(value)
} else {
val iterator = listIterator()
while (iterator.hasNext()) {
val i = iterator.next()
if (i >= value) {
iterator.previous()
iterator.add(value)
break
}
}
}
}
/**
* [ScheduledExecutorService] which act as a mailbox, [executeQueuedTasks] must be called from main thread.
*
* [submit], [execute], etc can be called on any thread. If any of enqueueing methods are called on the same thread
* as where [executeQueuedTasks] was called, executes provided lambda immediately and returns completed future.
*/
class MailboxExecutorService(thread: Thread = Thread.currentThread()) : ScheduledExecutorService {
@Volatile
var thread: Thread = thread
private set
private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>()
private val timers = LinkedList<Timer<*>>()
private val repeatableTimers = LinkedList<RepeatableTimer>()
@Volatile
private var isShutdown = false
@Volatile
private var isTerminated = false
private val timeOrigin = JVMTimeSource()
var exceptionHandler: Consumer<Throwable>? = null
private inner class Timer<T>(task: Callable<T>, val executeAt: Long) : FutureTask<T>(task), ScheduledFuture<T> {
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
private data class CompletedFuture<T>(private val value: T) : Future<T> {
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return false
}
override fun isCancelled(): Boolean {
return false
}
override fun isDone(): Boolean {
return true
}
override fun get(): T {
return value
}
override fun get(timeout: Long, unit: TimeUnit): T {
return value
}
companion object {
val VOID = CompletedFuture(Unit)
}
}
private inner class RepeatableTimer(
task: Runnable,
initialDelay: Long,
val period: Long,
val fixedDelay: Boolean,
): FutureTask<Unit>({ task.run() }), ScheduledFuture<Unit> {
var next = initialDelay
private set
public override fun runAndReset(): Boolean {
if (fixedDelay) {
next += period
return super.runAndReset()
} else {
try {
return super.runAndReset()
} finally {
next += period
}
}
}
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
fun isSameThread(): Boolean {
return Thread.currentThread() === thread
}
fun executeQueuedTasks() {
thread = Thread.currentThread()
if (isShutdown) {
if (!isTerminated) {
isTerminated = true
futureQueue.forEach {
it.cancel(false)
}
futureQueue.clear()
timers.clear()
repeatableTimers.clear()
return
}
}
var next = futureQueue.poll()
while (next != null) {
if (isTerminated) return
next.run()
Thread.interrupted()
try {
next.get()
} catch (err: ExecutionException) {
exceptionHandler?.accept(err)
}
next = futureQueue.poll()
}
while (!timers.isEmpty()) {
if (isTerminated) return
val first = timers.first
if (first.isCancelled) {
timers.removeFirst()
} else if (first.executeAt <= timeOrigin.nanos) {
first.run()
Thread.interrupted()
try {
first.get()
} catch (err: ExecutionException) {
exceptionHandler?.accept(err)
}
timers.removeFirst()
} else {
break
}
}
if (repeatableTimers.isNotEmpty()) {
val executed = LinkedList<RepeatableTimer>()
while (repeatableTimers.isNotEmpty()) {
if (isTerminated) return
val first = repeatableTimers.first
if (first.isDone) {
repeatableTimers.removeFirst()
} else if (first.next <= timeOrigin.nanos) {
if (first.runAndReset()) {
executed.add(first)
}
repeatableTimers.removeFirst()
} else {
break
}
}
executed.forEach { repeatableTimers.enqueue(it) }
}
}
override fun execute(command: Runnable) {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
command.run()
} else {
futureQueue.add(FutureTask(command, Unit))
LockSupport.unpark(thread)
}
}
override fun shutdown() {
isShutdown = true
}
override fun shutdownNow(): List<Runnable> {
if (isTerminated) return listOf()
isShutdown = true
isTerminated = true
val result = ArrayList<Runnable>()
futureQueue.forEach {
it.cancel(false)
result.add(it)
}
futureQueue.clear()
timers.forEach { it.cancel(false) }
repeatableTimers.forEach { it.cancel(false) }
timers.clear()
repeatableTimers.clear()
return result
}
override fun isShutdown(): Boolean {
return isShutdown
}
override fun isTerminated(): Boolean {
return isTerminated
}
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
throw UnsupportedOperationException()
}
override fun <T : Any?> submit(task: Callable<T>): Future<T> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) return CompletedFuture(task.call())
return FutureTask(task).also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun <T : Any?> submit(task: Runnable, result: T): Future<T> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) { task.run(); return CompletedFuture(result) }
return FutureTask { task.run(); result }.also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun submit(task: Runnable): Future<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) { task.run(); return CompletedFuture.VOID }
return FutureTask { task.run() }.also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.map { CompletedFuture(it.call()) }
} else {
return tasks.map { submit(it) }.onEach { it.get() }
}
}
override fun <T : Any?> invokeAll(
tasks: Collection<Callable<T>>,
timeout: Long,
unit: TimeUnit
): List<Future<T>> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.map { CompletedFuture(it.call()) }
} else {
return tasks.map { submit(it) }.onEach { it.get(timeout, unit) }
}
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>): T {
if (tasks.isEmpty())
throw NoSuchElementException("Provided task list is empty")
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.first().call()
} else {
return submit(tasks.first()).get()
}
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
if (tasks.isEmpty())
throw NoSuchElementException("Provided task list is empty")
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.first().call()
} else {
return submit(tasks.first()).get(timeout, unit)
}
}
fun <V> join(future: Future<V>): V {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (!isSameThread())
return future.get()
while (!future.isDone) {
executeQueuedTasks()
LockSupport.parkNanos(1_000_000L)
}
return future.get()
}
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
execute {
if (timer.isCancelled) {
// do nothing
} else if (timer.executeAt <= timeOrigin.nanos) {
timer.run()
Thread.interrupted()
} else {
timers.enqueue(timer)
}
}
}
return timer
}
override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
execute {
if (timer.isCancelled) {
// do nothing
} else if (timer.executeAt <= timeOrigin.nanos) {
timer.run()
Thread.interrupted()
} else {
timers.enqueue(timer)
}
}
}
return timer
}
override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(period, unit), true)
.also {
execute {
if (it.isCancelled) {
// do nothing
} else {
repeatableTimers.enqueue(it)
}
}
}
}
override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(delay, unit), false)
.also {
execute {
if (it.isCancelled) {
// do nothing
} else {
repeatableTimers.enqueue(it)
}
}
}
}
}