diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/Starbound.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/Starbound.kt index 72b44574..d1a3b537 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/Starbound.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/Starbound.kt @@ -74,10 +74,10 @@ import ru.dbotthepony.kstarbound.math.Vector4fTypeAdapter import ru.dbotthepony.kstarbound.math.Vector4iTypeAdapter import ru.dbotthepony.kstarbound.util.AssetPathStack import ru.dbotthepony.kstarbound.util.BlockableEventLoop -import ru.dbotthepony.kstarbound.util.ExecutorWithScheduler import ru.dbotthepony.kstarbound.util.Directives import ru.dbotthepony.kstarbound.util.SBPattern import ru.dbotthepony.kstarbound.util.HashTableInterner +import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor import ru.dbotthepony.kstarbound.util.random.AbstractPerlinNoise import ru.dbotthepony.kstarbound.util.random.nextRange import ru.dbotthepony.kstarbound.util.random.random @@ -172,12 +172,12 @@ object Starbound : BlockableEventLoop("Universe Thread"), Scheduler, ISBFileLoca val IO_EXECUTOR: ExecutorService = makeExecutor(8, "Disk IO %d", MIN_PRIORITY) @JvmField - val IO_COROUTINES = ExecutorWithScheduler(IO_EXECUTOR, this).asCoroutineDispatcher() + val IO_COROUTINES = ScheduledCoroutineExecutor(IO_EXECUTOR) @JvmField val EXECUTOR: ForkJoinPool = makeExecutor(Runtime.getRuntime().availableProcessors(), "Worker %d", NORM_PRIORITY) @JvmField - val COROUTINES = ExecutorWithScheduler(EXECUTOR, this).asCoroutineDispatcher() + val COROUTINES = ScheduledCoroutineExecutor(EXECUTOR) @JvmField val GLOBAL_SCOPE = CoroutineScope(COROUTINES + SupervisorJob()) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt index 611b7fa7..e1ea75c6 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.future.asCompletableFuture import kotlinx.coroutines.future.await import ru.dbotthepony.kommons.gson.JsonArrayCollector @@ -36,6 +37,7 @@ import ru.dbotthepony.kstarbound.json.writeJsonElementDeflated import ru.dbotthepony.kstarbound.math.Line2d import ru.dbotthepony.kstarbound.math.vector.Vector3i import ru.dbotthepony.kstarbound.util.CarriedExecutor +import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor import ru.dbotthepony.kstarbound.util.binnedChoice import ru.dbotthepony.kstarbound.util.paddedNumber import ru.dbotthepony.kstarbound.util.random.AbstractPerlinNoise @@ -56,6 +58,7 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.function.Consumer +import java.util.function.Function import java.util.function.Supplier import java.util.random.RandomGenerator import kotlin.collections.ArrayList @@ -121,7 +124,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { } private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) - private val scope = CoroutineScope(carrier.asCoroutineDispatcher() + SupervisorJob()) + private val scope = CoroutineScope(ScheduledCoroutineExecutor(carrier) + SupervisorJob()) private val selectChunk = database.prepareStatement("SELECT `systems`, `constellations` FROM `chunk` WHERE `x` = ? AND `y` = ?") private val selectSystem = database.prepareStatement("SELECT `parameters`, `planets` FROM `system` WHERE `x` = ? AND `y` = ? AND `z` = ?") @@ -218,33 +221,28 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { selectChunk.setInt(1, pos.x) selectChunk.setInt(2, pos.y) - val existing = selectChunk.executeQuery().use { + selectChunk.executeQuery().use { if (it.next()) { - Chunk(pos.x, pos.y, it) - } else { - null + return Chunk(pos.x, pos.y, it) } } - if (existing != null) { - chunkFutures.remove(pos) - return existing - } - // TODO // load legacy chunk here - val generated = generateChunk(pos).await() - generated.write(insertChunk) - chunkFutures.remove(pos) - database.commit() - return generated + return generateChunk(pos).await() } private fun getChunk(pos: Vector2i): CompletableFuture { return chunksCache.get(pos) { chunkFutures.computeIfAbsent(it) { - scope.async { getChunk0(it) }.asCompletableFuture() + scope.async { + try { + getChunk0(it) + } finally { + chunkFutures.remove(it) + } + }.asCompletableFuture() } } } @@ -290,9 +288,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { if (existing != null) { // hit, system already exists - val wait = existing.await() - systemFutures.remove(pos) - return wait + return existing.await() } // lets try to get chunk this system is in @@ -304,13 +300,19 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { if (pos !in chunk.systems) return null - return loadSystem(pos)?.await() + return loadSystem(pos)!!.await() } private fun getSystem(pos: Vector3i): CompletableFuture { return systemCache.get(pos) { systemFutures.computeIfAbsent(it) { - scope.async { loadOrComputeSystem(it) }.asCompletableFuture() + scope.async { + try { + loadOrComputeSystem(it) + } finally { + systemFutures.remove(it) + } + }.asCompletableFuture() } } } @@ -557,11 +559,12 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { } } - CompletableFuture.allOf(*systems.toTypedArray()) - .thenRunAsync(Runnable { database.commit() }, carrier) + val chunk = Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates))) + val serialized = chunk.serialize() - Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates))) - }, Starbound.EXECUTOR) + CompletableFuture.allOf(*systems.toTypedArray()) + .thenApplyAsync(Function { serialized.write(insertChunk); database.commit(); chunk }, carrier) + }, Starbound.EXECUTOR).thenCompose { it } } private fun generateSystem(random: RandomGenerator, location: Vector3i): System? { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ExecutorWithScheduler.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ExecutorWithScheduler.kt deleted file mode 100644 index 17c1999d..00000000 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ExecutorWithScheduler.kt +++ /dev/null @@ -1,58 +0,0 @@ -package ru.dbotthepony.kstarbound.util - -import java.util.concurrent.Callable -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage -import java.util.concurrent.Delayed -import java.util.concurrent.ExecutorService -import java.util.concurrent.Future -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit - -class ExecutorWithScheduler(val executor: ExecutorService, val scheduler: ScheduledExecutorService) : ExecutorService by executor, ScheduledExecutorService { - override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { - return scheduler.schedule(Runnable { - executor.submit(command) - }, delay, unit) - } - - private class Proxy(val future: CompletableFuture, val parent: ScheduledFuture<*>) : Future by future, ScheduledFuture { - override fun compareTo(other: Delayed?): Int { - return parent.compareTo(other) - } - - override fun getDelay(unit: TimeUnit): Long { - return parent.getDelay(unit) - } - } - - // won't react to cancels... man. - override fun schedule(callable: Callable, delay: Long, unit: TimeUnit): ScheduledFuture { - val future = CompletableFuture>() - val scheduled = scheduler.schedule(Callable { future.complete(CompletableFuture.supplyAsync(callable::call, executor)) }, delay, unit) - return Proxy(future.thenCompose { it }, scheduled) - } - - override fun scheduleAtFixedRate( - command: Runnable, - initialDelay: Long, - period: Long, - unit: TimeUnit - ): ScheduledFuture<*> { - return scheduler.scheduleAtFixedRate(Runnable { - executor.submit(command) - }, initialDelay, period, unit) - } - - override fun scheduleWithFixedDelay( - command: Runnable, - initialDelay: Long, - delay: Long, - unit: TimeUnit - ): ScheduledFuture<*> { - return scheduler.scheduleWithFixedDelay(Runnable { - executor.submit(command) - }, initialDelay, delay, unit) - } -} \ No newline at end of file diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ScheduledCoroutineExecutor.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ScheduledCoroutineExecutor.kt new file mode 100644 index 00000000..30bc71ac --- /dev/null +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ScheduledCoroutineExecutor.kt @@ -0,0 +1,56 @@ +package ru.dbotthepony.kstarbound.util + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Delay +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import kotlinx.coroutines.cancel +import ru.dbotthepony.kstarbound.Starbound +import java.util.concurrent.Executor +import java.util.concurrent.Future +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.TimeUnit +import kotlin.coroutines.CoroutineContext + +/** + * Uses [Starbound] as delay scheduler instead of Kotlin's one + */ +@OptIn(InternalCoroutinesApi::class) +class ScheduledCoroutineExecutor(val executor: Executor) : CoroutineDispatcher(), Delay { + override fun dispatch(context: CoroutineContext, block: Runnable) { + try { + executor.execute(block) + } catch (err: RejectedExecutionException) { + context.cancel(CancellationException("The task was rejected", err)) + Dispatchers.IO.dispatch(context, block) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + Starbound.schedule(java.lang.Runnable { + try { + executor.execute(java.lang.Runnable { + with(continuation) { resumeUndispatched(Unit) } + }) + } catch (err: RejectedExecutionException) { + continuation.context.cancel(CancellationException("The task was rejected", err)) + } + }, timeMillis, TimeUnit.MILLISECONDS) + } + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return Handle(Starbound.schedule(block, timeMillis, TimeUnit.MILLISECONDS)) + } + + private data class Handle(private val future: Future<*>) : DisposableHandle { + override fun dispose() { + future.cancel(false) + } + } +}