KStarbound/src/main/kotlin/ru/dbotthepony/kstarbound/util/BlockableEventLoop.kt

462 lines
12 KiB
Kotlin

package ru.dbotthepony.kstarbound.util
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import org.apache.logging.log4j.LogManager
import java.util.PriorityQueue
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Delayed
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
import java.util.function.Supplier
// I tried to make use of Netty's event loops, but they seem to be a bit overcomplicated
// if you try to use them by yourself :(
// so I made my own
open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorService {
private class ScheduledTask<T>(callable: Callable<T>, var executeAt: Long, val repeat: Boolean, val timeDelay: Long, val isFixedDelay: Boolean) : FutureTask<T>(callable), ScheduledFuture<T> {
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return executeAt - System.nanoTime()
}
fun shouldEnqueue(isShutdown: Boolean): Boolean {
if (isShutdown || executeAt <= System.nanoTime())
return perform(isShutdown)
return true
}
fun perform(isShutdown: Boolean): Boolean {
if (repeat) {
if (isFixedDelay) {
// fixed delay
val deadlineMargin = executeAt - System.nanoTime()
if (deadlineMargin <= -5_000_000_000L) {
if (!IS_IN_IDE) // since this will get spammed if debugging using breakpoints
LOGGER.warn("Event loop missed scheduled deadline by ${-deadlineMargin / 1_000_000L} milliseconds")
}
runAndReset()
executeAt = System.nanoTime() + timeDelay
} else {
// fixed rate
val timeBefore = System.nanoTime()
var deadlineMargin = executeAt - System.nanoTime()
if (deadlineMargin <= -5_000_000_000L) {
if (!IS_IN_IDE) // since this will get spammed if debugging using breakpoints
LOGGER.warn("Event loop missed scheduled deadline by ${-deadlineMargin / 1_000_000L} milliseconds, clamping to 5 seconds")
deadlineMargin = -5_000_000_000L
}
runAndReset()
val now = System.nanoTime()
executeAt = now + timeDelay + deadlineMargin - (now - timeBefore)
}
return !isShutdown
} else {
run()
return false
}
}
}
private class TaskPair<T>(val future: CompletableFuture<T>, var supplier: Callable<T>?)
private val eventQueue = ConcurrentLinkedQueue<TaskPair<*>>()
private val scheduledQueue = PriorityQueue<ScheduledTask<*>>()
val coroutines = asCoroutineDispatcher()
val scope = CoroutineScope(coroutines + SupervisorJob())
private fun nextDeadline(): Long {
if (isShutdown || eventQueue.isNotEmpty())
return 0L
val poll = scheduledQueue.peek()
if (poll == null) {
return Long.MAX_VALUE
} else {
return poll.executeAt - System.nanoTime()
}
}
@Volatile
private var isShutdown = false
private var isRunning = true
private fun eventLoopIteration(): Boolean {
var executedAnything = false
val next = eventQueue.poll()
if (next != null) {
executedAnything = true
try {
val callable = next.supplier
if (callable != null) {
(next.future as CompletableFuture<Any?>).complete(callable.call())
}
} catch (err: Throwable) {
LOGGER.error("Error executing scheduled task", err)
try {
next.future.completeExceptionally(err)
} catch (err: Throwable) {
LOGGER.error("Caught an exception while propagating CompletableFuture to completeExceptionally stage", err)
}
}
}
if (scheduledQueue.isNotEmpty()) {
val executed = ArrayList<ScheduledTask<*>>()
var lastSize: Int
do {
lastSize = executed.size
while (scheduledQueue.isNotEmpty() && (isShutdown || scheduledQueue.peek()!!.executeAt <= System.nanoTime())) {
executedAnything = true
val poll = scheduledQueue.poll()!!
if (poll.perform(isShutdown)) {
executed.add(poll)
}
}
} while (lastSize != executed.size)
scheduledQueue.addAll(executed)
}
return executedAnything
}
final override fun run() {
while (isRunning) {
LockSupport.parkNanos(nextDeadline())
eventLoopIteration()
if (isShutdown && isRunning) {
while (eventLoopIteration()) {}
isRunning = false
scope.cancel(CancellationException("EventLoop shut down"))
performShutdown()
}
}
LOGGER.info("Thread ${this.name} stopped gracefully")
}
final override fun execute(command: Runnable) {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) {
command.run()
} else {
val future = CompletableFuture<Unit>()
val pair = TaskPair(future) { command.run() }
future.exceptionally {
pair.supplier = null
}
eventQueue.add(pair)
LockSupport.unpark(this)
}
}
final override fun <T> submit(task: Callable<T>): CompletableFuture<T> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) {
try {
return CompletableFuture.completedFuture(task.call())
} catch (err: Throwable) {
return CompletableFuture.failedFuture(err)
}
} else {
val future = CompletableFuture<T>()
val pair = TaskPair(future, task)
future.exceptionally {
pair.supplier = null
null
}
eventQueue.add(pair)
LockSupport.unpark(this)
return future
}
}
fun <T> supplyAsync(task: Supplier<T>): CompletableFuture<T> {
return submit(task::get)
}
fun <T> supplyAsync(task: () -> T): CompletableFuture<T> {
return submit(task::invoke)
}
fun ensureSameThread() {
check(this === currentThread()) { "Performing non-threadsafe operation outside of event loop thread" }
}
fun isSameThread() = this === currentThread()
final override fun submit(task: Runnable): CompletableFuture<*> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) {
try {
return CompletableFuture.completedFuture(task.run())
} catch (err: Throwable) {
return CompletableFuture.failedFuture<Unit>(err)
}
} else {
val future = CompletableFuture<Unit>()
val pair = TaskPair(future) { task.run() }
future.exceptionally {
pair.supplier = null
}
eventQueue.add(pair)
LockSupport.unpark(this)
return future
}
}
final override fun <T> submit(task: Runnable, result: T): CompletableFuture<T> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) {
try {
task.run()
return CompletableFuture.completedFuture(result)
} catch (err: Throwable) {
return CompletableFuture.failedFuture(err)
}
} else {
val future = CompletableFuture<T>()
val pair = TaskPair(future) { task.run(); result }
future.exceptionally {
pair.supplier = null
null
}
eventQueue.add(pair)
LockSupport.unpark(this)
return future
}
}
final override fun <T> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
return tasks.map { submit(it) }
}
final override fun <T> invokeAll(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): List<Future<T>> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val futures = tasks.map { submit(it) }
CompletableFuture.allOf(*futures.toTypedArray()).get(timeout, unit)
return futures
}
final override fun <T> invokeAny(tasks: Collection<Callable<T>>): T {
if (!isRunning)
throw RejectedExecutionException("EventLoop shut down")
return submit(tasks.first()).get()
}
final override fun <T> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
return submit(tasks.first()).get(timeout, unit)
}
final override fun shutdown() {
if (!isShutdown) {
isShutdown = true
if (currentThread() === this || state == State.NEW) {
while (eventLoopIteration()) {}
while (scheduledQueue.isNotEmpty()) {
val remove = scheduledQueue.remove()
try {
remove.cancel(false)
} catch (err: Throwable) {
LOGGER.warn("Caught exception while cancelling future during event loop shutdown", err)
}
}
isRunning = false
scope.cancel(CancellationException("EventLoop shut down"))
performShutdown()
} else {
// wake up thread
LockSupport.unpark(this)
}
}
}
protected open fun performShutdown() {
}
final override fun shutdownNow(): List<Runnable> {
if (!isShutdown) {
isShutdown = true
if (currentThread() === this) {
while (eventQueue.isNotEmpty()) {
val remove = eventQueue.remove()
try {
remove.future.cancel(false)
} catch (err: Throwable) {
LOGGER.warn("Caught exception while cancelling future during event loop shutdown", err)
}
}
while (scheduledQueue.isNotEmpty()) {
val remove = scheduledQueue.remove()
try {
remove.cancel(false)
} catch (err: Throwable) {
LOGGER.warn("Caught exception while cancelling future during event loop shutdown", err)
}
}
isRunning = false
scope.cancel(CancellationException("EventLoop shut down"))
performShutdown()
} else {
// wake up thread
LockSupport.unpark(this)
}
}
return emptyList()
}
final override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
// lazy wait loop
var budget = TimeUnit.NANOSECONDS.convert(timeout, unit)
var origin = System.nanoTime()
while (budget > 0L && isRunning) {
val new = System.nanoTime()
budget -= new - origin
origin = new
LockSupport.parkNanos(budget.coerceAtMost(500_000L))
if (interrupted()) {
return !isRunning
}
}
return !isRunning
}
final override fun isShutdown(): Boolean {
return isShutdown
}
final override fun isTerminated(): Boolean {
return !isRunning
}
final override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false)
execute {
if (task.shouldEnqueue(isShutdown))
scheduledQueue.add(task)
}
return task
}
final override fun <V> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
val task = ScheduledTask(callable, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false)
execute {
if (task.shouldEnqueue(isShutdown))
scheduledQueue.add(task)
}
return task
}
final override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(period, unit), false)
execute {
if (task.shouldEnqueue(isShutdown))
scheduledQueue.add(task)
}
return task
}
final override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(delay, unit), true)
execute {
if (task.shouldEnqueue(isShutdown))
scheduledQueue.add(task)
}
return task
}
companion object {
private val LOGGER = LogManager.getLogger()
private val IS_IN_IDE = java.lang.management.ManagementFactory.getRuntimeMXBean().inputArguments.toString().contains("-agentlib:jdwp")
}
}