Improvements to scheduling inside blockable event loop
This commit is contained in:
parent
01c7a29fe8
commit
17a3de38bc
@ -107,9 +107,9 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
|||||||
|
|
||||||
private fun eventLoopIteration(): Boolean {
|
private fun eventLoopIteration(): Boolean {
|
||||||
var executedAnything = false
|
var executedAnything = false
|
||||||
val next = eventQueue.poll()
|
var next = eventQueue.poll()
|
||||||
|
|
||||||
if (next != null) {
|
while (next != null) {
|
||||||
executedAnything = true
|
executedAnything = true
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -127,24 +127,25 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
|
|||||||
LOGGER.error("Caught an exception while propagating CompletableFuture to completeExceptionally stage", err)
|
LOGGER.error("Caught an exception while propagating CompletableFuture to completeExceptionally stage", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// keep executing queued tasks until we hit scheduled task deadline
|
||||||
|
if (scheduledQueue.isEmpty() || scheduledQueue.peek()!!.executeAt > System.nanoTime())
|
||||||
|
next = eventQueue.poll()
|
||||||
|
else
|
||||||
|
next = null
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scheduledQueue.isNotEmpty()) {
|
if (scheduledQueue.isNotEmpty()) {
|
||||||
val executed = ObjectArrayList<ScheduledTask<*>>(4)
|
val executed = ObjectArrayList<ScheduledTask<*>>(4)
|
||||||
var lastSize: Int
|
|
||||||
|
|
||||||
do {
|
while (scheduledQueue.isNotEmpty() && (isShutdown || scheduledQueue.peek()!!.executeAt <= System.nanoTime())) {
|
||||||
lastSize = executed.size
|
executedAnything = true
|
||||||
|
val poll = scheduledQueue.poll()!!
|
||||||
|
|
||||||
while (scheduledQueue.isNotEmpty() && (isShutdown || scheduledQueue.peek()!!.executeAt <= System.nanoTime())) {
|
if (poll.perform(isShutdown)) {
|
||||||
executedAnything = true
|
executed.add(poll)
|
||||||
val poll = scheduledQueue.poll()!!
|
|
||||||
|
|
||||||
if (poll.perform(isShutdown)) {
|
|
||||||
executed.add(poll)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} while (lastSize != executed.size)
|
}
|
||||||
|
|
||||||
scheduledQueue.addAll(executed)
|
scheduledQueue.addAll(executed)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user