Fix race condition in ServerUniverse when actively generating chunks

This commit is contained in:
DBotThePony 2024-04-29 18:18:10 +07:00
parent 0dc3c996be
commit dafc211c10
Signed by: DBot
GPG Key ID: DCC23B5715498507
4 changed files with 87 additions and 86 deletions
src/main/kotlin/ru/dbotthepony/kstarbound

View File

@ -74,10 +74,10 @@ import ru.dbotthepony.kstarbound.math.Vector4fTypeAdapter
import ru.dbotthepony.kstarbound.math.Vector4iTypeAdapter
import ru.dbotthepony.kstarbound.util.AssetPathStack
import ru.dbotthepony.kstarbound.util.BlockableEventLoop
import ru.dbotthepony.kstarbound.util.ExecutorWithScheduler
import ru.dbotthepony.kstarbound.util.Directives
import ru.dbotthepony.kstarbound.util.SBPattern
import ru.dbotthepony.kstarbound.util.HashTableInterner
import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor
import ru.dbotthepony.kstarbound.util.random.AbstractPerlinNoise
import ru.dbotthepony.kstarbound.util.random.nextRange
import ru.dbotthepony.kstarbound.util.random.random
@ -172,12 +172,12 @@ object Starbound : BlockableEventLoop("Universe Thread"), Scheduler, ISBFileLoca
val IO_EXECUTOR: ExecutorService = makeExecutor(8, "Disk IO %d", MIN_PRIORITY)
@JvmField
val IO_COROUTINES = ExecutorWithScheduler(IO_EXECUTOR, this).asCoroutineDispatcher()
val IO_COROUTINES = ScheduledCoroutineExecutor(IO_EXECUTOR)
@JvmField
val EXECUTOR: ForkJoinPool = makeExecutor(Runtime.getRuntime().availableProcessors(), "Worker %d", NORM_PRIORITY)
@JvmField
val COROUTINES = ExecutorWithScheduler(EXECUTOR, this).asCoroutineDispatcher()
val COROUTINES = ScheduledCoroutineExecutor(EXECUTOR)
@JvmField
val GLOBAL_SCOPE = CoroutineScope(COROUTINES + SupervisorJob())

View File

