diff --git a/build.gradle.kts b/build.gradle.kts index 26977c24..2ec6f12f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -22,6 +22,7 @@ val gsonVersion: String by project val log4jVersion: String by project val guavaVersion: String by project val junitVersion: String by project +val sqliteVersion: String by project repositories { maven { @@ -90,6 +91,8 @@ dependencies { implementation("com.github.ben-manes.caffeine:caffeine:$caffeineVersion") implementation(project(":luna")) + implementation("org.xerial:sqlite-jdbc:$sqliteVersion") + implementation("io.netty:netty-transport:$nettyVersion") } diff --git a/gradle.properties b/gradle.properties index 684e59ab..ec67d4ba 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,3 +16,4 @@ gsonVersion=2.8.9 log4jVersion=2.17.1 guavaVersion=33.0.0-jre junitVersion=5.8.2 +sqliteVersion=3.45.3.0 diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt index b99820db..c4f86dda 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt @@ -36,7 +36,6 @@ import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage import ru.dbotthepony.kstarbound.server.world.NativeWorldStorage import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld import ru.dbotthepony.kstarbound.server.world.ServerWorld -import ru.dbotthepony.kstarbound.server.world.StorageEngine import ru.dbotthepony.kstarbound.world.SystemWorldLocation import ru.dbotthepony.kstarbound.world.UniversePos import java.util.HashMap @@ -93,16 +92,14 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn var shipChunkSource by Delegates.notNull() private set - private val storageEngine = StorageEngine.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) }) - override fun setupLegacy() { super.setupLegacy() - shipChunkSource = LegacyWorldStorage(storageEngine) + shipChunkSource = LegacyWorldStorage.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) }) } override fun setupNative() { super.setupNative() - shipChunkSource = NativeWorldStorage(storageEngine) + shipChunkSource = WorldStorage.NULL } fun receiveShipChunks(chunks: Map>) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt index 5287240d..caf61573 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.future.asCompletableFuture import kotlinx.coroutines.future.await import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager -import ru.dbotthepony.kommons.io.BTreeDB6 import ru.dbotthepony.kstarbound.math.vector.Vector3i import ru.dbotthepony.kstarbound.Globals import ru.dbotthepony.kstarbound.Starbound @@ -24,13 +23,13 @@ import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage import ru.dbotthepony.kstarbound.server.world.ServerUniverse import ru.dbotthepony.kstarbound.server.world.ServerWorld import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld -import ru.dbotthepony.kstarbound.server.world.StorageEngine import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.util.BlockableEventLoop import ru.dbotthepony.kstarbound.util.JVMClock import ru.dbotthepony.kstarbound.util.random.random import ru.dbotthepony.kstarbound.world.UniversePos import java.io.File +import java.sql.DriverManager import java.util.UUID import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit @@ -84,23 +83,16 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread private suspend fun loadCelestialWorld(location: WorldID.Celestial): ServerWorld { val file = File(universeFolder, location.pos.toString().replace(':', '_') + ".kworld") - var firstTime = false + val firstTime = !file.exists() + val storage = LegacyWorldStorage.SQL(file) - val engine = if (!file.exists()) { - firstTime = true + val world = if (firstTime) { LOGGER.info("Creating celestial world $location") - BTreeDB6.create(file, blockSize = 1024) + ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location) } else { LOGGER.info("Loading celestial world $location") - BTreeDB6(file) - } - - val storage = LegacyWorldStorage(StorageEngine.DB6(engine)) - - val world = if (firstTime) - ServerWorld.create(this, WorldTemplate.create(location.pos, universe), storage, location) - else ServerWorld.load(this, storage, location).await() + } try { world.sky.referenceClock = universeClock @@ -319,10 +311,10 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread worlds.values.forEach { if (it.isDone && !it.isCompletedExceptionally) { - it.get().eventLoop.awaitTermination(10L, TimeUnit.SECONDS) + it.get().eventLoop.awaitTermination(60L, TimeUnit.SECONDS) if (!it.get().eventLoop.isTerminated) { - LOGGER.warn("World ${it.get()} did not shutdown in 10 seconds, forcing termination. This might leave world in inconsistent state!") + LOGGER.warn("World ${it.get()} did not shutdown in 60 seconds, forcing termination. This might leave world in inconsistent state!") it.get().eventLoop.shutdownNow() } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt index d92a8abd..1b691fbc 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt @@ -4,13 +4,16 @@ import it.unimi.dsi.fastutil.io.FastByteArrayInputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.arrays.Object2DArray +import ru.dbotthepony.kommons.io.BTreeDB6 import ru.dbotthepony.kommons.io.ByteKey import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.writeVarInt import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kstarbound.Starbound +import ru.dbotthepony.kstarbound.io.BTreeDB5 import ru.dbotthepony.kstarbound.json.VersionedJson import ru.dbotthepony.kstarbound.math.vector.Vector2i +import ru.dbotthepony.kstarbound.util.CarriedExecutor import ru.dbotthepony.kstarbound.world.CHUNK_SIZE import ru.dbotthepony.kstarbound.world.ChunkPos import ru.dbotthepony.kstarbound.world.ChunkState @@ -25,18 +28,32 @@ import java.io.BufferedOutputStream import java.io.ByteArrayInputStream import java.io.DataInputStream import java.io.DataOutputStream +import java.io.File +import java.lang.ref.Cleaner +import java.lang.ref.WeakReference +import java.sql.Connection +import java.sql.DriverManager +import java.sql.PreparedStatement import java.util.concurrent.CompletableFuture +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock import java.util.function.Function +import java.util.function.Supplier import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterInputStream +import kotlin.concurrent.withLock + +sealed class LegacyWorldStorage() : WorldStorage() { + protected abstract fun load(at: ByteKey): CompletableFuture> + protected abstract fun write(at: ByteKey, value: ByteArray) -class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() { override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { val chunkX = pos.x val chunkY = pos.y val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - return engine.load(key).thenApplyAsync(Function { + return load(key).thenApplyAsync(Function { it.map { val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it)))) val generationLevel = reader.readVarInt() @@ -70,13 +87,13 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() { val chunkY = pos.y val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - return engine.load(key).thenApplyAsync(Function { + return load(key).thenApplyAsync(Function { it.map { readEntities(pos, it) } }, Starbound.EXECUTOR) } override fun loadMetadata(): CompletableFuture> { - return engine.load(metadataKey).thenApplyAsync(Function { + return load(metadataKey).thenApplyAsync(Function { it.flatMap { val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it)))) @@ -95,7 +112,7 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() { val chunkY = pos.y val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - engine.write(key, writeEntities(data)) + write(key, writeEntities(data)) return true } @@ -126,7 +143,7 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() { val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) stream.close() - engine.write(key, buff.array.copyOf(buff.length)) + write(key, buff.array.copyOf(buff.length)) return true } @@ -139,12 +156,96 @@ class LegacyWorldStorage(private val engine: StorageEngine) : WorldStorage() { data.data.write(stream) stream.close() - engine.write(metadataKey, buff.array.copyOf(buff.length)) + write(metadataKey, buff.array.copyOf(buff.length)) return true } - override fun close() { - engine.close() + class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : LegacyWorldStorage() { + override fun load(at: ByteKey): CompletableFuture> { + return CompletableFuture.completedFuture(KOptional.ofNullable(get(at))) + } + + override fun write(at: ByteKey, value: ByteArray) { + set(at, value) + } + + override fun close() {} + override fun sync() {} + } + + class DB5(private val database: BTreeDB5) : LegacyWorldStorage() { + private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) + + override fun load(at: ByteKey): CompletableFuture> { + return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier) + } + + override fun write(at: ByteKey, value: ByteArray) { + throw UnsupportedOperationException() + } + + override fun sync() { + // do nothing + } + + override fun close() { + carrier.execute { database.close() } + carrier.wait(300L, TimeUnit.SECONDS) + } + } + + class SQL(path: File) : LegacyWorldStorage() { + private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) + private val connection = DriverManager.getConnection("jdbc:sqlite:${path.canonicalPath.replace('\\', '/')}") + + init { + connection.autoCommit = false + + connection.createStatement().use { + it.execute("""CREATE TABLE IF NOT EXISTS `data` ( + |`key` BLOB NOT NULL PRIMARY KEY, + |`value` BLOB NOT NULL + |)""".trimMargin()) + } + } + + override fun sync() { + carrier.execute { connection.commit() } + } + + // concurrent safety - load(), write() and close() are always called on one thread + // and load()/write() will never be called after close() + private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1") + private val writer = connection.prepareStatement("REPLACE INTO `data` (`key`, `value`) VALUES (?, ?)") + + override fun load(at: ByteKey): CompletableFuture> { + return CompletableFuture.supplyAsync(Supplier { + loader.setBytes(1, at.toByteArray()) + + loader.executeQuery().use { + if (it.next()) { + val blob = it.getBytes(1) + KOptional(blob) + } else { + KOptional() + } + } + }, carrier) + } + + override fun write(at: ByteKey, value: ByteArray) { + carrier.execute { + writer.setBytes(1, at.toByteArray()) + writer.setBytes(2, value) + writer.execute() + } + } + + override fun close() { + carrier.execute { connection.commit() } + carrier.execute { connection.close() } + carrier.wait(300L, TimeUnit.SECONDS) + } } companion object { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt index 57b2c3d2..ac29e4f3 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt @@ -10,7 +10,7 @@ import ru.dbotthepony.kstarbound.world.entities.AbstractEntity import java.io.Closeable import java.util.concurrent.CompletableFuture -class NativeWorldStorage(private val engine: StorageEngine) : WorldStorage() { +class NativeWorldStorage() : WorldStorage() { override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { TODO("Not yet implemented") } @@ -35,7 +35,11 @@ class NativeWorldStorage(private val engine: StorageEngine) : WorldStorage() { return super.saveMetadata(data) } + override fun sync() { + + } + override fun close() { - engine.close() + } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt index 216182cc..a7b33e24 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt @@ -555,7 +555,7 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk> - abstract fun write(at: ByteKey, value: ByteArray) - - class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : StorageEngine() { - override fun load(at: ByteKey): CompletableFuture> { - return CompletableFuture.completedFuture(KOptional.ofNullable(get(at))) - } - - override fun write(at: ByteKey, value: ByteArray) { - set(at, value) - } - - override fun close() {} - } - - class DB5(private val database: BTreeDB5) : StorageEngine() { - private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) - - override fun load(at: ByteKey): CompletableFuture> { - return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier) - } - - override fun write(at: ByteKey, value: ByteArray) { - throw UnsupportedOperationException() - } - - override fun close() { - carrier.execute { database.close() } - carrier.wait(300L, TimeUnit.SECONDS) - } - } - - class DB6(private val database: BTreeDB6) : StorageEngine() { - private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR) - - override fun load(at: ByteKey): CompletableFuture> { - return CompletableFuture.supplyAsync(Supplier { database.read(at) }, carrier) - } - - override fun write(at: ByteKey, value: ByteArray) { - carrier.execute { database.write(at, value) } - } - - override fun close() { - carrier.execute { database.close() } - carrier.wait(300L, TimeUnit.SECONDS) - } - } -} diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt index d1e7d697..4edf35ae 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt @@ -36,6 +36,8 @@ abstract class WorldStorage : Closeable { abstract fun loadEntities(pos: ChunkPos): CompletableFuture>> abstract fun loadMetadata(): CompletableFuture> + abstract fun sync() + protected fun readEntities(pos: ChunkPos, data: ByteArray): List { val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data)))) val i = reader.readVarInt() @@ -107,6 +109,8 @@ abstract class WorldStorage : Closeable { override fun loadMetadata(): CompletableFuture> { return CompletableFuture.completedFuture(KOptional()) } + + override fun sync() {} } object Nothing : WorldStorage() { @@ -121,6 +125,8 @@ abstract class WorldStorage : Closeable { override fun loadMetadata(): CompletableFuture> { return CompletableFuture.completedFuture(KOptional()) } + + override fun sync() {} } companion object { @@ -163,5 +169,9 @@ abstract class WorldStorage : Closeable { override fun close() { children.forEach { it.close() } } + + override fun sync() { + children.forEach { it.sync() } + } } }