From 228088224773ab64fdfede9431d343675c0f02bf Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Fri, 13 Oct 2023 00:42:10 +0700 Subject: [PATCH] Add ScheduledExecutorService impl to ManualExecutorService --- .../kotlin/ru/dbotthepony/kstarbound/Main.kt | 3 - .../kstarbound/util/ManualExecutorService.kt | 193 +++++++++++++++++- 2 files changed, 191 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt index be2e83b5..71ff8ef0 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt @@ -7,7 +7,6 @@ import org.lwjgl.glfw.GLFW.glfwSetWindowShouldClose import ru.dbotthepony.kstarbound.client.StarboundClient import ru.dbotthepony.kstarbound.defs.animation.AnimationDefinition import ru.dbotthepony.kstarbound.io.BTreeDB -import ru.dbotthepony.kstarbound.io.json.BinaryJsonReader import ru.dbotthepony.kstarbound.player.Avatar import ru.dbotthepony.kstarbound.player.QuestDescriptor import ru.dbotthepony.kstarbound.player.QuestInstance @@ -26,10 +25,8 @@ import java.io.ByteArrayInputStream import java.io.DataInputStream import java.io.File import java.util.* -import java.util.concurrent.ForkJoinPool import java.util.zip.Inflater import java.util.zip.InflaterInputStream -import kotlin.system.exitProcess private val LOGGER = LogManager.getLogger() diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt index 9ffc09d9..720f62d7 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt @@ -1,17 +1,91 @@ package ru.dbotthepony.kstarbound.util import com.google.common.util.concurrent.Futures +import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue +import java.util.LinkedList import java.util.concurrent.Callable import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.ExecutorService +import java.util.concurrent.Delayed import java.util.concurrent.Future import java.util.concurrent.FutureTask +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit import java.util.concurrent.locks.LockSupport -class ManualExecutorService(val thread: Thread = Thread.currentThread()) : ExecutorService { +private fun > LinkedList.enqueue(value: E) { + if (isEmpty()) { + add(value) + } else if (first >= value) { + addFirst(value) + } else if (last <= value) { + addLast(value) + } else { + val iterator = listIterator() + + while (iterator.hasNext()) { + val i = iterator.next() + + if (i >= value) { + iterator.previous() + iterator.add(value) + break + } + } + } +} + +class ManualExecutorService(val thread: Thread = Thread.currentThread()) : ScheduledExecutorService { private val executeQueue = ConcurrentLinkedQueue() private val futureQueue = ConcurrentLinkedQueue>() + private val timerBacklog = ConcurrentLinkedQueue>() + private val repeatableTimersBacklog = ConcurrentLinkedQueue() + + private val timers = LinkedList>() + private val repeatableTimers = LinkedList() + + private val timeOrigin = JVMTimeSource() + + private inner class Timer(task: Callable, val executeAt: Long) : FutureTask(task), ScheduledFuture { + override fun compareTo(other: Delayed): Int { + return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) + } + + override fun getDelay(unit: TimeUnit): Long { + return unit.convert(executeAt, TimeUnit.NANOSECONDS) - timeOrigin.nanos + } + } + + private inner class RepeatableTimer( + task: Runnable, + initialDelay: Long, + val period: Long, + val fixedDelay: Boolean, + ): FutureTask({ task.run() }), ScheduledFuture { + var next = initialDelay + private set + + public override fun runAndReset(): Boolean { + if (fixedDelay) { + next += period + return super.runAndReset() + } else { + try { + return super.runAndReset() + } finally { + next += period + } + } + } + + override fun compareTo(other: Delayed): Int { + return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) + } + + override fun getDelay(unit: TimeUnit): Long { + return unit.convert(next, TimeUnit.NANOSECONDS) - timeOrigin.nanos + } + } fun isSameThread(): Boolean { return Thread.currentThread() === thread @@ -34,6 +108,67 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu Thread.interrupted() next2 = futureQueue.poll() } + + var next3 = timerBacklog.poll() + + while (next3 != null) { + if (next3.isCancelled) { + // do nothing + } else if (next3.executeAt <= timeOrigin.nanos) { + next3.run() + Thread.interrupted() + } else { + timers.enqueue(next3) + } + + next3 = timerBacklog.poll() + } + + var next4 = repeatableTimersBacklog.poll() + + while (next4 != null) { + if (next4.isCancelled) { + // do nothing + } else { + repeatableTimers.enqueue(next4) + } + + next4 = repeatableTimersBacklog.poll() + } + + while (!timers.isEmpty()) { + val first = timers.first + + if (first.isCancelled) { + timers.removeFirst() + } else if (first.executeAt <= timeOrigin.nanos) { + first.run() + Thread.interrupted() + timers.removeFirst() + } else { + break + } + } + + if (repeatableTimers.isNotEmpty()) { + val executed = LinkedList() + + while (repeatableTimers.isNotEmpty()) { + val first = repeatableTimers.first + + if (first.isDone) { + repeatableTimers.removeFirst() + } else if (first.next <= timeOrigin.nanos) { + first.runAndReset() + executed.add(first) + repeatableTimers.removeFirst() + } else { + break + } + } + + executed.forEach { repeatableTimers.enqueue(it) } + } } override fun execute(command: Runnable) { @@ -133,4 +268,58 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu return future.get() } + + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + val timer = Timer({ command.run() }, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) + + if (isSameThread() && delay <= 0L) { + timer.run() + Thread.interrupted() + } else if (isSameThread()) { + timers.enqueue(timer) + } else { + timerBacklog.add(timer) + } + + return timer + } + + override fun schedule(callable: Callable, delay: Long, unit: TimeUnit): ScheduledFuture { + val timer = Timer(callable, timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(delay, unit)) + + if (isSameThread() && delay <= 0L) { + timer.run() + Thread.interrupted() + } else if (isSameThread()) { + timers.enqueue(timer) + } else { + timerBacklog.add(timer) + } + + return timer + } + + override fun scheduleAtFixedRate( + command: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit + ): ScheduledFuture<*> { + return RepeatableTimer( + command, + timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), + TimeUnit.NANOSECONDS.convert(period, unit), true).also { repeatableTimersBacklog.add(it) } + } + + override fun scheduleWithFixedDelay( + command: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit + ): ScheduledFuture<*> { + return RepeatableTimer( + command, + timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit), + TimeUnit.NANOSECONDS.convert(delay, unit), false).also { repeatableTimersBacklog.add(it) } + } }