@ -11,6 +11,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await
import ru.dbotthepony.kommons.gson.JsonArrayCollector
@ -36,6 +37,7 @@ import ru.dbotthepony.kstarbound.json.writeJsonElementDeflated
import ru.dbotthepony.kstarbound.math.Line2d
import ru.dbotthepony.kstarbound.math.vector.Vector3i
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor
import ru.dbotthepony.kstarbound.util.binnedChoice
import ru.dbotthepony.kstarbound.util.paddedNumber
import ru.dbotthepony.kstarbound.util.random.AbstractPerlinNoise
@ -56,6 +58,7 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier
import java.util.random.RandomGenerator
import kotlin.collections.ArrayList
@ -121,7 +124,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
private val scope = CoroutineScope(carrier.asCoroutineDispatcher() + SupervisorJob())
private val scope = CoroutineScope(ScheduledCoroutineExecutor(carrier) + SupervisorJob())
private val selectChunk = database.prepareStatement("SELECT `systems`, `constellations` FROM `chunk` WHERE `x` = ? AND `y` = ?")
private val selectSystem = database.prepareStatement("SELECT `parameters`, `planets` FROM `system` WHERE `x` = ? AND `y` = ? AND `z` = ?")
@ -218,33 +221,28 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
selectChunk.setInt(1, pos.x)
selectChunk.setInt(2, pos.y)
val existing = selectChunk.executeQuery().use {
selectChunk.executeQuery().use {
if (it.next()) {
Chunk(pos.x, pos.y, it)
} else {
null
return Chunk(pos.x, pos.y, it)
}
}
if (existing != null) {
chunkFutures.remove(pos)
return existing
}
// TODO
// load legacy chunk here
val generated = generateChunk(pos).await()
generated.write(insertChunk)
chunkFutures.remove(pos)
database.commit()
return generated
return generateChunk(pos).await()
}
private fun getChunk(pos: Vector2i): CompletableFuture<Chunk> {
return chunksCache.get(pos) {
chunkFutures.computeIfAbsent(it) {
scope.async { getChunk0(it) }.asCompletableFuture()
scope.async {
try {
getChunk0(it)
} finally {
chunkFutures.remove(it)
}
}.asCompletableFuture()
}
}
}
@ -290,9 +288,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
if (existing != null) {
// hit, system already exists
val wait = existing.await()
systemFutures.remove(pos)
return wait
return existing.await()
}
// lets try to get chunk this system is in
@ -304,13 +300,19 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
if (pos !in chunk.systems)
return null
return loadSystem(pos)?.await()
return loadSystem(pos)!!.await()
}
private fun getSystem(pos: Vector3i): CompletableFuture<System?> {
return systemCache.get(pos) {
systemFutures.computeIfAbsent(it) {
scope.async { loadOrComputeSystem(it) }.asCompletableFuture()
scope.async {
try {
loadOrComputeSystem(it)
} finally {
systemFutures.remove(it)
}
}.asCompletableFuture()
}
}
}
@ -557,11 +559,12 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}
}
CompletableFuture.allOf(*systems.toTypedArray())
.thenRunAsync(Runnable { database.commit() }, carrier)
val chunk = Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates)))
val serialized = chunk.serialize()
Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates)))
}, Starbound.EXECUTOR)
CompletableFuture.allOf(*systems.toTypedArray())
.thenApplyAsync(Function { serialized.write(insertChunk); database.commit(); chunk }, carrier)
}, Starbound.EXECUTOR).thenCompose { it }
}
private fun generateSystem(random: RandomGenerator, location: Vector3i): System? {

View File

@ -1,58 +0,0 @@
package ru.dbotthepony.kstarbound.util
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.Delayed
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
class ExecutorWithScheduler(val executor: ExecutorService, val scheduler: ScheduledExecutorService) : ExecutorService by executor, ScheduledExecutorService {
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
return scheduler.schedule(Runnable {
executor.submit(command)
}, delay, unit)
}
private class Proxy<T>(val future: CompletableFuture<T>, val parent: ScheduledFuture<*>) : Future<T> by future, ScheduledFuture<T> {
override fun compareTo(other: Delayed?): Int {
return parent.compareTo(other)
}
override fun getDelay(unit: TimeUnit): Long {
return parent.getDelay(unit)
}
}
// won't react to cancels... man.
override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
val future = CompletableFuture<CompletionStage<V>>()
val scheduled = scheduler.schedule(Callable { future.complete(CompletableFuture.supplyAsync(callable::call, executor)) }, delay, unit)
return Proxy(future.thenCompose { it }, scheduled)
}
override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
return scheduler.scheduleAtFixedRate(Runnable {
executor.submit(command)
}, initialDelay, period, unit)
}
override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
return scheduler.scheduleWithFixedDelay(Runnable {
executor.submit(command)
}, initialDelay, delay, unit)
}
}

View File

@ -0,0 +1,56 @@
package ru.dbotthepony.kstarbound.util
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.cancel
import ru.dbotthepony.kstarbound.Starbound
import java.util.concurrent.Executor
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
/**
* Uses [Starbound] as delay scheduler instead of Kotlin's one
*/
@OptIn(InternalCoroutinesApi::class)
class ScheduledCoroutineExecutor(val executor: Executor) : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(block)
} catch (err: RejectedExecutionException) {
context.cancel(CancellationException("The task was rejected", err))
Dispatchers.IO.dispatch(context, block)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
Starbound.schedule(java.lang.Runnable {
try {
executor.execute(java.lang.Runnable {
with(continuation) { resumeUndispatched(Unit) }
})
} catch (err: RejectedExecutionException) {
continuation.context.cancel(CancellationException("The task was rejected", err))
}
}, timeMillis, TimeUnit.MILLISECONDS)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
return Handle(Starbound.schedule(block, timeMillis, TimeUnit.MILLISECONDS))
}
private data class Handle(private val future: Future<*>) : DisposableHandle {
override fun dispose() {
future.cancel(false)
}
}
}