Fix race condition in event loop shutdown when using awaitTermination()

This commit is contained in:
DBotThePony 2024-04-23 17:30:52 +07:00
parent 195de2d160
commit 9797202af2
Signed by: DBot
GPG Key ID: DCC23B5715498507
5 changed files with 103 additions and 51 deletions

View File

@ -298,7 +298,6 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
run { run {
val action = ship.location.orbitalAction(world) val action = ship.location.orbitalAction(world)
currentOrbitalWarpAction = action currentOrbitalWarpAction = action
orbitalWarpAction = action
for (client in shipWorld.clients) { for (client in shipWorld.clients) {
client.client.orbitalWarpAction = action client.client.orbitalWarpAction = action

View File

@ -7,6 +7,7 @@ import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive import com.google.gson.JsonPrimitive
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import it.unimi.dsi.fastutil.objects.ObjectArrayList
import it.unimi.dsi.fastutil.objects.ObjectArraySet import it.unimi.dsi.fastutil.objects.ObjectArraySet
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
@ -164,24 +165,45 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
} }
private suspend fun loadCelestialWorld(location: WorldID.Celestial): ServerWorld { private suspend fun loadCelestialWorld(location: WorldID.Celestial): ServerWorld {
val file = File(universeFolder, location.pos.toString().replace(':', '_') + ".db") val fileName = location.pos.toString().replace(':', '_') + ".db"
val file = File(universeFolder, fileName)
val firstTime = !file.exists() val firstTime = !file.exists()
val storage = LegacyWorldStorage.SQL(file) val storage = LegacyWorldStorage.SQL(file)
val world = if (firstTime) { val world = try {
LOGGER.info("Creating celestial world $location") if (firstTime) {
ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location) LOGGER.info("Creating celestial world $location")
} else { ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location)
LOGGER.info("Loading celestial world $location") } else {
ServerWorld.load(this, storage, location).await() LOGGER.info("Loading celestial world $location")
ServerWorld.load(this, storage, location).await()
}
} catch (err: Throwable) {
storage.close()
if (firstTime) {
file.delete()
throw err
} else {
LOGGER.fatal("Exception loading celestial world at $location, recreating!")
var i = 0
while (!file.renameTo(File(universeFolder, "$fileName-fail$i")) && ++i < 1000) {}
ServerWorld.create(this, WorldTemplate.create(location.pos, universe), LegacyWorldStorage.SQL(file), location)
}
} }
try { try {
world.sky.referenceClock = universeClock world.sky.referenceClock = universeClock
world.eventLoop.start() world.eventLoop.start()
world.prepare(firstTime).await() world.prepare(firstTime).await()
if (firstTime) {
world.saveMetadata()
}
} catch (err: Throwable) { } catch (err: Throwable) {
LOGGER.fatal("Exception while creating celestial world at $location!", err) LOGGER.fatal("Exception while initializing celestial world at $location!", err)
world.eventLoop.shutdown() world.eventLoop.shutdown()
throw err throw err
} }
@ -390,23 +412,21 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
scope.cancel("Server shutting down") scope.cancel("Server shutting down")
channels.close() channels.close()
worlds.values.forEach { val worldSlice = ObjectArrayList(worlds.values)
if (it.isDone && !it.isCompletedExceptionally) {
it.get().eventLoop.shutdown() worldSlice.forEach {
} it.thenAccept { it.eventLoop.shutdown() }
} }
worlds.values.forEach { worldSlice.forEach {
if (it.isDone && !it.isCompletedExceptionally) { it.thenAccept {
it.get().eventLoop.awaitTermination(60L, TimeUnit.SECONDS) it.eventLoop.awaitTermination(60L, TimeUnit.SECONDS)
if (!it.get().eventLoop.isTerminated) { if (!it.eventLoop.isTerminated) {
LOGGER.warn("World ${it.get()} did not shutdown in 60 seconds, forcing termination. This might leave world in inconsistent state!") LOGGER.warn("World $it did not shutdown in 60 seconds, forcing termination. This might leave world in inconsistent state!")
it.get().eventLoop.shutdownNow() it.eventLoop.shutdownNow()
} }
} }
it.cancel(true)
} }
database.commit() database.commit()

View File

@ -197,8 +197,16 @@ sealed class LegacyWorldStorage() : WorldStorage() {
class SQL(path: File) : LegacyWorldStorage() { class SQL(path: File) : LegacyWorldStorage() {
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
private val connection = DriverManager.getConnection("jdbc:sqlite:${path.canonicalPath.replace('\\', '/')}") private val connection = DriverManager.getConnection("jdbc:sqlite:${path.canonicalPath.replace('\\', '/')}")
private val cleaner: Cleaner.Cleanable
init { init {
val connection = connection
cleaner = Starbound.CLEANER.register(this) {
/*connection.commit();*/
connection.close()
}
connection.autoCommit = false connection.autoCommit = false
connection.createStatement().use { connection.createStatement().use {
@ -243,7 +251,7 @@ sealed class LegacyWorldStorage() : WorldStorage() {
override fun close() { override fun close() {
carrier.execute { connection.commit() } carrier.execute { connection.commit() }
carrier.execute { connection.close() } carrier.execute { cleaner.clean() }
carrier.wait(300L, TimeUnit.SECONDS) carrier.wait(300L, TimeUnit.SECONDS)
} }
} }

View File

@ -120,6 +120,21 @@ class ServerWorld private constructor(
} }
} }
fun saveMetadata() {
val metadata = MetadataJson(
playerStart = playerSpawnPosition,
respawnInWorld = respawnInWorld,
adjustPlayerStart = adjustPlayerSpawn,
worldTemplate = if (storage is LegacyWorldStorage) Starbound.legacyJson { template.toJson() } else template.toJson(),
centralStructure = centralStructure,
protectedDungeonIds = protectedDungeonIDs,
worldProperties = copyProperties(),
spawningEnabled = true
)
storage.saveMetadata(WorldStorage.Metadata(geometry, VersionRegistry.make("WorldMetadata", Starbound.gson.toJsonTree(metadata))))
}
override val eventLoop = object : BlockableEventLoop("Server World $worldID") { override val eventLoop = object : BlockableEventLoop("Server World $worldID") {
init { init {
isDaemon = true isDaemon = true
@ -148,18 +163,7 @@ class ServerWorld private constructor(
it.client.enqueueWarp(WarpAlias.Return) it.client.enqueueWarp(WarpAlias.Return)
} }
val metadata = MetadataJson( saveMetadata()
playerStart = playerSpawnPosition,
respawnInWorld = respawnInWorld,
adjustPlayerStart = adjustPlayerSpawn,
worldTemplate = if (storage is LegacyWorldStorage) Starbound.legacyJson { template.toJson() } else template.toJson(),
centralStructure = centralStructure,
protectedDungeonIds = protectedDungeonIDs,
worldProperties = copyProperties(),
spawningEnabled = true
)
storage.saveMetadata(WorldStorage.Metadata(geometry, VersionRegistry.make("WorldMetadata", Starbound.gson.toJsonTree(metadata))))
storage.close() storage.close()
} }
} }
@ -644,9 +648,11 @@ class ServerWorld private constructor(
world.protectedDungeonIDs.addAll(meta.protectedDungeonIds) world.protectedDungeonIDs.addAll(meta.protectedDungeonIds)
world world
} }
}.exceptionally { }.also {
LOGGER.error("Error while instancing world $worldID", it) it.exceptionally {
null LOGGER.error("Error while instancing world $worldID", it)
null
}
} }
} }
} }

