Use WAL in storage databases

This commit is contained in:
DBotThePony 2024-04-24 10:29:31 +07:00
parent 9644eda14c
commit d93cc21dcd
Signed by: DBot
GPG Key ID: DCC23B5715498507
8 changed files with 120 additions and 103 deletions

View File

@ -76,13 +76,15 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
private val database = DriverManager.getConnection("jdbc:sqlite:${File(universeFolder, "universe.db").absolutePath.replace('\\', '/')}")
init {
database.autoCommit = false
database.createStatement().use {
it.execute("PRAGMA locking_mode=EXCLUSIVE")
it.execute("PRAGMA journal_mode=WAL")
it.execute("CREATE TABLE IF NOT EXISTS `metadata` (`key` VARCHAR NOT NULL PRIMARY KEY, `value` BLOB NOT NULL)")
it.execute("CREATE TABLE IF NOT EXISTS `universe_flags` (`flag` VARCHAR NOT NULL PRIMARY KEY)")
it.execute("CREATE TABLE IF NOT EXISTS `client_context` (`uuid` VARCHAR NOT NULL PRIMARY KEY, `data` BLOB NOT NULL)")
}
database.autoCommit = false
}
private val lookupMetadata = database.prepareStatement("SELECT `value` FROM `metadata` WHERE `key` = ?")

View File

