From d93cc21dcd5f5502bc7b81ab2772280fb629b1da Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Wed, 24 Apr 2024 10:29:31 +0700 Subject: [PATCH] Use WAL in storage databases --- .../kstarbound/server/StarboundServer.kt | 6 +- .../server/world/LegacyWorldStorage.kt | 83 +++++++--------- .../server/world/NativeWorldStorage.kt | 4 - .../kstarbound/server/world/ServerUniverse.kt | 97 ++++++++++++------- .../kstarbound/server/world/ServerWorld.kt | 4 - .../kstarbound/server/world/WorldStorage.kt | 10 -- .../kstarbound/util/CarriedExecutor.kt | 12 ++- .../ru/dbotthepony/kstarbound/util/Utils.kt | 7 ++ 8 files changed, 120 insertions(+), 103 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt index 2165318a..f33780f8 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt @@ -76,13 +76,15 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread private val database = DriverManager.getConnection("jdbc:sqlite:${File(universeFolder, "universe.db").absolutePath.replace('\\', '/')}") init { - database.autoCommit = false - database.createStatement().use { + it.execute("PRAGMA locking_mode=EXCLUSIVE") + it.execute("PRAGMA journal_mode=WAL") it.execute("CREATE TABLE IF NOT EXISTS `metadata` (`key` VARCHAR NOT NULL PRIMARY KEY, `value` BLOB NOT NULL)") it.execute("CREATE TABLE IF NOT EXISTS `universe_flags` (`flag` VARCHAR NOT NULL PRIMARY KEY)") it.execute("CREATE TABLE IF NOT EXISTS `client_context` (`uuid` VARCHAR NOT NULL PRIMARY KEY, `data` BLOB NOT NULL)") } + + database.autoCommit = false } private val lookupMetadata = database.prepareStatement("SELECT `value` FROM `metadata` WHERE `key` = ?") diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt index a0f83ddb..0398f125 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt @@ -16,6 +16,7 @@ import ru.dbotthepony.kstarbound.io.BTreeDB5 import ru.dbotthepony.kstarbound.json.VersionedJson import ru.dbotthepony.kstarbound.math.vector.Vector2i import ru.dbotthepony.kstarbound.util.CarriedExecutor +import ru.dbotthepony.kstarbound.util.supplyAsync import ru.dbotthepony.kstarbound.world.CHUNK_SIZE import ru.dbotthepony.kstarbound.world.ChunkPos import ru.dbotthepony.kstarbound.world.ChunkState @@ -75,15 +76,6 @@ sealed class LegacyWorldStorage() : WorldStorage() { for (y in 0 until CHUNK_SIZE) { for (x in 0 until CHUNK_SIZE) { val read = MutableCell().readLegacy(reader, tileSerializationVersion) - - if (read.foreground.material == BuiltinMetaMaterials.STRUCTURE) { - read.foreground.material = BuiltinMetaMaterials.EMPTY - } - - if (read.background.material == BuiltinMetaMaterials.STRUCTURE) { - read.background.material = BuiltinMetaMaterials.EMPTY - } - result[x, y] = read.immutable() } } @@ -120,42 +112,48 @@ sealed class LegacyWorldStorage() : WorldStorage() { } override fun saveEntities(pos: ChunkPos, data: Collection): Boolean { - val chunkX = pos.x - val chunkY = pos.y - val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) + Starbound.EXECUTOR.execute { + val chunkX = pos.x + val chunkY = pos.y + val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) + + write(key, writeEntities(data)) + } - write(key, writeEntities(data)) return true } override fun saveCells(pos: ChunkPos, data: Object2DArray, state: ChunkState): Boolean { - val buff = FastByteArrayOutputStream() - val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff))) + Starbound.EXECUTOR.execute { + val buff = FastByteArrayOutputStream() + val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff))) - stream.writeVarInt(when (state) { - ChunkState.FRESH -> 0 - ChunkState.EMPTY -> 0 - ChunkState.TERRAIN -> 1 - ChunkState.MICRO_DUNGEONS -> 2 - ChunkState.CAVE_LIQUID -> 3 - ChunkState.FULL -> 4 - }) + stream.writeVarInt(when (state) { + ChunkState.FRESH -> 0 + ChunkState.EMPTY -> 0 + ChunkState.TERRAIN -> 1 + ChunkState.MICRO_DUNGEONS -> 2 + ChunkState.CAVE_LIQUID -> 3 + ChunkState.FULL -> 4 + }) - stream.writeVarInt(418) + stream.writeVarInt(418) - for (y in 0 until CHUNK_SIZE) { - for (x in 0 until CHUNK_SIZE) { - val cell = data.getOrNull(x, y) ?: AbstractCell.NULL - cell.writeLegacy(stream) + for (y in 0 until CHUNK_SIZE) { + for (x in 0 until CHUNK_SIZE) { + val cell = data.getOrNull(x, y) ?: AbstractCell.NULL + cell.writeLegacy(stream) + } } + + val chunkX = pos.x + val chunkY = pos.y + val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) + + stream.close() + write(key, buff.array.copyOf(buff.length)) } - val chunkX = pos.x - val chunkY = pos.y - val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - - stream.close() - write(key, buff.array.copyOf(buff.length)) return true } @@ -169,6 +167,7 @@ sealed class LegacyWorldStorage() : WorldStorage() { stream.close() write(metadataKey, buff.array.copyOf(buff.length)) + return true } @@ -182,7 +181,6 @@ sealed class LegacyWorldStorage() : WorldStorage() { } override fun close() {} - override fun sync() {} } class DB5(private val database: BTreeDB5) : LegacyWorldStorage() { @@ -196,10 +194,6 @@ sealed class LegacyWorldStorage() : WorldStorage() { throw UnsupportedOperationException() } - override fun sync() { - // do nothing - } - override fun close() { carrier.execute { database.close() } carrier.wait(300L, TimeUnit.SECONDS) @@ -219,9 +213,10 @@ sealed class LegacyWorldStorage() : WorldStorage() { connection.close() } - connection.autoCommit = false - connection.createStatement().use { + it.execute("PRAGMA locking_mode=EXCLUSIVE") + it.execute("PRAGMA journal_mode=WAL") + it.execute("""CREATE TABLE IF NOT EXISTS `data` ( |`key` BLOB NOT NULL PRIMARY KEY, |`value` BLOB NOT NULL @@ -229,10 +224,6 @@ sealed class LegacyWorldStorage() : WorldStorage() { } } - override fun sync() { - carrier.execute { connection.commit() } - } - // concurrent safety - load(), write() and close() are always called on one thread // and load()/write() will never be called after close() private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1") @@ -262,7 +253,7 @@ sealed class LegacyWorldStorage() : WorldStorage() { } override fun close() { - carrier.execute { connection.commit() } + // carrier.execute { connection.commit() } carrier.execute { cleaner.clean() } carrier.wait(300L, TimeUnit.SECONDS) } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt index ac29e4f3..a28fa927 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt @@ -35,10 +35,6 @@ class NativeWorldStorage() : WorldStorage() { return super.saveMetadata(data) } - override fun sync() { - - } - override fun close() { } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt index 0cb7881e..00255fd3 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerUniverse.kt @@ -1,11 +1,8 @@ package ru.dbotthepony.kstarbound.server.world -import com.github.benmanes.caffeine.cache.AsyncCacheLoader -import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.google.gson.JsonArray import com.google.gson.JsonObject -import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap import it.unimi.dsi.fastutil.ints.IntArraySet import it.unimi.dsi.fastutil.objects.ObjectArrayList import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet @@ -16,8 +13,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.future.asCompletableFuture import kotlinx.coroutines.future.await -import kotlinx.coroutines.launch -import ru.dbotthepony.kommons.collect.chainOptionalFutures import ru.dbotthepony.kommons.gson.JsonArrayCollector import ru.dbotthepony.kommons.gson.contains import ru.dbotthepony.kommons.gson.get @@ -34,17 +29,12 @@ import ru.dbotthepony.kstarbound.fromJson import ru.dbotthepony.kstarbound.io.BTreeDB5 import ru.dbotthepony.kstarbound.json.jsonArrayOf import ru.dbotthepony.kstarbound.json.mergeJson -import ru.dbotthepony.kstarbound.json.readJsonArray import ru.dbotthepony.kstarbound.json.readJsonArrayInflated -import ru.dbotthepony.kstarbound.json.readJsonElement import ru.dbotthepony.kstarbound.json.readJsonElementInflated -import ru.dbotthepony.kstarbound.json.writeJsonArray import ru.dbotthepony.kstarbound.json.writeJsonArrayDeflated -import ru.dbotthepony.kstarbound.json.writeJsonElement import ru.dbotthepony.kstarbound.json.writeJsonElementDeflated import ru.dbotthepony.kstarbound.math.Line2d import ru.dbotthepony.kstarbound.math.vector.Vector3i -import ru.dbotthepony.kstarbound.util.BlockableEventLoop import ru.dbotthepony.kstarbound.util.CarriedExecutor import ru.dbotthepony.kstarbound.util.binnedChoice import ru.dbotthepony.kstarbound.util.paddedNumber @@ -60,7 +50,6 @@ import java.sql.Connection import java.sql.DriverManager import java.sql.PreparedStatement import java.sql.ResultSet -import java.time.Duration import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit @@ -99,6 +88,9 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { } database.createStatement().use { + it.execute("PRAGMA locking_mode=EXCLUSIVE") + it.execute("PRAGMA journal_mode=WAL") + it.execute(""" CREATE TABLE IF NOT EXISTS `chunk` ( `x` INTEGER NOT NULL, @@ -130,8 +122,19 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { private val selectChunk = database.prepareStatement("SELECT `systems`, `constellations` FROM `chunk` WHERE `x` = ? AND `y` = ?") private val selectSystem = database.prepareStatement("SELECT `parameters`, `planets` FROM `system` WHERE `x` = ? AND `y` = ? AND `z` = ?") - private val insertChunk = database.prepareStatement("REPLACE INTO `chunk` (`x`, `y`, `systems`, `constellations`) VALUES (?, ?, ?, ?)") - private val insertSystem = database.prepareStatement("REPLACE INTO `system` (`x`, `y`, `z`, `parameters`, `planets`) VALUES (?, ?, ?, ?, ?)") + private val insertChunk = database.prepareStatement("INSERT INTO `chunk` (`x`, `y`, `systems`, `constellations`) VALUES (?, ?, ?, ?)") + private val insertSystem = database.prepareStatement("INSERT INTO `system` (`x`, `y`, `z`, `parameters`, `planets`) VALUES (?, ?, ?, ?, ?)") + + private class SerializedChunk(val x: Int, val y: Int, val systems: ByteArray, val constellations: ByteArray) { + fun write(statement: PreparedStatement) { + statement.setInt(1, x) + statement.setInt(2, y) + statement.setBytes(3, systems) + statement.setBytes(4, constellations) + + statement.execute() + } + } private data class Chunk(val x: Int, val y: Int, val systems: Set, val constellations: Set>) { constructor(x: Int, y: Int, data: ResultSet) : this( @@ -144,18 +147,31 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { }.toSet() ) + fun serialize(): SerializedChunk { + return SerializedChunk( + x, y, + systems.stream() + .map { jsonArrayOf(it.x, it.y, it.z) } + .collect(JsonArrayCollector) + .writeJsonArrayDeflated(), + constellations.stream().map { + jsonArrayOf(jsonArrayOf(it.first.x, it.first.y), jsonArrayOf(it.second.x, it.second.y)) + }.collect(JsonArrayCollector).writeJsonArrayDeflated() + ) + } + + fun write(statement: PreparedStatement) { + serialize().write(statement) + } + } + + private class SerializedSystem(val x: Int, val y: Int, val z: Int, val parameters: ByteArray, val planets: ByteArray) { fun write(statement: PreparedStatement) { statement.setInt(1, x) statement.setInt(2, y) - - statement.setBytes(3, systems.stream() - .map { jsonArrayOf(it.x, it.y, it.z) } - .collect(JsonArrayCollector) - .writeJsonArrayDeflated()) - - statement.setBytes(4, constellations.stream().map { - jsonArrayOf(jsonArrayOf(it.first.x, it.first.y), jsonArrayOf(it.second.x, it.second.y)) - }.collect(JsonArrayCollector).writeJsonArrayDeflated()) + statement.setInt(3, z) + statement.setBytes(4, parameters) + statement.setBytes(5, planets) statement.execute() } @@ -179,16 +195,18 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { } } - fun write(statement: PreparedStatement) { - statement.setInt(1, x) - statement.setInt(2, y) - statement.setInt(3, z) - statement.setBytes(4, Starbound.gson.toJsonTree(parameters).writeJsonElementDeflated()) - statement.setBytes(5, planets.entries.stream() - .map { jsonArrayOf(it.key.first, it.key.second, it.value) } - .collect(JsonArrayCollector).writeJsonArrayDeflated()) + fun serialize(): SerializedSystem { + return SerializedSystem( + x, y, z, + Starbound.gson.toJsonTree(parameters).writeJsonElementDeflated(), + planets.entries.stream() + .map { jsonArrayOf(it.key.first, it.key.second, it.value) } + .collect(JsonArrayCollector).writeJsonArrayDeflated() + ) + } - statement.execute() + fun write(statement: PreparedStatement) { + serialize().write(statement) } } @@ -224,6 +242,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { val generated = generateChunk(pos).await() generated.write(insertChunk) chunkFutures.remove(pos) + database.commit() return generated } @@ -502,7 +521,8 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { return CompletableFuture.supplyAsync(Supplier { val constellationCandidates = ArrayList() - val systems = ArrayList() + val systemPositions = ArrayList() + val systems = ArrayList>() for (x in region.mins.x until region.maxs.x) { for (y in region.mins.y until region.maxs.y) { @@ -511,10 +531,12 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { val pos = Vector3i(x, y, z) val system = generateSystem(random, pos) ?: continue - systems.add(pos) + systemPositions.add(pos) - systemCache.put(pos, CompletableFuture.completedFuture(system)) - carrier.executePriority { system.write(insertSystem) } + systems.add(CompletableFuture.supplyAsync(Supplier { system.serialize() }, Starbound.EXECUTOR) + .thenAcceptAsync(Consumer { it.write(insertSystem) }, carrier)) + + systemCache.put(Vector3i(system.x, system.y, system.z), CompletableFuture.completedFuture(system)) if ( system.parameters.parameters.get("constellationCapable", true) && @@ -526,7 +548,10 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable { } } - Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systems), ObjectOpenHashSet(generateConstellations(random, constellationCandidates))) + CompletableFuture.allOf(*systems.toTypedArray()) + .thenRunAsync(Runnable { database.commit() }, carrier) + + Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates))) }, Starbound.EXECUTOR) } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt index 08fd33f4..75917297 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt @@ -172,10 +172,6 @@ class ServerWorld private constructor( eventLoop.scheduleAtFixedRate(Runnable { tick(Starbound.TIMESTEP) }, 0L, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS) - - eventLoop.scheduleWithFixedDelay(Runnable { - storage.sync() - }, 10L, 10L, TimeUnit.SECONDS) } override fun toString(): String { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt index 4edf35ae..d1e7d697 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt @@ -36,8 +36,6 @@ abstract class WorldStorage : Closeable { abstract fun loadEntities(pos: ChunkPos): CompletableFuture>> abstract fun loadMetadata(): CompletableFuture> - abstract fun sync() - protected fun readEntities(pos: ChunkPos, data: ByteArray): List { val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data)))) val i = reader.readVarInt() @@ -109,8 +107,6 @@ abstract class WorldStorage : Closeable { override fun loadMetadata(): CompletableFuture> { return CompletableFuture.completedFuture(KOptional()) } - - override fun sync() {} } object Nothing : WorldStorage() { @@ -125,8 +121,6 @@ abstract class WorldStorage : Closeable { override fun loadMetadata(): CompletableFuture> { return CompletableFuture.completedFuture(KOptional()) } - - override fun sync() {} } companion object { @@ -169,9 +163,5 @@ abstract class WorldStorage : Closeable { override fun close() { children.forEach { it.close() } } - - override fun sync() { - children.forEach { it.sync() } - } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt index da7a6a62..3795e80d 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt @@ -1,5 +1,6 @@ package ru.dbotthepony.kstarbound.util +import org.apache.logging.log4j.LogManager import java.lang.ref.Reference import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedQueue @@ -33,7 +34,12 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable { var next = queue.poll() while (next != null) { - next.run() + try { + next.run() + } catch (err: Throwable) { + LOGGER.error("Exception running task", err) + } + next = queue.poll() } @@ -62,4 +68,8 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable { LockSupport.parkNanos(500_000L) } } + + companion object { + private val LOGGER = LogManager.getLogger() + } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/Utils.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/Utils.kt index 44aa886a..8c5e4ea2 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/Utils.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/Utils.kt @@ -5,6 +5,9 @@ import ru.dbotthepony.kommons.io.StreamCodec import ru.dbotthepony.kstarbound.Starbound import ru.dbotthepony.kstarbound.json.builder.IStringSerializable import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor +import java.util.function.Supplier import java.util.stream.Stream import kotlin.NoSuchElementException import kotlin.collections.Collection @@ -73,3 +76,7 @@ fun , T : Any> Stream>.binnedChoice(value: C): Opti fun Collection.valueOf(value: String): E { return firstOrNull { it.match(value) } ?: throw NoSuchElementException("'$value' is not a valid ${first()::class.qualifiedName}") } + +fun Executor.supplyAsync(block: Supplier): CompletableFuture { + return CompletableFuture.supplyAsync(block, this) +}