SQLite as underlying file storage engine

This commit is contained in:
DBotThePony 2024-04-23 13:51:24 +07:00
parent 87966e5fa1
commit c016dade54
Signed by: DBot
GPG Key ID: DCC23B5715498507
10 changed files with 145 additions and 97 deletions

View File

@ -22,6 +22,7 @@ val gsonVersion: String by project
val log4jVersion: String by project
val guavaVersion: String by project
val junitVersion: String by project
val sqliteVersion: String by project
repositories {
maven {
@ -90,6 +91,8 @@ dependencies {
implementation("com.github.ben-manes.caffeine:caffeine:$caffeineVersion")
implementation(project(":luna"))
implementation("org.xerial:sqlite-jdbc:$sqliteVersion")
implementation("io.netty:netty-transport:$nettyVersion")
}

View File

@ -16,3 +16,4 @@ gsonVersion=2.8.9
log4jVersion=2.17.1
guavaVersion=33.0.0-jre
junitVersion=5.8.2
sqliteVersion=3.45.3.0

View File

@ -36,7 +36,6 @@ import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage
import ru.dbotthepony.kstarbound.server.world.NativeWorldStorage
import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld
import ru.dbotthepony.kstarbound.server.world.ServerWorld
import ru.dbotthepony.kstarbound.server.world.StorageEngine
import ru.dbotthepony.kstarbound.world.SystemWorldLocation
import ru.dbotthepony.kstarbound.world.UniversePos
import java.util.HashMap
@ -93,16 +92,14 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
var shipChunkSource by Delegates.notNull<WorldStorage>()
private set
private val storageEngine = StorageEngine.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) })
override fun setupLegacy() {
super.setupLegacy()
shipChunkSource = LegacyWorldStorage(storageEngine)
shipChunkSource = LegacyWorldStorage.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) })
}
override fun setupNative() {
super.setupNative()
shipChunkSource = NativeWorldStorage(storageEngine)
shipChunkSource = WorldStorage.NULL
}
fun receiveShipChunks(chunks: Map<ByteKey, KOptional<ByteArray>>) {

View File

@ -10,7 +10,6 @@ import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.io.BTreeDB6
import ru.dbotthepony.kstarbound.math.vector.Vector3i
import ru.dbotthepony.kstarbound.Globals
import ru.dbotthepony.kstarbound.Starbound
@ -24,13 +23,13 @@ import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage
import ru.dbotthepony.kstarbound.server.world.ServerUniverse
import ru.dbotthepony.kstarbound.server.world.ServerWorld
import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld
import ru.dbotthepony.kstarbound.server.world.StorageEngine
import ru.dbotthepony.kstarbound.server.world.WorldStorage
import ru.dbotthepony.kstarbound.util.BlockableEventLoop
import ru.dbotthepony.kstarbound.util.JVMClock
import ru.dbotthepony.kstarbound.util.random.random
import ru.dbotthepony.kstarbound.world.UniversePos
import java.io.File
import java.sql.DriverManager
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
@ -84,23 +83,16 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
private suspend fun loadCelestialWorld(location: WorldID.Celestial): ServerWorld {
val file = File(universeFolder, location.pos.toString().replace(':', '_') + ".kworld")
var firstTime = false
val firstTime = !file.exists()
val storage = LegacyWorldStorage.SQL(file)
val engine = if (!file.exists()) {
firstTime = true
val world = if (firstTime) {
LOGGER.info("Creating celestial world $location")
BTreeDB6.create(file, blockSize = 1024)
ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location)
} else {
LOGGER.info("Loading celestial world $location")
BTreeDB6(file)
}
val storage = LegacyWorldStorage(StorageEngine.DB6(engine))
val world = if (firstTime)
ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location)
else
ServerWorld.load(this, storage, location).await()
}
try {
world.sky.referenceClock = universeClock
@ -319,10 +311,10 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
worlds.values.forEach {
if (it.isDone && !it.isCompletedExceptionally) {
it.get().eventLoop.awaitTermination(10L, TimeUnit.SECONDS)
it.get().eventLoop.awaitTermination(60L, TimeUnit.SECONDS)
if (!it.get().eventLoop.isTerminated) {
LOGGER.warn("World ${it.get()} did not shutdown in 10 seconds, forcing termination. This might leave world in inconsistent state!")
LOGGER.warn("World ${it.get()} did not shutdown in 60 seconds, forcing termination. This might leave world in inconsistent state!")
it.get().eventLoop.shutdownNow()
}
}

View File

@ -4,13 +4,16 @@ import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.arrays.Object2DArray
import ru.dbotthepony.kommons.io.BTreeDB6
import ru.dbotthepony.kommons.io.ByteKey
import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.io.BTreeDB5
import ru.dbotthepony.kstarbound.json.VersionedJson
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
import ru.dbotthepony.kstarbound.world.ChunkPos
import ru.dbotthepony.kstarbound.world.ChunkState
@ -25,18 +28,32 @@ import java.io.BufferedOutputStream
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.File
import java.lang.ref.Cleaner
import java.lang.ref.WeakReference
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.function.Function
import java.util.function.Supplier
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
import kotlin.concurrent.withLock
sealed class LegacyWorldStorage() : WorldStorage() {
protected abstract fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>>
protected abstract fun write(at: ByteKey, value: ByteArray)
class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() {
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
return engine.load(key).thenApplyAsync(Function {
return load(key).thenApplyAsync(Function {
it.map {
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it))))
val generationLevel = reader.readVarInt()
@ -70,13 +87,13 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() {
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
return engine.load(key).thenApplyAsync(Function {
return load(key).thenApplyAsync(Function {
it.map { readEntities(pos, it) }
}, Starbound.EXECUTOR)
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return engine.load(metadataKey).thenApplyAsync(Function {
return load(metadataKey).thenApplyAsync(Function {
it.flatMap {
val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it))))
@ -95,7 +112,7 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() {
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
engine.write(key, writeEntities(data))
write(key, writeEntities(data))
return true
}
@ -126,7 +143,7 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() {
val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
stream.close()
engine.write(key, buff.array.copyOf(buff.length))
write(key, buff.array.copyOf(buff.length))
return true
}
@ -139,12 +156,96 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() {
data.data.write(stream)
stream.close()
engine.write(metadataKey, buff.array.copyOf(buff.length))
write(metadataKey, buff.array.copyOf(buff.length))
return true
}
override fun close() {
engine.close()
class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : LegacyWorldStorage() {
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.completedFuture(KOptional.ofNullable(get(at)))
}
override fun write(at: ByteKey, value: ByteArray) {
set(at, value)
}
override fun close() {}
override fun sync() {}
}
class DB5(private val database: BTreeDB5) : LegacyWorldStorage() {
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier)
}
override fun write(at: ByteKey, value: ByteArray) {
throw UnsupportedOperationException()
}
override fun sync() {
// do nothing
}
override fun close() {
carrier.execute { database.close() }
carrier.wait(300L, TimeUnit.SECONDS)
}
}
class SQL(path: File) : LegacyWorldStorage() {
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
private val connection = DriverManager.getConnection("jdbc:sqlite:${path.canonicalPath.replace('\\', '/')}")
init {
connection.autoCommit = false
connection.createStatement().use {
it.execute("""CREATE TABLE IF NOT EXISTS `data` (
|`key` BLOB NOT NULL PRIMARY KEY,
|`value` BLOB NOT NULL
|)""".trimMargin())
}
}
override fun sync() {
carrier.execute { connection.commit() }
}
// concurrent safety - load(), write() and close() are always called on one thread
// and load()/write() will never be called after close()
private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1")
private val writer = connection.prepareStatement("REPLACE INTO `data` (`key`, `value`) VALUES (?, ?)")
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.supplyAsync(Supplier {
loader.setBytes(1, at.toByteArray())
loader.executeQuery().use {
if (it.next()) {
val blob = it.getBytes(1)
KOptional(blob)
} else {
KOptional()
}
}
}, carrier)
}
override fun write(at: ByteKey, value: ByteArray) {
carrier.execute {
writer.setBytes(1, at.toByteArray())
writer.setBytes(2, value)
writer.execute()
}
}
override fun close() {
carrier.execute { connection.commit() }
carrier.execute { connection.close() }
carrier.wait(300L, TimeUnit.SECONDS)
}
}
companion object {

View File

@ -10,7 +10,7 @@ import ru.dbotthepony.kstarbound.world.entities.AbstractEntity
import java.io.Closeable
import java.util.concurrent.CompletableFuture
class NativeWorldStorage(private val engine: StorageEngine) : WorldStorage() {
class NativeWorldStorage() : WorldStorage() {
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
TODO("Not yet implemented")
}
@ -35,7 +35,11 @@ class NativeWorldStorage(private val engine: StorageEngine) : WorldStorage() {
return super.saveMetadata(data)
}
override fun sync() {
}
override fun close() {
engine.close()
}
}

View File

@ -555,7 +555,7 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk<ServerWorld, Server
aabbd,
filter = Predicate { it.isPersistent && !it.isRemote && aabbd.isInside(it.position) })
world.storage.saveCells(pos, cells.value, state)
world.storage.saveCells(pos, copyCells(), state)
world.storage.saveEntities(pos, unloadable)
return unloadable

View File

@ -17,7 +17,6 @@ import ru.dbotthepony.kommons.util.IStruct2i
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.Globals
import ru.dbotthepony.kstarbound.Registries
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.VersionRegistry
import ru.dbotthepony.kstarbound.defs.WarpAction
@ -169,6 +168,10 @@ class ServerWorld private constructor(
eventLoop.scheduleAtFixedRate(Runnable {
tick(Starbound.TIMESTEP)
}, 0L, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS)
eventLoop.scheduleWithFixedDelay(Runnable {
storage.sync()
}, 10L, 10L, TimeUnit.SECONDS)
}
override fun toString(): String {

View File

@ -1,63 +0,0 @@
package ru.dbotthepony.kstarbound.server.world
import ru.dbotthepony.kommons.io.BTreeDB6
import ru.dbotthepony.kommons.io.ByteKey
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.io.BTreeDB5
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import java.io.Closeable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.function.Supplier
abstract class StorageEngine : Closeable {
abstract fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>>
abstract fun write(at: ByteKey, value: ByteArray)
class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : StorageEngine() {
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.completedFuture(KOptional.ofNullable(get(at)))
}
override fun write(at: ByteKey, value: ByteArray) {
set(at, value)
}
override fun close() {}
}
class DB5(private val database: BTreeDB5) : StorageEngine() {
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier)
}
override fun write(at: ByteKey, value: ByteArray) {
throw UnsupportedOperationException()
}
override fun close() {
carrier.execute { database.close() }
carrier.wait(300L, TimeUnit.SECONDS)
}
}
class DB6(private val database: BTreeDB6) : StorageEngine() {
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier)
}
override fun write(at: ByteKey, value: ByteArray) {
carrier.execute { database.write(at, value) }
}
override fun close() {
carrier.execute { database.close() }
carrier.wait(300L, TimeUnit.SECONDS)
}
}
}

View File

@ -36,6 +36,8 @@ abstract class WorldStorage : Closeable {
abstract fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>>
abstract fun loadMetadata(): CompletableFuture<KOptional<Metadata>>
abstract fun sync()
protected fun readEntities(pos: ChunkPos, data: ByteArray): List<AbstractEntity> {
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data))))
val i = reader.readVarInt()
@ -107,6 +109,8 @@ abstract class WorldStorage : Closeable {
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun sync() {}
}
object Nothing : WorldStorage() {
@ -121,6 +125,8 @@ abstract class WorldStorage : Closeable {
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun sync() {}
}
companion object {
@ -163,5 +169,9 @@ abstract class WorldStorage : Closeable {
override fun close() {
children.forEach { it.close() }
}
override fun sync() {
children.forEach { it.sync() }
}
}
}