@ -16,6 +16,7 @@ import ru.dbotthepony.kstarbound.io.BTreeDB5
import ru.dbotthepony.kstarbound.json.VersionedJson
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import ru.dbotthepony.kstarbound.util.supplyAsync
import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
import ru.dbotthepony.kstarbound.world.ChunkPos
import ru.dbotthepony.kstarbound.world.ChunkState
@ -75,15 +76,6 @@ sealed class LegacyWorldStorage() : WorldStorage() {
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val read = MutableCell().readLegacy(reader, tileSerializationVersion)
if (read.foreground.material == BuiltinMetaMaterials.STRUCTURE) {
read.foreground.material = BuiltinMetaMaterials.EMPTY
}
if (read.background.material == BuiltinMetaMaterials.STRUCTURE) {
read.background.material = BuiltinMetaMaterials.EMPTY
}
result[x, y] = read.immutable()
}
}
@ -120,42 +112,48 @@ sealed class LegacyWorldStorage() : WorldStorage() {
}
override fun saveEntities(pos: ChunkPos, data: Collection<AbstractEntity>): Boolean {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
Starbound.EXECUTOR.execute {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
write(key, writeEntities(data))
}
write(key, writeEntities(data))
return true
}
override fun saveCells(pos: ChunkPos, data: Object2DArray<out AbstractCell>, state: ChunkState): Boolean {
val buff = FastByteArrayOutputStream()
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff)))
Starbound.EXECUTOR.execute {
val buff = FastByteArrayOutputStream()
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff)))
stream.writeVarInt(when (state) {
ChunkState.FRESH -> 0
ChunkState.EMPTY -> 0
ChunkState.TERRAIN -> 1
ChunkState.MICRO_DUNGEONS -> 2
ChunkState.CAVE_LIQUID -> 3
ChunkState.FULL -> 4
})
stream.writeVarInt(when (state) {
ChunkState.FRESH -> 0
ChunkState.EMPTY -> 0
ChunkState.TERRAIN -> 1
ChunkState.MICRO_DUNGEONS -> 2
ChunkState.CAVE_LIQUID -> 3
ChunkState.FULL -> 4
})
stream.writeVarInt(418)
stream.writeVarInt(418)
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val cell = data.getOrNull(x, y) ?: AbstractCell.NULL
cell.writeLegacy(stream)
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val cell = data.getOrNull(x, y) ?: AbstractCell.NULL
cell.writeLegacy(stream)
}
}
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
stream.close()
write(key, buff.array.copyOf(buff.length))
}
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
stream.close()
write(key, buff.array.copyOf(buff.length))
return true
}
@ -169,6 +167,7 @@ sealed class LegacyWorldStorage() : WorldStorage() {
stream.close()
write(metadataKey, buff.array.copyOf(buff.length))
return true
}
@ -182,7 +181,6 @@ sealed class LegacyWorldStorage() : WorldStorage() {
}
override fun close() {}
override fun sync() {}
}
class DB5(private val database: BTreeDB5) : LegacyWorldStorage() {
@ -196,10 +194,6 @@ sealed class LegacyWorldStorage() : WorldStorage() {
throw UnsupportedOperationException()
}
override fun sync() {
// do nothing
}
override fun close() {
carrier.execute { database.close() }
carrier.wait(300L, TimeUnit.SECONDS)
@ -219,9 +213,10 @@ sealed class LegacyWorldStorage() : WorldStorage() {
connection.close()
}
connection.autoCommit = false
connection.createStatement().use {
it.execute("PRAGMA locking_mode=EXCLUSIVE")
it.execute("PRAGMA journal_mode=WAL")
it.execute("""CREATE TABLE IF NOT EXISTS `data` (
|`key` BLOB NOT NULL PRIMARY KEY,
|`value` BLOB NOT NULL
@ -229,10 +224,6 @@ sealed class LegacyWorldStorage() : WorldStorage() {
}
}
override fun sync() {
carrier.execute { connection.commit() }
}
// concurrent safety - load(), write() and close() are always called on one thread
// and load()/write() will never be called after close()
private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1")
@ -262,7 +253,7 @@ sealed class LegacyWorldStorage() : WorldStorage() {
}
override fun close() {
carrier.execute { connection.commit() }
// carrier.execute { connection.commit() }
carrier.execute { cleaner.clean() }
carrier.wait(300L, TimeUnit.SECONDS)
}

View File

@ -35,10 +35,6 @@ class NativeWorldStorage() : WorldStorage() {
return super.saveMetadata(data)
}
override fun sync() {
}
override fun close() {
}

View File

@ -1,11 +1,8 @@
package ru.dbotthepony.kstarbound.server.world
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.gson.JsonArray
import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap
import it.unimi.dsi.fastutil.ints.IntArraySet
import it.unimi.dsi.fastutil.objects.ObjectArrayList
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet
@ -16,8 +13,6 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import ru.dbotthepony.kommons.collect.chainOptionalFutures
import ru.dbotthepony.kommons.gson.JsonArrayCollector
import ru.dbotthepony.kommons.gson.contains
import ru.dbotthepony.kommons.gson.get
@ -34,17 +29,12 @@ import ru.dbotthepony.kstarbound.fromJson
import ru.dbotthepony.kstarbound.io.BTreeDB5
import ru.dbotthepony.kstarbound.json.jsonArrayOf
import ru.dbotthepony.kstarbound.json.mergeJson
import ru.dbotthepony.kstarbound.json.readJsonArray
import ru.dbotthepony.kstarbound.json.readJsonArrayInflated
import ru.dbotthepony.kstarbound.json.readJsonElement
import ru.dbotthepony.kstarbound.json.readJsonElementInflated
import ru.dbotthepony.kstarbound.json.writeJsonArray
import ru.dbotthepony.kstarbound.json.writeJsonArrayDeflated
import ru.dbotthepony.kstarbound.json.writeJsonElement
import ru.dbotthepony.kstarbound.json.writeJsonElementDeflated
import ru.dbotthepony.kstarbound.math.Line2d
import ru.dbotthepony.kstarbound.math.vector.Vector3i
import ru.dbotthepony.kstarbound.util.BlockableEventLoop
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import ru.dbotthepony.kstarbound.util.binnedChoice
import ru.dbotthepony.kstarbound.util.paddedNumber
@ -60,7 +50,6 @@ import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
@ -99,6 +88,9 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}
database.createStatement().use {
it.execute("PRAGMA locking_mode=EXCLUSIVE")
it.execute("PRAGMA journal_mode=WAL")
it.execute("""
CREATE TABLE IF NOT EXISTS `chunk` (
`x` INTEGER NOT NULL,
@ -130,8 +122,19 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
private val selectChunk = database.prepareStatement("SELECT `systems`, `constellations` FROM `chunk` WHERE `x` = ? AND `y` = ?")
private val selectSystem = database.prepareStatement("SELECT `parameters`, `planets` FROM `system` WHERE `x` = ? AND `y` = ? AND `z` = ?")
private val insertChunk = database.prepareStatement("REPLACE INTO `chunk` (`x`, `y`, `systems`, `constellations`) VALUES (?, ?, ?, ?)")
private val insertSystem = database.prepareStatement("REPLACE INTO `system` (`x`, `y`, `z`, `parameters`, `planets`) VALUES (?, ?, ?, ?, ?)")
private val insertChunk = database.prepareStatement("INSERT INTO `chunk` (`x`, `y`, `systems`, `constellations`) VALUES (?, ?, ?, ?)")
private val insertSystem = database.prepareStatement("INSERT INTO `system` (`x`, `y`, `z`, `parameters`, `planets`) VALUES (?, ?, ?, ?, ?)")
private class SerializedChunk(val x: Int, val y: Int, val systems: ByteArray, val constellations: ByteArray) {
fun write(statement: PreparedStatement) {
statement.setInt(1, x)
statement.setInt(2, y)
statement.setBytes(3, systems)
statement.setBytes(4, constellations)
statement.execute()
}
}
private data class Chunk(val x: Int, val y: Int, val systems: Set<Vector3i>, val constellations: Set<Pair<Vector2i, Vector2i>>) {
constructor(x: Int, y: Int, data: ResultSet) : this(
@ -144,18 +147,31 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}.toSet()
)
fun serialize(): SerializedChunk {
return SerializedChunk(
x, y,
systems.stream()
.map { jsonArrayOf(it.x, it.y, it.z) }
.collect(JsonArrayCollector)
.writeJsonArrayDeflated(),
constellations.stream().map {
jsonArrayOf(jsonArrayOf(it.first.x, it.first.y), jsonArrayOf(it.second.x, it.second.y))
}.collect(JsonArrayCollector).writeJsonArrayDeflated()
)
}
fun write(statement: PreparedStatement) {
serialize().write(statement)
}
}
private class SerializedSystem(val x: Int, val y: Int, val z: Int, val parameters: ByteArray, val planets: ByteArray) {
fun write(statement: PreparedStatement) {
statement.setInt(1, x)
statement.setInt(2, y)
statement.setBytes(3, systems.stream()
.map { jsonArrayOf(it.x, it.y, it.z) }
.collect(JsonArrayCollector)
.writeJsonArrayDeflated())
statement.setBytes(4, constellations.stream().map {
jsonArrayOf(jsonArrayOf(it.first.x, it.first.y), jsonArrayOf(it.second.x, it.second.y))
}.collect(JsonArrayCollector).writeJsonArrayDeflated())
statement.setInt(3, z)
statement.setBytes(4, parameters)
statement.setBytes(5, planets)
statement.execute()
}
@ -179,16 +195,18 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}
}
fun write(statement: PreparedStatement) {
statement.setInt(1, x)
statement.setInt(2, y)
statement.setInt(3, z)
statement.setBytes(4, Starbound.gson.toJsonTree(parameters).writeJsonElementDeflated())
statement.setBytes(5, planets.entries.stream()
.map { jsonArrayOf(it.key.first, it.key.second, it.value) }
.collect(JsonArrayCollector).writeJsonArrayDeflated())
fun serialize(): SerializedSystem {
return SerializedSystem(
x, y, z,
Starbound.gson.toJsonTree(parameters).writeJsonElementDeflated(),
planets.entries.stream()
.map { jsonArrayOf(it.key.first, it.key.second, it.value) }
.collect(JsonArrayCollector).writeJsonArrayDeflated()
)
}
statement.execute()
fun write(statement: PreparedStatement) {
serialize().write(statement)
}
}
@ -224,6 +242,7 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
val generated = generateChunk(pos).await()
generated.write(insertChunk)
chunkFutures.remove(pos)
database.commit()
return generated
}
@ -502,7 +521,8 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
return CompletableFuture.supplyAsync(Supplier {
val constellationCandidates = ArrayList<Vector2i>()
val systems = ArrayList<Vector3i>()
val systemPositions = ArrayList<Vector3i>()
val systems = ArrayList<CompletableFuture<*>>()
for (x in region.mins.x until region.maxs.x) {
for (y in region.mins.y until region.maxs.y) {
@ -511,10 +531,12 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
val pos = Vector3i(x, y, z)
val system = generateSystem(random, pos) ?: continue
systems.add(pos)
systemPositions.add(pos)
systemCache.put(pos, CompletableFuture.completedFuture(system))
carrier.executePriority { system.write(insertSystem) }
systems.add(CompletableFuture.supplyAsync(Supplier { system.serialize() }, Starbound.EXECUTOR)
.thenAcceptAsync(Consumer { it.write(insertSystem) }, carrier))
systemCache.put(Vector3i(system.x, system.y, system.z), CompletableFuture.completedFuture(system))
if (
system.parameters.parameters.get("constellationCapable", true) &&
@ -526,7 +548,10 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
}
}
Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systems), ObjectOpenHashSet(generateConstellations(random, constellationCandidates)))
CompletableFuture.allOf(*systems.toTypedArray())
.thenRunAsync(Runnable { database.commit() }, carrier)
Chunk(chunkPos.x, chunkPos.y, ObjectOpenHashSet(systemPositions), ObjectOpenHashSet(generateConstellations(random, constellationCandidates)))
}, Starbound.EXECUTOR)
}

