Even better event execution scheduling when eventloop is overloaded
This commit is contained in:
parent
09f7e8ac70
commit
40d306544f
@ -27,7 +27,10 @@ import kotlin.math.absoluteValue
|
||||
// if you try to use them by yourself :(
|
||||
// so I made my own
|
||||
open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorService {
|
||||
private class ScheduledTask<T>(callable: Callable<T>, var executeAt: Long, val repeat: Boolean, val timeDelay: Long, val isFixedDelay: Boolean) : FutureTask<T>(callable), ScheduledFuture<T> {
|
||||
private class ScheduledTask<T>(callable: Callable<T>, var executeAt: Long, val repeat: Boolean, val timeDelay: Long, val isFixedDelay: Boolean, generation: Long) : FutureTask<T>(callable), ScheduledFuture<T> {
|
||||
var generation = generation
|
||||
private set
|
||||
|
||||
override fun compareTo(other: Delayed): Int {
|
||||
if (other is ScheduledTask<*>)
|
||||
return executeAt.compareTo(other.executeAt)
|
||||
@ -43,14 +46,14 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
return executeAt <= System.nanoTime()
|
||||
}
|
||||
|
||||
fun shouldEnqueue(isShutdown: Boolean): Boolean {
|
||||
fun shouldEnqueue(isShutdown: Boolean, generation: Long): Boolean {
|
||||
if (isShutdown || executeAt <= System.nanoTime())
|
||||
return perform(isShutdown)
|
||||
return perform(isShutdown, generation)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
fun perform(isShutdown: Boolean): Boolean {
|
||||
fun perform(isShutdown: Boolean, generation: Long): Boolean {
|
||||
if (repeat) {
|
||||
if (isFixedDelay) {
|
||||
// fixed delay
|
||||
@ -64,6 +67,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
|
||||
val result = runAndReset()
|
||||
executeAt = System.nanoTime() + timeDelay
|
||||
this.generation = generation
|
||||
return result && !isShutdown
|
||||
} else {
|
||||
// fixed rate
|
||||
@ -80,6 +84,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
val result = runAndReset()
|
||||
val now = System.nanoTime()
|
||||
executeAt = now + timeDelay + deadlineMargin - (now - timeBefore)
|
||||
this.generation = generation
|
||||
return result && !isShutdown
|
||||
}
|
||||
} else {
|
||||
@ -91,6 +96,8 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
|
||||
private class TaskPair<T>(val future: CompletableFuture<T>, var supplier: Callable<T>?)
|
||||
|
||||
private var generation = 0L
|
||||
private var extraBudget = 0L
|
||||
private val eventQueue = ConcurrentLinkedQueue<TaskPair<*>>()
|
||||
private val scheduledQueue = PriorityQueue<ScheduledTask<*>>()
|
||||
val coroutines = asCoroutineDispatcher()
|
||||
@ -119,10 +126,12 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
|
||||
private fun eventLoopIteration(): Boolean {
|
||||
var executedAnything = false
|
||||
// poll queued event first, so we guarantee at least one is executed even if main event loop is very busy
|
||||
var next = eventQueue.poll()
|
||||
|
||||
while (next != null) {
|
||||
executedAnything = true
|
||||
val time = System.nanoTime()
|
||||
|
||||
try {
|
||||
val callable = next.supplier
|
||||
@ -140,26 +149,49 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
}
|
||||
}
|
||||
|
||||
// keep executing queued tasks until we hit scheduled task deadline
|
||||
if (scheduledQueue.isEmpty() || !scheduledQueue.peek()!!.shouldExecuteNow())
|
||||
if (extraBudget > 0L) {
|
||||
// event loop is overloaded, keep executing queued tasks until we empty execution time budget
|
||||
extraBudget -= System.nanoTime() - time
|
||||
next = eventQueue.poll()
|
||||
} else if (scheduledQueue.isEmpty() || !scheduledQueue.peek()!!.shouldExecuteNow())
|
||||
// keep executing queued tasks until we hit scheduled task deadline
|
||||
next = eventQueue.poll()
|
||||
else
|
||||
next = null
|
||||
}
|
||||
|
||||
if (scheduledQueue.isNotEmpty()) {
|
||||
val time = System.nanoTime()
|
||||
val executed = ObjectArrayList<ScheduledTask<*>>(4)
|
||||
// first, poll all due events into local queue, and prioritize events by FIFO strategy based on how long ago they were scheduled
|
||||
val queue = PriorityQueue<ScheduledTask<*>>(Comparator { o1, o2 -> o1.generation.compareTo(o2.generation) })
|
||||
|
||||
while (scheduledQueue.isNotEmpty() && (isShutdown || scheduledQueue.peek()!!.shouldExecuteNow())) {
|
||||
executedAnything = true
|
||||
val poll = scheduledQueue.poll()!!
|
||||
queue.add(scheduledQueue.poll()!!)
|
||||
}
|
||||
|
||||
if (poll.perform(isShutdown)) {
|
||||
executedAnything = executedAnything || queue.isNotEmpty()
|
||||
|
||||
// then, execute collected events in proper order
|
||||
while (queue.isNotEmpty()) {
|
||||
val poll = queue.poll()!!
|
||||
|
||||
if (poll.perform(isShutdown, generation++)) {
|
||||
executed.add(poll)
|
||||
}
|
||||
}
|
||||
|
||||
scheduledQueue.addAll(executed)
|
||||
|
||||
if (scheduledQueue.isNotEmpty() && !isShutdown && scheduledQueue.peek()!!.shouldExecuteNow()) {
|
||||
// event loop is overloaded, interleave queued events with scheduled ones
|
||||
extraBudget = System.nanoTime() - time
|
||||
} else {
|
||||
// no overload, normal queued events execution
|
||||
extraBudget = 0L
|
||||
}
|
||||
} else {
|
||||
extraBudget = 0L
|
||||
}
|
||||
|
||||
return executedAnything
|
||||
@ -422,10 +454,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
}
|
||||
|
||||
final override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false)
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false, generation++)
|
||||
|
||||
execute {
|
||||
if (task.shouldEnqueue(isShutdown))
|
||||
if (task.shouldEnqueue(isShutdown, generation++))
|
||||
scheduledQueue.add(task)
|
||||
}
|
||||
|
||||
@ -433,10 +465,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
}
|
||||
|
||||
final override fun <V> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
|
||||
val task = ScheduledTask(callable, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false)
|
||||
val task = ScheduledTask(callable, System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit), false, 0L, false, generation++)
|
||||
|
||||
execute {
|
||||
if (task.shouldEnqueue(isShutdown))
|
||||
if (task.shouldEnqueue(isShutdown, generation++))
|
||||
scheduledQueue.add(task)
|
||||
}
|
||||
|
||||
@ -449,10 +481,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
period: Long,
|
||||
unit: TimeUnit
|
||||
): ScheduledFuture<*> {
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(period, unit), false)
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(period, unit), false, generation++)
|
||||
|
||||
execute {
|
||||
if (task.shouldEnqueue(isShutdown))
|
||||
if (task.shouldEnqueue(isShutdown, generation++))
|
||||
scheduledQueue.add(task)
|
||||
}
|
||||
|
||||
@ -465,10 +497,10 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
||||
delay: Long,
|
||||
unit: TimeUnit
|
||||
): ScheduledFuture<*> {
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(delay, unit), true)
|
||||
val task = ScheduledTask({ command.run() }, System.nanoTime() + TimeUnit.NANOSECONDS.convert(initialDelay, unit), true, TimeUnit.NANOSECONDS.convert(delay, unit), true, generation++)
|
||||
|
||||
execute {
|
||||
if (task.shouldEnqueue(isShutdown))
|
||||
if (task.shouldEnqueue(isShutdown, generation++))
|
||||
scheduledQueue.add(task)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user