From 40d306544f4b35a0d122090934b08bbd2ea280c0 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Thu, 31 Oct 2024 16:30:00 +0700 Subject: [PATCH] Even better event execution scheduling when eventloop is overloaded --- .../kstarbound/util/BlockableEventLoop.kt | 66 ++++++++++++++----- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/BlockableEventLoop.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/BlockableEventLoop.kt index d20d5e91..01c85cba 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/BlockableEventLoop.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/BlockableEventLoop.kt @@ -27,7 +27,10 @@ import kotlin.math.absoluteValue // if you try to use them by yourself :( // so I made my own open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorService { - private class ScheduledTask(callable: Callable, var executeAt: Long, val repeat: Boolean, val timeDelay: Long, val isFixedDelay: Boolean) : FutureTask(callable), ScheduledFuture { + private class ScheduledTask(callable: Callable, var executeAt: Long, val repeat: Boolean, val timeDelay: Long, val isFixedDelay: Boolean, generation: Long) : FutureTask(callable), ScheduledFuture { + var generation = generation + private set + override fun compareTo(other: Delayed): Int { if (other is ScheduledTask<*>) return executeAt.compareTo(other.executeAt) @@ -43,14 +46,14 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer return executeAt <= System.nanoTime() } - fun shouldEnqueue(isShutdown: Boolean): Boolean { + fun shouldEnqueue(isShutdown: Boolean, generation: Long): Boolean { if (isShutdown || executeAt <= System.nanoTime()) - return perform(isShutdown) + return perform(isShutdown, generation) return true } - fun perform(isShutdown: Boolean): Boolean { + fun perform(isShutdown: Boolean, generation: Long): Boolean { if (repeat) { if (isFixedDelay) { // fixed delay @@ -64,6 +67,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer val result = runAndReset() executeAt = System.nanoTime() + timeDelay + this.generation = generation return result && !isShutdown } else { // fixed rate @@ -80,6 +84,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer val result = runAndReset() val now = System.nanoTime() executeAt = now + timeDelay + deadlineMargin - (now - timeBefore) + this.generation = generation return result && !isShutdown } } else { @@ -91,6 +96,8 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer private class TaskPair(val future: CompletableFuture, var supplier: Callable?) + private var generation = 0L + private var extraBudget = 0L private val eventQueue = ConcurrentLinkedQueue>() private val scheduledQueue = PriorityQueue>() val coroutines = asCoroutineDispatcher() @@ -119,10 +126,12 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer private fun eventLoopIteration(): Boolean { var executedAnything = false + // poll queued event first, so we guarantee at least one is executed even if main event loop is very busy var next = eventQueue.poll() while (next != null) { executedAnything = true + val time = System.nanoTime() try { val callable = next.supplier @@ -140,26 +149,49 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer } } - // keep executing queued tasks until we hit scheduled task deadline - if (scheduledQueue.isEmpty() || !scheduledQueue.peek()!!.shouldExecuteNow()) + if (extraBudget > 0L) { + // event loop is overloaded, keep executing queued tasks until we empty execution time budget + extraBudget -= System.nanoTime() - time + next = eventQueue.poll() + } else if (scheduledQueue.isEmpty() || !scheduledQueue.peek()!!.shouldExecuteNow()) + // keep executing queued tasks until we hit scheduled task deadline next = eventQueue.poll() else next = null } if (scheduledQueue.isNotEmpty()) { + val time = System.nanoTime() val executed = ObjectArrayList>(4) + // first, poll all due events into local queue, and prioritize events by FIFO strategy based on how long ago they were scheduled + val queue = PriorityQueue>(Comparator { o1, o2 -> o1.generation.compareTo(o2.generation) }) while (scheduledQueue.isNotEmpty() && (isShutdown || scheduledQueue.peek()!!.shouldExecuteNow())) { - executedAnything = true - val poll = scheduledQueue.poll()!! + queue.add(scheduledQueue.poll()!!) + } - if (poll.perform(isShutdown)) { + executedAnything = executedAnything || queue.isNotEmpty() + + // then, execute collected events in proper order + while (queue.isNotEmpty()) { + val poll = queue.poll()!! + + if (poll.perform(isShutdown, generation++)) { executed.add(poll) } } scheduledQueue.addAll(executed) + + if (scheduledQueue.isNotEmpty() && !isShutdown && scheduledQueue.peek()!!.shouldExecuteNow()) { + // event loop is overloaded, interleave queued events with scheduled ones + extraBudget = System.nanoTime() - time + } else { + // no overload, normal queued events execution + extraBudget = 0L + } + } else { + extraBudget = 0L } return executedAnything @@ -422,10 +454,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer } final override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { - val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false) + val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false, generation++) execute { - if (task.shouldEnqueue(isShutdown)) + if (task.shouldEnqueue(isShutdown, generation++)) scheduledQueue.add(task) } @@ -433,10 +465,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer } final override fun schedule(callable: Callable, delay: Long, unit: TimeUnit): ScheduledFuture { - val task = ScheduledTask(callable, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false) + val task = ScheduledTask(callable, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false, generation++) execute { - if (task.shouldEnqueue(isShutdown)) + if (task.shouldEnqueue(isShutdown, generation++)) scheduledQueue.add(task) } @@ -449,10 +481,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer period: Long, unit: TimeUnit ): ScheduledFuture<*> { - val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(period, unit), false) + val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(period, unit), false, generation++) execute { - if (task.shouldEnqueue(isShutdown)) + if (task.shouldEnqueue(isShutdown, generation++)) scheduledQueue.add(task) } @@ -465,10 +497,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer delay: Long, unit: TimeUnit ): ScheduledFuture<*> { - val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(delay, unit), true) + val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(delay, unit), true, generation++) execute { - if (task.shouldEnqueue(isShutdown)) + if (task.shouldEnqueue(isShutdown, generation++)) scheduledQueue.add(task) }