View File

@ -172,10 +172,6 @@ class ServerWorld private constructor(
eventLoop.scheduleAtFixedRate(Runnable {
tick(Starbound.TIMESTEP)
}, 0L, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS)
eventLoop.scheduleWithFixedDelay(Runnable {
storage.sync()
}, 10L, 10L, TimeUnit.SECONDS)
}
override fun toString(): String {

View File

@ -36,8 +36,6 @@ abstract class WorldStorage : Closeable {
abstract fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>>
abstract fun loadMetadata(): CompletableFuture<KOptional<Metadata>>
abstract fun sync()
protected fun readEntities(pos: ChunkPos, data: ByteArray): List<AbstractEntity> {
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data))))
val i = reader.readVarInt()
@ -109,8 +107,6 @@ abstract class WorldStorage : Closeable {
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun sync() {}
}
object Nothing : WorldStorage() {
@ -125,8 +121,6 @@ abstract class WorldStorage : Closeable {
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun sync() {}
}
companion object {
@ -169,9 +163,5 @@ abstract class WorldStorage : Closeable {
override fun close() {
children.forEach { it.close() }
}
override fun sync() {
children.forEach { it.sync() }
}
}
}

View File

@ -1,5 +1,6 @@
package ru.dbotthepony.kstarbound.util
import org.apache.logging.log4j.LogManager
import java.lang.ref.Reference
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.ConcurrentLinkedQueue
@ -33,7 +34,12 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable {
var next = queue.poll()
while (next != null) {
next.run()
try {
next.run()
} catch (err: Throwable) {
LOGGER.error("Exception running task", err)
}
next = queue.poll()
}
@ -62,4 +68,8 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable {
LockSupport.parkNanos(500_000L)
}
}
companion object {
private val LOGGER = LogManager.getLogger()
}
}

View File

@ -5,6 +5,9 @@ import ru.dbotthepony.kommons.io.StreamCodec
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.json.builder.IStringSerializable
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.function.Supplier
import java.util.stream.Stream
import kotlin.NoSuchElementException
import kotlin.collections.Collection
@ -73,3 +76,7 @@ fun <C : Comparable<C>, T : Any> Stream<Pair<C, T>>.binnedChoice(value: C): Opti
fun <E : IStringSerializable> Collection<E>.valueOf(value: String): E {
return firstOrNull { it.match(value) } ?: throw NoSuchElementException("'$value' is not a valid ${first()::class.qualifiedName}")
}
fun <T> Executor.supplyAsync(block: Supplier<T>): CompletableFuture<T> {
return CompletableFuture.supplyAsync(block, this)
}