Mailbox executor service

This commit is contained in:
DBotThePony 2024-02-03 18:54:47 +07:00
parent 54c180721a
commit 56d8ab1b7a
Signed by: DBot
GPG Key ID: DCC23B5715498507
3 changed files with 475 additions and 1 deletions

View File

@ -4,7 +4,7 @@ kotlin.code.style=official
specifyKotlinAsDependency=false
projectGroup=ru.dbotthepony.kommons
projectVersion=1.3.0
projectVersion=1.4.0
guavaDepVersion=33.0.0
gsonDepVersion=2.8.9

View File

@ -0,0 +1,421 @@
package ru.dbotthepony.kommons.util
import java.util.LinkedList
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Delayed
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) {
if (isEmpty()) {
add(value)
} else if (first >= value) {
addFirst(value)
} else if (last <= value) {
addLast(value)
} else {
val iterator = listIterator()
while (iterator.hasNext()) {
val i = iterator.next()
if (i >= value) {
iterator.previous()
iterator.add(value)
break
}
}
}
}
class MailboxExecutorService(thread: Thread = Thread.currentThread()) : ScheduledExecutorService {
@Volatile
var thread: Thread = thread
private set
private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>()
private val timers = LinkedList<Timer<*>>()
private val repeatableTimers = LinkedList<RepeatableTimer>()
private val executionLock = ReentrantLock()
@Volatile
private var isShutdown = false
@Volatile
private var isTerminated = false
private val timeOrigin = JVMTimeSource()
private inner class Timer<T>(task: Callable<T>, val executeAt: Long) : FutureTask<T>(task), ScheduledFuture<T> {
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
private data class CompletedFuture<T>(private val value: T) : Future<T> {
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return false
}
override fun isCancelled(): Boolean {
return false
}
override fun isDone(): Boolean {
return true
}
override fun get(): T {
return value
}
override fun get(timeout: Long, unit: TimeUnit): T {
return value
}
companion object {
val VOID = CompletedFuture(Unit)
}
}
private inner class RepeatableTimer(
task: Runnable,
initialDelay: Long,
val period: Long,
val fixedDelay: Boolean,
): FutureTask<Unit>({ task.run() }), ScheduledFuture<Unit> {
var next = initialDelay
private set
public override fun runAndReset(): Boolean {
if (fixedDelay) {
next += period
return super.runAndReset()
} else {
try {
return super.runAndReset()
} finally {
next += period
}
}
}
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
fun isSameThread(): Boolean {
return Thread.currentThread() === thread
}
fun executeQueuedTasks() {
thread = Thread.currentThread()
if (isShutdown) {
if (!isTerminated) {
isTerminated = true
executionLock.withLock {
timers.clear()
repeatableTimers.clear()
}
return
}
}
executionLock.withLock {
var next = futureQueue.poll()
while (next != null) {
if (isTerminated) return
next.run()
Thread.interrupted()
next = futureQueue.poll()
}
while (!timers.isEmpty()) {
if (isTerminated) return
val first = timers.first
if (first.isCancelled) {
timers.removeFirst()
} else if (first.executeAt <= timeOrigin.nanos) {
first.run()
Thread.interrupted()
timers.removeFirst()
} else {
break
}
}
if (repeatableTimers.isNotEmpty()) {
val executed = LinkedList<RepeatableTimer>()
while (repeatableTimers.isNotEmpty()) {
if (isTerminated) return
val first = repeatableTimers.first
if (first.isDone) {
repeatableTimers.removeFirst()
} else if (first.next <= timeOrigin.nanos) {
first.runAndReset()
executed.add(first)
repeatableTimers.removeFirst()
} else {
break
}
}
executed.forEach { repeatableTimers.enqueue(it) }
}
}
}
override fun execute(command: Runnable) {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
command.run()
} else {
futureQueue.add(FutureTask(command, Unit))
LockSupport.unpark(thread)
}
}
override fun shutdown() {
isShutdown = true
}
override fun shutdownNow(): MutableList<Runnable> {
isShutdown = true
isTerminated = true
val result = ArrayList<Runnable>()
executionLock.withLock {
futureQueue.forEach {
it.cancel(false)
result.add(it)
}
futureQueue.clear()
timers.forEach { it.cancel(false) }
repeatableTimers.forEach { it.cancel(false) }
timers.clear()
repeatableTimers.clear()
}
return result
}
override fun isShutdown(): Boolean {
return isShutdown
}
override fun isTerminated(): Boolean {
return isTerminated
}
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
throw UnsupportedOperationException()
}
override fun <T : Any?> submit(task: Callable<T>): Future<T> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) return CompletedFuture(task.call())
return FutureTask(task).also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun <T : Any?> submit(task: Runnable, result: T): Future<T> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) { task.run(); return CompletedFuture(result) }
return FutureTask { task.run(); result }.also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun submit(task: Runnable): Future<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) { task.run(); return CompletedFuture.VOID }
return FutureTask { task.run() }.also { futureQueue.add(it); LockSupport.unpark(thread) }
}
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.map { CompletedFuture(it.call()) }
} else {
return tasks.map { submit(it) }.onEach { it.get() }
}
}
override fun <T : Any?> invokeAll(
tasks: Collection<Callable<T>>,
timeout: Long,
unit: TimeUnit
): List<Future<T>> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.map { CompletedFuture(it.call()) }
} else {
return tasks.map { submit(it) }.onEach { it.get(timeout, unit) }
}
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>): T {
if (tasks.isEmpty())
throw NoSuchElementException("Provided task list is empty")
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.first().call()
} else {
return submit(tasks.first()).get()
}
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
if (tasks.isEmpty())
throw NoSuchElementException("Provided task list is empty")
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (isSameThread()) {
return tasks.first().call()
} else {
return submit(tasks.first()).get(timeout, unit)
}
}
fun <V> join(future: Future<V>): V {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
if (!isSameThread())
return future.get()
while (!future.isDone) {
executeQueuedTasks()
LockSupport.parkNanos(1_000_000L)
}
return future.get()
}
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
execute {
if (timer.isCancelled) {
// do nothing
} else if (timer.executeAt <= timeOrigin.nanos) {
timer.run()
Thread.interrupted()
} else {
timers.enqueue(timer)
}
}
}
return timer
}
override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
execute {
if (timer.isCancelled) {
// do nothing
} else if (timer.executeAt <= timeOrigin.nanos) {
timer.run()
Thread.interrupted()
} else {
timers.enqueue(timer)
}
}
}
return timer
}
override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(period, unit), true)
.also {
execute {
if (it.isCancelled) {
// do nothing
} else {
repeatableTimers.enqueue(it)
}
}
}
}
override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
if (isShutdown) throw RejectedExecutionException("This mailbox is shutting down")
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(delay, unit), false)
.also {
execute {
if (it.isCancelled) {
// do nothing
} else {
repeatableTimers.enqueue(it)
}
}
}
}
}

View File

@ -0,0 +1,53 @@
package ru.dbotthepony.kommons.util
interface ITimeSource {
val nanos: Long
val micros: Long get() = nanos / 1_000L
val millis: Long get() = nanos / 1_000_000L
val seconds: Double get() = (nanos / 1_000L) / 1_000_000.0
}
class JVMTimeSource : ITimeSource {
private val origin = System.nanoTime()
override val nanos: Long
get() = System.nanoTime() - origin
companion object {
@JvmField
val INSTANCE = JVMTimeSource()
}
}
class ArtificialTimeSource(nanos: Long = 0L) : ITimeSource {
override var nanos: Long = nanos
private set
fun advance(nanos: Long) {
this.nanos += nanos
}
}
class PausableTimeSource(private val parent: ITimeSource) : ITimeSource {
override val nanos: Long get() {
if (isPaused) {
return pausedSince - skipped
} else {
return parent.nanos - skipped
}
}
private var isPaused = false
private var pausedSince = 0L
private var skipped = 0L
fun pause() {
if (!isPaused) {
isPaused = true
pausedSince = parent.nanos
} else {
isPaused = false
skipped += parent.nanos - pausedSince
}
}
}