View File

@ -160,20 +160,27 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
if (isShutdown && isRunning) { if (isShutdown && isRunning) {
while (eventLoopIteration()) {} while (eventLoopIteration()) {}
isRunning = false
scope.cancel(CancellationException("EventLoop shut down")) try {
performShutdown() scope.cancel(CancellationException("EventLoop shut down"))
performShutdown()
} catch (err: Throwable) {
LOGGER.fatal("Exception shutting down $name")
return
} finally {
isRunning = false
}
} }
} }
LOGGER.info("Thread ${this.name} stopped gracefully") LOGGER.info("Thread $name stopped gracefully")
} }
final override fun execute(command: Runnable) { final override fun execute(command: Runnable) {
if (currentThread() === this) { if (currentThread() === this) {
command.run() command.run()
} else { } else {
if (!isRunning) if (isShutdown)
throw RejectedExecutionException("EventLoop is shutting down") throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<Unit>() val future = CompletableFuture<Unit>()
@ -196,7 +203,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture(err) return CompletableFuture.failedFuture(err)
} }
} else { } else {
if (!isRunning) if (isShutdown)
throw RejectedExecutionException("EventLoop is shutting down") throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<T>() val future = CompletableFuture<T>()
@ -235,7 +242,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture<Unit>(err) return CompletableFuture.failedFuture<Unit>(err)
} }
} else { } else {
if (!isRunning) if (isShutdown)
throw RejectedExecutionException("EventLoop is shutting down") throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<Unit>() val future = CompletableFuture<Unit>()
@ -260,7 +267,7 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture(err) return CompletableFuture.failedFuture(err)
} }
} else { } else {
if (!isRunning) if (isShutdown)
throw RejectedExecutionException("EventLoop is shutting down") throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<T>() val future = CompletableFuture<T>()
@ -297,6 +304,8 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
final override fun shutdown() { final override fun shutdown() {
if (!isShutdown) { if (!isShutdown) {
LOGGER.info("$name shutdown initiated")
isShutdown = true isShutdown = true
if (currentThread() === this || state == State.NEW) { if (currentThread() === this || state == State.NEW) {
@ -312,9 +321,14 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
} }
isRunning = false try {
scope.cancel(CancellationException("EventLoop shut down")) scope.cancel(CancellationException("EventLoop shut down"))
performShutdown() performShutdown()
} catch (err: Throwable) {
LOGGER.fatal("Exception shutting down $name", err)
} finally {
isRunning = false
}
} else { } else {
// wake up thread // wake up thread
LockSupport.unpark(this) LockSupport.unpark(this)
@ -351,9 +365,14 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
} }
isRunning = false try {
scope.cancel(CancellationException("EventLoop shut down")) scope.cancel(CancellationException("EventLoop shut down"))
performShutdown() performShutdown()
} catch (err: Throwable) {
LOGGER.fatal("Exception shutting down $name")
} finally {
isRunning = false
}
} else { } else {
// wake up thread // wake up thread
LockSupport.unpark(this) LockSupport.unpark(this)