Add ScheduledExecutorService impl to ManualExecutorService

This commit is contained in:
DBotThePony 2023-10-13 00:42:10 +07:00
parent 906e6a0373
commit 2280882247
Signed by: DBot
GPG Key ID: DCC23B5715498507
2 changed files with 191 additions and 5 deletions

View File

@ -7,7 +7,6 @@ import org.lwjgl.glfw.GLFW.glfwSetWindowShouldClose
import ru.dbotthepony.kstarbound.client.StarboundClient
import ru.dbotthepony.kstarbound.defs.animation.AnimationDefinition
import ru.dbotthepony.kstarbound.io.BTreeDB
import ru.dbotthepony.kstarbound.io.json.BinaryJsonReader
import ru.dbotthepony.kstarbound.player.Avatar
import ru.dbotthepony.kstarbound.player.QuestDescriptor
import ru.dbotthepony.kstarbound.player.QuestInstance
@ -26,10 +25,8 @@ import java.io.ByteArrayInputStream
import java.io.DataInputStream
import java.io.File
import java.util.*
import java.util.concurrent.ForkJoinPool
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
import kotlin.system.exitProcess
private val LOGGER = LogManager.getLogger()

View File

@ -1,17 +1,91 @@
package ru.dbotthepony.kstarbound.util
import com.google.common.util.concurrent.Futures
import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue
import java.util.LinkedList
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Delayed
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
class ManualExecutorService(val thread: Thread = Thread.currentThread()) : ExecutorService {
private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) {
if (isEmpty()) {
add(value)
} else if (first >= value) {
addFirst(value)
} else if (last <= value) {
addLast(value)
} else {
val iterator = listIterator()
while (iterator.hasNext()) {
val i = iterator.next()
if (i >= value) {
iterator.previous()
iterator.add(value)
break
}
}
}
}
class ManualExecutorService(val thread: Thread = Thread.currentThread()) : ScheduledExecutorService {
private val executeQueue = ConcurrentLinkedQueue<Runnable>()
private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>()
private val timerBacklog = ConcurrentLinkedQueue<Timer<*>>()
private val repeatableTimersBacklog = ConcurrentLinkedQueue<RepeatableTimer>()
private val timers = LinkedList<Timer<*>>()
private val repeatableTimers = LinkedList<RepeatableTimer>()
private val timeOrigin = JVMTimeSource()
private inner class Timer<T>(task: Callable<T>, val executeAt: Long) : FutureTask<T>(task), ScheduledFuture<T> {
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
private inner class RepeatableTimer(
task: Runnable,
initialDelay: Long,
val period: Long,
val fixedDelay: Boolean,
): FutureTask<Unit>({ task.run() }), ScheduledFuture<Unit> {
var next = initialDelay
private set
public override fun runAndReset(): Boolean {
if (fixedDelay) {
next += period
return super.runAndReset()
} else {
try {
return super.runAndReset()
} finally {
next += period
}
}
}
override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
}
override fun getDelay(unit: TimeUnit): Long {
return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos
}
}
fun isSameThread(): Boolean {
return Thread.currentThread() === thread
@ -34,6 +108,67 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu
Thread.interrupted()
next2 = futureQueue.poll()
}
var next3 = timerBacklog.poll()
while (next3 != null) {
if (next3.isCancelled) {
// do nothing
} else if (next3.executeAt <= timeOrigin.nanos) {
next3.run()
Thread.interrupted()
} else {
timers.enqueue(next3)
}
next3 = timerBacklog.poll()
}
var next4 = repeatableTimersBacklog.poll()
while (next4 != null) {
if (next4.isCancelled) {
// do nothing
} else {
repeatableTimers.enqueue(next4)
}
next4 = repeatableTimersBacklog.poll()
}
while (!timers.isEmpty()) {
val first = timers.first
if (first.isCancelled) {
timers.removeFirst()
} else if (first.executeAt <= timeOrigin.nanos) {
first.run()
Thread.interrupted()
timers.removeFirst()
} else {
break
}
}
if (repeatableTimers.isNotEmpty()) {
val executed = LinkedList<RepeatableTimer>()
while (repeatableTimers.isNotEmpty()) {
val first = repeatableTimers.first
if (first.isDone) {
repeatableTimers.removeFirst()
} else if (first.next <= timeOrigin.nanos) {
first.runAndReset()
executed.add(first)
repeatableTimers.removeFirst()
} else {
break
}
}
executed.forEach { repeatableTimers.enqueue(it) }
}
}
override fun execute(command: Runnable) {
@ -133,4 +268,58 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu
return future.get()
}
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
timerBacklog.add(timer)
}
return timer
}
override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit))
if (isSameThread() && delay <= 0L) {
timer.run()
Thread.interrupted()
} else if (isSameThread()) {
timers.enqueue(timer)
} else {
timerBacklog.add(timer)
}
return timer
}
override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(period, unit), true).also { repeatableTimersBacklog.add(it) }
}
override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
return RepeatableTimer(
command,
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
TimeUnit.NANOSECONDS.convert(delay, unit), false).also { repeatableTimersBacklog.add(it) }
}
}