From 56d8ab1b7aaa9febaa52a3c5d8404faeba76c43f Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Sat, 3 Feb 2024 18:54:47 +0700 Subject: [PATCH] Mailbox executor service --- gradle.properties | 2 +- .../kommons/util/MailboxExecutorService.kt | 421 ++++++++++++++++++ .../ru/dbotthepony/kommons/util/TimeSource.kt | 53 +++ 3 files changed, 475 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/util/TimeSource.kt diff --git a/gradle.properties b/gradle.properties index bc9caae..a494137 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=1.3.0 +projectVersion=1.4.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt new file mode 100644 index 0000000..b5c8aa3 --- /dev/null +++ b/src/main/kotlin/ru/dbotthepony/kommons/util/MailboxExecutorService.kt @@ -0,0 +1,421 @@ +package ru.dbotthepony.kommons.util + +import java.util.LinkedList +import java.util.concurrent.Callable +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Delayed +import java.util.concurrent.Future +import java.util.concurrent.FutureTask +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.LockSupport +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +private fun > LinkedList.enqueue(value: E) { + if (isEmpty()) { + add(value) + } else if (first >= value) { + addFirst(value) + } else if (last <= value) { + addLast(value) + } else { + val iterator = listIterator() + + while (iterator.hasNext()) { + val i = iterator.next() + + if (i >= value) { + iterator.previous() + iterator.add(value) + break + } + } + } +} + +class MailboxExecutorService(thread: Thread = Thread.currentThread()) : ScheduledExecutorService { + @Volatile + var thread: Thread = thread + private set + + private val futureQueue = ConcurrentLinkedQueue>() + + private val timers = LinkedList>() + private val repeatableTimers = LinkedList() + private val executionLock = ReentrantLock() + + @Volatile + private var isShutdown = false + @Volatile + private var isTerminated = false + + private val timeOrigin = JVMTimeSource() + + private inner class Timer(task: Callable, val executeAt: Long) : FutureTask(task), ScheduledFuture { + override fun compareTo(other: Delayed): Int { + return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) + } + + override fun getDelay(unit: TimeUnit): Long { + return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos + } + } + + private data class CompletedFuture(private val value: T) : Future { + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + return false + } + + override fun isCancelled(): Boolean { + return false + } + + override fun isDone(): Boolean { + return true + } + + override fun get(): T { + return value + } + + override fun get(timeout: Long, unit: TimeUnit): T { + return value + } + + companion object { + val VOID = CompletedFuture(Unit) + } + } + + private inner class RepeatableTimer( + task: Runnable, + initialDelay: Long, + val period: Long, + val fixedDelay: Boolean, + ): FutureTask({ task.run() }), ScheduledFuture { + var next = initialDelay + private set + + public override fun runAndReset(): Boolean { + if (fixedDelay) { + next += period + return super.runAndReset() + } else { + try { + return super.runAndReset() + } finally { + next += period + } + } + } + + override fun compareTo(other: Delayed): Int { + return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) + } + + override fun getDelay(unit: TimeUnit): Long { + return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos + } + } + + fun isSameThread(): Boolean { + return Thread.currentThread() === thread + } + + fun executeQueuedTasks() { + thread = Thread.currentThread() + + if (isShutdown) { + if (!isTerminated) { + isTerminated = true + + executionLock.withLock { + timers.clear() + repeatableTimers.clear() + } + + return + } + } + + executionLock.withLock { + var next = futureQueue.poll() + + while (next != null) { + if (isTerminated) return + next.run() + Thread.interrupted() + next = futureQueue.poll() + } + + while (!timers.isEmpty()) { + if (isTerminated) return + val first = timers.first + + if (first.isCancelled) { + timers.removeFirst() + } else if (first.executeAt <= timeOrigin.nanos) { + first.run() + Thread.interrupted() + timers.removeFirst() + } else { + break + } + } + + if (repeatableTimers.isNotEmpty()) { + val executed = LinkedList() + + while (repeatableTimers.isNotEmpty()) { + if (isTerminated) return + val first = repeatableTimers.first + + if (first.isDone) { + repeatableTimers.removeFirst() + } else if (first.next <= timeOrigin.nanos) { + first.runAndReset() + executed.add(first) + repeatableTimers.removeFirst() + } else { + break + } + } + + executed.forEach { repeatableTimers.enqueue(it) } + } + } + } + + override fun execute(command: Runnable) { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (isSameThread()) { + command.run() + } else { + futureQueue.add(FutureTask(command, Unit)) + LockSupport.unpark(thread) + } + } + + override fun shutdown() { + isShutdown = true + } + + override fun shutdownNow(): MutableList { + isShutdown = true + isTerminated = true + + val result = ArrayList() + + executionLock.withLock { + futureQueue.forEach { + it.cancel(false) + result.add(it) + } + + futureQueue.clear() + + timers.forEach { it.cancel(false) } + repeatableTimers.forEach { it.cancel(false) } + + timers.clear() + repeatableTimers.clear() + } + + return result + } + + override fun isShutdown(): Boolean { + return isShutdown + } + + override fun isTerminated(): Boolean { + return isTerminated + } + + override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { + throw UnsupportedOperationException() + } + + override fun submit(task: Callable): Future { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + if (isSameThread()) return CompletedFuture(task.call()) + return FutureTask(task).also { futureQueue.add(it); LockSupport.unpark(thread) } + } + + override fun submit(task: Runnable, result: T): Future { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + if (isSameThread()) { task.run(); return CompletedFuture(result) } + return FutureTask { task.run(); result }.also { futureQueue.add(it); LockSupport.unpark(thread) } + } + + override fun submit(task: Runnable): Future<*> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + if (isSameThread()) { task.run(); return CompletedFuture.VOID } + return FutureTask { task.run() }.also { futureQueue.add(it); LockSupport.unpark(thread) } + } + + override fun invokeAll(tasks: Collection>): List> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (isSameThread()) { + return tasks.map { CompletedFuture(it.call()) } + } else { + return tasks.map { submit(it) }.onEach { it.get() } + } + } + + override fun invokeAll( + tasks: Collection>, + timeout: Long, + unit: TimeUnit + ): List> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (isSameThread()) { + return tasks.map { CompletedFuture(it.call()) } + } else { + return tasks.map { submit(it) }.onEach { it.get(timeout, unit) } + } + } + + override fun invokeAny(tasks: Collection>): T { + if (tasks.isEmpty()) + throw NoSuchElementException("Provided task list is empty") + + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (isSameThread()) { + return tasks.first().call() + } else { + return submit(tasks.first()).get() + } + } + + override fun invokeAny(tasks: Collection>, timeout: Long, unit: TimeUnit): T { + if (tasks.isEmpty()) + throw NoSuchElementException("Provided task list is empty") + + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (isSameThread()) { + return tasks.first().call() + } else { + return submit(tasks.first()).get(timeout, unit) + } + } + + fun join(future: Future): V { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + if (!isSameThread()) + return future.get() + + while (!future.isDone) { + executeQueuedTasks() + LockSupport.parkNanos(1_000_000L) + } + + return future.get() + } + + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) + + if (isSameThread() && delay <= 0L) { + timer.run() + Thread.interrupted() + } else if (isSameThread()) { + timers.enqueue(timer) + } else { + execute { + if (timer.isCancelled) { + // do nothing + } else if (timer.executeAt <= timeOrigin.nanos) { + timer.run() + Thread.interrupted() + } else { + timers.enqueue(timer) + } + } + } + + return timer + } + + override fun schedule(callable: Callable, delay: Long, unit: TimeUnit): ScheduledFuture { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) + + if (isSameThread() && delay <= 0L) { + timer.run() + Thread.interrupted() + } else if (isSameThread()) { + timers.enqueue(timer) + } else { + execute { + if (timer.isCancelled) { + // do nothing + } else if (timer.executeAt <= timeOrigin.nanos) { + timer.run() + Thread.interrupted() + } else { + timers.enqueue(timer) + } + } + } + + return timer + } + + override fun scheduleAtFixedRate( + command: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit + ): ScheduledFuture<*> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + return RepeatableTimer( + command, + timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), + TimeUnit.NANOSECONDS.convert(period, unit), true) + .also { + execute { + if (it.isCancelled) { + // do nothing + } else { + repeatableTimers.enqueue(it) + } + } + } + } + + override fun scheduleWithFixedDelay( + command: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit + ): ScheduledFuture<*> { + if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down") + + return RepeatableTimer( + command, + timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), + TimeUnit.NANOSECONDS.convert(delay, unit), false) + .also { + execute { + if (it.isCancelled) { + // do nothing + } else { + repeatableTimers.enqueue(it) + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/TimeSource.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/TimeSource.kt new file mode 100644 index 0000000..71ba6a8 --- /dev/null +++ b/src/main/kotlin/ru/dbotthepony/kommons/util/TimeSource.kt @@ -0,0 +1,53 @@ +package ru.dbotthepony.kommons.util + +interface ITimeSource { + val nanos: Long + val micros: Long get() = nanos / 1_000L + val millis: Long get() = nanos / 1_000_000L + val seconds: Double get() = (nanos / 1_000L) / 1_000_000.0 +} + +class JVMTimeSource : ITimeSource { + private val origin = System.nanoTime() + + override val nanos: Long + get() = System.nanoTime() - origin + + companion object { + @JvmField + val INSTANCE = JVMTimeSource() + } +} + +class ArtificialTimeSource(nanos: Long = 0L) : ITimeSource { + override var nanos: Long = nanos + private set + + fun advance(nanos: Long) { + this.nanos += nanos + } +} + +class PausableTimeSource(private val parent: ITimeSource) : ITimeSource { + override val nanos: Long get() { + if (isPaused) { + return pausedSince - skipped + } else { + return parent.nanos - skipped + } + } + + private var isPaused = false + private var pausedSince = 0L + private var skipped = 0L + + fun pause() { + if (!isPaused) { + isPaused = true + pausedSince = parent.nanos + } else { + isPaused = false + skipped += parent.nanos - pausedSince + } + } +}