From f6d138400f56f2aa96a2053f1e809a1d484cf21f Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Mon, 12 Feb 2024 17:27:04 +0700 Subject: [PATCH] BTreeDB gone async --- .../kotlin/ru/dbotthepony/kstarbound/Main.kt | 7 +- .../kstarbound/client/ClientConnection.kt | 5 +- .../kstarbound/client/StarboundClient.kt | 3 +- .../ru/dbotthepony/kstarbound/io/BTreeDB.kt | 374 ++++++++++-------- .../ru/dbotthepony/kstarbound/io/ByteKey.kt | 36 +- .../kstarbound/network/Connection.kt | 19 +- .../kstarbound/server/ServerChannels.kt | 7 +- .../kstarbound/server/ServerConnection.kt | 1 - .../kstarbound/server/StarboundServer.kt | 45 ++- .../server/world/LegacyChunkSource.kt | 67 ++-- .../kstarbound/server/world/ServerWorld.kt | 4 +- 11 files changed, 330 insertions(+), 238 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt index 4b03bfe1..1dc78ea4 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/Main.kt @@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.vector.Vector2d import ru.dbotthepony.kommons.vector.Vector2i import ru.dbotthepony.kstarbound.client.StarboundClient import ru.dbotthepony.kstarbound.io.BTreeDB +import ru.dbotthepony.kstarbound.io.ByteKey import ru.dbotthepony.kstarbound.server.IntegratedStarboundServer import ru.dbotthepony.kstarbound.server.world.LegacyChunkSource import ru.dbotthepony.kstarbound.server.world.ServerWorld @@ -33,7 +34,7 @@ fun main() { val db = BTreeDB(File("F:\\SteamLibrary\\steamapps\\common\\Starbound - Unstable\\storage\\universe\\389760395_938904237_-238610574_5.world")) //val db = BTreeDB(File("world.world")) - val meta = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(db.read(byteArrayOf(0, 0, 0, 0, 0))), Inflater()))) + val meta = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(db.read(ByteKey(0, 0, 0, 0, 0)).get().get()), Inflater()))) println(meta.readInt()) println(meta.readInt()) @@ -83,8 +84,8 @@ fun main() { //item.movement.applyVelocity(Vector2d(rand.nextDouble() * 1000.0 - 500.0, rand.nextDouble() * 1000.0 - 500.0)) } - //client.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID()) - client.connectToRemoteServer(InetSocketAddress("127.0.0.1", 21025), UUID.randomUUID()) + client.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID()) + //client.connectToRemoteServer(InetSocketAddress("127.0.0.1", 21025), UUID.randomUUID()) //client2.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID()) server.channels.createChannel(InetSocketAddress(21060)) } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/client/ClientConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/client/ClientConnection.kt index d1dd17a0..8416db3b 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/client/ClientConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/client/ClientConnection.kt @@ -20,8 +20,9 @@ import java.util.* // client -> server class ClientConnection(val client: StarboundClient, type: ConnectionType, uuid: UUID) : Connection(ConnectionSide.CLIENT, type, uuid) { private fun sendHello() { - isLegacy = true - sendAndFlush(ProtocolRequestPacket(Starbound.LEGACY_PROTOCOL_VERSION)) + isLegacy = false + //sendAndFlush(ProtocolRequestPacket(Starbound.LEGACY_PROTOCOL_VERSION)) + sendAndFlush(ProtocolRequestPacket(Starbound.NATIVE_PROTOCOL_VERSION)) } var connectionID: Int = -1 diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/client/StarboundClient.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/client/StarboundClient.kt index 72ad0dbb..20c50b02 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/client/StarboundClient.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/client/StarboundClient.kt @@ -945,7 +945,8 @@ class StarboundClient private constructor(val clientID: Int) : Closeable { val activeConnection = activeConnection - //activeConnection?.send(TrackedPositionPacket(camera.pos)) + if (activeConnection != null && !activeConnection.isLegacy && activeConnection.isConnected) + activeConnection.send(TrackedPositionPacket(camera.pos)) uberShaderPrograms.forValidRefs { it.viewMatrix = viewportMatrixScreen } fontShaderPrograms.forValidRefs { it.viewMatrix = viewportMatrixScreen } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/io/BTreeDB.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/io/BTreeDB.kt index 29ac2860..c65301a3 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/io/BTreeDB.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/io/BTreeDB.kt @@ -4,17 +4,29 @@ import it.unimi.dsi.fastutil.ints.IntArraySet import ru.dbotthepony.kommons.io.readString import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.io.readVarIntInfo +import ru.dbotthepony.kommons.util.KOptional import java.io.* import java.util.* -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Executor +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.ThreadFactory +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.function.Supplier +import kotlin.concurrent.read private fun readHeader(reader: RandomAccessFile, required: Char) { val read = reader.read() require(read.toChar() == required) { "Bad Starbound Pak header, expected ${required.code}, got $read" } } -enum class TreeBlockType(val identity: String) { +private enum class TreeBlockType(val identity: String) { INDEX("II"), LEAF("LL"), FREE("FF"); @@ -31,29 +43,14 @@ enum class TreeBlockType(val identity: String) { } } -private operator fun ByteArray.compareTo(b: ByteArray): Int { - require(size == b.size) { "Keys are not of same size (${size} vs ${b.size})" } +class BTreeDB(val path: File) : Closeable { + private val reader = RandomAccessFile(path, "r") + private val carrier = CarrierExecutor() - for (i in indices) { - if (this[i].toInt() and 0xFF > b[i].toInt() and 0xFF) { - return 1 - } else if (this[i].toInt() and 0xFF < b[i].toInt() and 0xFF) { - return -1 - } + override fun close() { + carrier.execute { reader.close() } } - return 0 -} - -/** - * Класс, который позволяет читать и записывать в файлы Starbound BTReeDB 5 - * - * Big credit for https://github.com/blixt/py-starbound/blob/master/FORMATS.md#btreedb5 ! - */ -class BTreeDB(val path: File) { - val reader = RandomAccessFile(path, "r") - private val lock = ReentrantLock() - init { readHeader(reader, 'B') readHeader(reader, 'T') @@ -92,174 +89,177 @@ class BTreeDB(val path: File) { val rootNodeIndex get() = if (useNodeTwo) rootNode2Index else rootNode1Index val rootNodeIsLeaf get() = if (useNodeTwo) rootNode2IsLeaf else rootNode1IsLeaf - fun readBlockType() = lock.withLock { TreeBlockType[reader.readString(2)] } + private fun readBlockType() = TreeBlockType[reader.readString(2)] - fun findAllKeys(index: Long = rootNodeIndex): List { - lock.withLock { - seekBlock(index) + private fun doFindAllKeys(index: Long): List { + seekBlock(index) - val list = ArrayList() - val type = readBlockType() + val list = ArrayList() + val type = readBlockType() - if (type == TreeBlockType.LEAF) { - val keyAmount = reader.readInt() - // offset внутри лепестка в байтах - var offset = 6 + if (type == TreeBlockType.LEAF) { + val keyAmount = reader.readInt() + // offset внутри лепестка в байтах + var offset = 6 - for (i in 0 until keyAmount) { - // читаем ключ - list.add(ByteArray(indexKeySize).also { reader.read(it) }) - offset += indexKeySize + for (i in 0 until keyAmount) { + // читаем ключ + list.add(ByteKey(*ByteArray(indexKeySize).also { reader.read(it) })) + offset += indexKeySize - // читаем размер данных внутри ключа - var (dataLength, readBytes) = reader.readVarIntInfo() - offset += readBytes + // читаем размер данных внутри ключа + var (dataLength, readBytes) = reader.readVarIntInfo() + offset += readBytes - while (true) { - // если конец данных внутри текущего блока, останавливаемся - if (offset + dataLength <= blockSize - 4) { - reader.skipBytes(dataLength) - offset += dataLength - break - } - - // иначе, ищем следующий блок - - // пропускаем оставшиеся данные, переходим на границу текущего блока-лепестка - val delta = (blockSize - 4 - offset) - reader.skipBytes(delta) - - // ищем следующий блок с нашими данными - val nextBlockIndex = reader.readInt() - seekBlock(nextBlockIndex.toLong()) - - // удостоверяемся что мы попали в лепесток - check(readBlockType() == TreeBlockType.LEAF) { "Did not hit leaf block" } - offset = 2 - dataLength -= delta - } - } - } else if (type == TreeBlockType.INDEX) { - reader.skipBytes(1) - val keyAmount = reader.readInt() - - val blockList = IntArraySet() - blockList.add(reader.readInt()) - - for (i in 0 until keyAmount) { - // ключ - reader.skipBytes(indexKeySize) - - // указатель на блок - blockList.add(reader.readInt()) - } - - // читаем все дочерние блоки на ключи - for (block in blockList.intIterator()) { - for (key in findAllKeys(block.toLong())) { - list.add(key) - } - } - } - - return list - } - } - - fun read(key: ByteArray): ByteArray? { - require(key.size == indexKeySize) { "Key provided is ${key.size} in size, while $indexKeySize is required" } - - lock.withLock { - seekBlock(rootNodeIndex) - var type = readBlockType() - var iterations = 1000 - - val keyLoader = ByteArray(indexKeySize) - - // сканирование индекса - while (iterations-- > 0 && type != TreeBlockType.LEAF) { - if (type == TreeBlockType.FREE) { - throw IllegalStateException("Hit free block while scanning index for ${key.joinToString(", ")}") - } - - reader.skipBytes(1) - - val keyCount = reader.readInt() - // if keyAmount == 4 then - // B a B b B c B d B - val readKeys = ByteArray((keyCount + 1) * 4 + keyCount * indexKeySize) - reader.readFully(readKeys) - - val stream = DataInputStream(ByteArrayInputStream(readKeys)) - - var read = false - - // B a - // B b - // B c - // B d - for (keyIndex in 0 until keyCount) { - // указатель на левый блок - val pointer = stream.readInt() - - // левый ключ, всё что меньше него находится в левом блоке - stream.readFully(keyLoader) - - // нужный ключ меньше самого первого ключа, поэтому он находится где то в левом блоке - if (key < keyLoader) { - seekBlock(pointer.toLong()) - type = readBlockType() - read = true + while (true) { + // если конец данных внутри текущего блока, останавливаемся + if (offset + dataLength <= blockSize - 4) { + reader.skipBytes(dataLength) + offset += dataLength break } - } - if (!read) { - // ... B - seekBlock(stream.readInt().toLong()) - type = readBlockType() + // иначе, ищем следующий блок + + // пропускаем оставшиеся данные, переходим на границу текущего блока-лепестка + val delta = (blockSize - 4 - offset) + reader.skipBytes(delta) + + // ищем следующий блок с нашими данными + val nextBlockIndex = reader.readInt() + seekBlock(nextBlockIndex.toLong()) + + // удостоверяемся что мы попали в лепесток + check(readBlockType() == TreeBlockType.LEAF) { "Did not hit leaf block" } + offset = 2 + dataLength -= delta } } + } else if (type == TreeBlockType.INDEX) { + reader.skipBytes(1) + val keyAmount = reader.readInt() - // мы пришли в лепесток, теперь прямолинейно ищем в linked list - val leafStream = DataInputStream(BufferedInputStream(LeafInputStream(2))) - val keyCount = leafStream.readInt() + val blockList = IntArraySet() + blockList.add(reader.readInt()) - for (keyIndex in 0 until keyCount) { - // читаем ключ - leafStream.read(keyLoader) + for (i in 0 until keyAmount) { + // ключ + reader.skipBytes(indexKeySize) - // читаем размер данных - val dataLength = leafStream.readVarInt() - - // это наш блок - if (keyLoader.contentEquals(key)) { - val binary = ByteArray(dataLength) - - if (dataLength == 0) { - // нет данных (?) - return binary - } - - leafStream.readFully(binary) - - return binary - } else { - leafStream.skipBytes(dataLength) - } + // указатель на блок + blockList.add(reader.readInt()) } - return null + // читаем все дочерние блоки на ключи + for (block in blockList.intIterator()) { + for (key in doFindAllKeys(block.toLong())) { + list.add(key) + } + } } + + return list } - fun seekBlock(id: Long) { + fun findAllKeys(index: Long = rootNodeIndex): CompletableFuture> { + return CompletableFuture.supplyAsync(Supplier { + doFindAllKeys(index) + }, carrier) + } + + private fun doRead(key: ByteKey): KOptional { + require(key.size == indexKeySize) { "Key provided is ${key.size} in size, while $indexKeySize is required" } + + seekBlock(rootNodeIndex) + var type = readBlockType() + var iterations = 1000 + + // сканирование индекса + while (iterations-- > 0 && type != TreeBlockType.LEAF) { + if (type == TreeBlockType.FREE) { + throw IllegalStateException("Hit free block while scanning index for $key") + } + + reader.skipBytes(1) + + val keyCount = reader.readInt() + // if keyAmount == 4 then + // B a B b B c B d B + val readKeys = ByteArray((keyCount + 1) * 4 + keyCount * indexKeySize) + reader.readFully(readKeys) + + val stream = DataInputStream(ByteArrayInputStream(readKeys)) + + var read = false + + // B a + // B b + // B c + // B d + for (keyIndex in 0 until keyCount) { + // указатель на левый блок + val pointer = stream.readInt() + + // левый ключ, всё что меньше него находится в левом блоке + val seekKey = stream.readByteKeyRaw(indexKeySize) + + // нужный ключ меньше самого первого ключа, поэтому он находится где то в левом блоке + if (key < seekKey) { + seekBlock(pointer.toLong()) + type = readBlockType() + read = true + break + } + } + + if (!read) { + // ... B + seekBlock(stream.readInt().toLong()) + type = readBlockType() + } + } + + // мы пришли в лепесток, теперь прямолинейно ищем в linked list + val leafStream = DataInputStream(BufferedInputStream(LeafInputStream(2))) + val keyCount = leafStream.readInt() + + for (keyIndex in 0 until keyCount) { + // читаем ключ + val seekKey = leafStream.readByteKeyRaw(indexKeySize) + + // читаем размер данных + val dataLength = leafStream.readVarInt() + + // это наш блок + if (seekKey == key) { + val binary = ByteArray(dataLength) + + if (dataLength == 0) { + // нет данных (?) + return KOptional(binary) + } + + leafStream.readFully(binary) + + return KOptional(binary) + } else { + leafStream.skipBytes(dataLength) + } + } + + return KOptional.empty() + } + + fun read(key: ByteKey): CompletableFuture> { + return CompletableFuture.supplyAsync(Supplier { + doRead(key) + }, carrier) + } + + private fun seekBlock(id: Long) { require(id >= 0) { "Negative id $id" } require(id * blockSize + blocksOffsetStart < reader.length()) { "Tried to seek block with $id, but it is outside of file's bounds (file size ${reader.length()} bytes, seeking ${id * blockSize + blocksOffsetStart})! (does not exist)" } - - lock.withLock { - reader.seek(id * blockSize + blocksOffsetStart) - } + reader.seek(id * blockSize + blocksOffsetStart) } private inner class LeafInputStream(private var offset: Int) : InputStream() { @@ -324,4 +324,38 @@ class BTreeDB(val path: File) { return true } } + + private class CarrierExecutor : Runnable, Executor { + private val isCarried = AtomicBoolean() + private val queue = ConcurrentLinkedQueue() + + override fun execute(command: Runnable) { + queue.add(command) + + if (isCarried.compareAndSet(false, true)) + pool.execute(this) + } + + override fun run() { + var next = queue.poll() + + while (next != null) { + next.run() + next = queue.poll() + } + + isCarried.set(false) + } + + companion object { + private val poolCounter = AtomicInteger() + + private val pool = ThreadPoolExecutor(0, Int.MAX_VALUE, 30L, TimeUnit.SECONDS, SynchronousQueue(), ThreadFactory { + val thread = Thread(it, "BTreeDB IO ${poolCounter.getAndIncrement()}") + thread.isDaemon = true + thread.priority = Thread.MIN_PRIORITY + return@ThreadFactory thread + }) + } + } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/io/ByteKey.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/io/ByteKey.kt index be217ac7..4e23d934 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/io/ByteKey.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/io/ByteKey.kt @@ -5,16 +5,27 @@ import ru.dbotthepony.kommons.io.writeVarInt import java.io.InputStream import java.io.OutputStream -class ByteKey(private vararg val bytes: Byte) { +class ByteKey(private vararg val bytes: Byte) : Comparable { override fun equals(other: Any?): Boolean { return this === other || other is ByteKey && other.bytes.contentEquals(bytes) } + val size: Int + get() = bytes.size + + operator fun get(index: Int): Byte { + return bytes[index] + } + fun write(stream: OutputStream) { stream.writeVarInt(bytes.size) stream.write(bytes) } + fun writeRaw(stream: OutputStream) { + stream.write(bytes) + } + override fun hashCode(): Int { return bytes.contentHashCode() } @@ -22,6 +33,21 @@ class ByteKey(private vararg val bytes: Byte) { override fun toString(): String { return "ByteKey[${bytes.joinToString(", ")}]" } + + override fun compareTo(other: ByteKey): Int { + val cmp = size.compareTo(other.size) + if (cmp != 0) return cmp + + for (i in bytes.indices) { + if (bytes[i].toInt() and 0xFF > other.bytes[i].toInt() and 0xFF) { + return 1 + } else if (bytes[i].toInt() and 0xFF < other.bytes[i].toInt() and 0xFF) { + return -1 + } + } + + return 0 + } } fun InputStream.readByteKey(): ByteKey { @@ -31,3 +57,11 @@ fun InputStream.readByteKey(): ByteKey { fun OutputStream.writeByteKey(key: ByteKey) { key.write(this) } + +fun InputStream.readByteKeyRaw(size: Int): ByteKey { + return ByteKey(*ByteArray(size).also { read(it) }) +} + +fun OutputStream.writeRawByteKey(key: ByteKey) { + key.writeRaw(this) +} diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt index f7bcd292..08252527 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt @@ -25,6 +25,9 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va var isLegacy: Boolean = true protected set + var isConnected: Boolean = false + protected set + private val handshakeValidator = PacketRegistry.HANDSHAKE.Validator(side) private val handshakeSerializer = PacketRegistry.HANDSHAKE.Serializer(side) @@ -46,6 +49,7 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va } isLegacy = true + isConnected = true } fun setupNative() { @@ -60,10 +64,10 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va } isLegacy = false - } + isConnected = true - var disconnectionReason: String? = null - private set + inGame() + } fun bind(channel: Channel) { channel.config().setOption(ChannelOption.TCP_NODELAY, true) @@ -77,6 +81,7 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va channel.pipeline().addLast(this) channel.closeFuture().addListener { + isConnected = false LOGGER.info("Connection to ${channel.remoteAddress()} is closed") } } @@ -97,12 +102,8 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va } fun disconnect(reason: String) { - if (side == ConnectionSide.CLIENT) { - channel.close() - } else { - channel.flush() - channel.close() - } + channel.flush() + channel.close() } override fun close() { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerChannels.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerChannels.kt index 2a11cc21..441dbd09 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerChannels.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerChannels.kt @@ -13,16 +13,19 @@ import ru.dbotthepony.kstarbound.network.ConnectionType import java.io.Closeable import java.net.SocketAddress import java.util.* +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock class ServerChannels(val server: StarboundServer) : Closeable { - private val channels = Collections.synchronizedList(ArrayList()) - private val connections = Collections.synchronizedList(ArrayList()) + private val channels = CopyOnWriteArrayList() + private val connections = CopyOnWriteArrayList() private var localChannel: Channel? = null private val lock = ReentrantLock() private var isClosed = false + val connectionsView: List = Collections.unmodifiableList(connections) + @Suppress("name_shadowing") fun createLocalChannel(): Channel { val localChannel = localChannel diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt index b8d10333..91f7f608 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt @@ -114,7 +114,6 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn } fun tick() { - channel.flush() val world = world if (world == null) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt index 7769760b..812b08e7 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt @@ -1,7 +1,10 @@ package ru.dbotthepony.kstarbound.server +import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.util.MailboxExecutorService +import ru.dbotthepony.kstarbound.Starbound import ru.dbotthepony.kstarbound.server.world.ServerWorld +import ru.dbotthepony.kstarbound.util.ExecutionSpinner import java.io.Closeable import java.io.File import java.util.Collections @@ -22,18 +25,22 @@ sealed class StarboundServer(val root: File) : Closeable { val worlds: MutableList = Collections.synchronizedList(ArrayList()) val serverID = threadCounter.getAndIncrement() - val thread = Thread(::runThread, "Starbound Server $serverID") - val mailbox = MailboxExecutorService(thread) + val mailbox = MailboxExecutorService() + val spinner = ExecutionSpinner(mailbox, ::spin, Starbound.TICK_TIME_ADVANCE_NANOS) + val thread = Thread(spinner, "Starbound Server $serverID") val settings = ServerSettings() val channels = ServerChannels(this) val lock = ReentrantLock() - - @Volatile var isClosed = false private set init { + thread.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, e -> + LOGGER.fatal("Unexpected exception in server execution loop, shutting down", e) + actuallyClose() + } + thread.isDaemon = this is IntegratedStarboundServer thread.start() } @@ -45,25 +52,31 @@ sealed class StarboundServer(val root: File) : Closeable { protected abstract fun close0() - private fun runThread() { - while (!isClosed) { - mailbox.executeQueuedTasks() - LockSupport.park() - } + private fun spin(): Boolean { + if (isClosed) return false + channels.connectionsView.forEach { if (it.isConnected) it.flush() } + return !isClosed + } + + private fun actuallyClose() { + if (isClosed) return + isClosed = true + + channels.close() + worlds.forEach { it.close() } + close0() } final override fun close() { - lock.withLock { - if (isClosed) return - - channels.close() - worlds.forEach { it.close() } - close0() - LockSupport.unpark(thread) + if (Thread.currentThread() == thread) { + actuallyClose() + } else { + mailbox.execute { actuallyClose() } } } companion object { private val threadCounter = AtomicInteger() + private val LOGGER = LogManager.getLogger() } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyChunkSource.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyChunkSource.kt index c01c89a1..6bb79d6f 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyChunkSource.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyChunkSource.kt @@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.io.readVarInt import ru.dbotthepony.kommons.vector.Vector2i import ru.dbotthepony.kstarbound.Starbound import ru.dbotthepony.kstarbound.io.BTreeDB +import ru.dbotthepony.kstarbound.io.ByteKey import ru.dbotthepony.kstarbound.json.VersionedJson import ru.dbotthepony.kstarbound.world.CHUNK_SIZE import ru.dbotthepony.kstarbound.world.ChunkPos @@ -23,53 +24,55 @@ import java.util.zip.InflaterInputStream class LegacyChunkSource(val db: BTreeDB) : IChunkSource { override fun getTiles(pos: ChunkPos): CompletableFuture>> { - return CompletableFuture.supplyAsync { - val chunkX = pos.x - val chunkY = pos.y - val key = byteArrayOf(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - val data = db.read(key) ?: return@supplyAsync KOptional.empty() + val chunkX = pos.x + val chunkY = pos.y + val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data), Inflater()))) - reader.skipBytes(3) + return db.read(key).thenApplyAsync { + it.map { + val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it), Inflater()))) + reader.skipBytes(3) - val result = Object2DArray.nulls(CHUNK_SIZE, CHUNK_SIZE) + val result = Object2DArray.nulls(CHUNK_SIZE, CHUNK_SIZE) - for (y in 0 until CHUNK_SIZE) { - for (x in 0 until CHUNK_SIZE) { - result[x, y] = MutableCell().read(reader) + for (y in 0 until CHUNK_SIZE) { + for (x in 0 until CHUNK_SIZE) { + result[x, y] = MutableCell().read(reader) + } } - } - KOptional(result as Object2DArray) + result as Object2DArray + } } } override fun getEntities(pos: ChunkPos): CompletableFuture>> { - return CompletableFuture.supplyAsync { - val chunkX = pos.x - val chunkY = pos.y - val key = byteArrayOf(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - val data = db.read(key) ?: return@supplyAsync KOptional.empty() + val chunkX = pos.x + val chunkY = pos.y + val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data), Inflater()))) - val i = reader.readVarInt() - val objects = ArrayList() + return db.read(key).thenApplyAsync { + it.map { + val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it), Inflater()))) + val i = reader.readVarInt() + val objects = ArrayList() - for (i2 in 0 until i) { - val obj = VersionedJson(reader) + for (i2 in 0 until i) { + val obj = VersionedJson(reader) - if (obj.identifier == "ObjectEntity") { - try { - objects.add(WorldObject.fromJson(obj.content.asJsonObject)) - } catch (err: Throwable) { - LOGGER.error("Unable to deserialize entity in chunk $pos", err) + if (obj.identifier == "ObjectEntity") { + try { + objects.add(WorldObject.fromJson(obj.content.asJsonObject)) + } catch (err: Throwable) { + LOGGER.error("Unable to deserialize entity in chunk $pos", err) + } + } else { + LOGGER.error("Unknown entity type in chunk $pos: ${obj.identifier}") } - } else { - LOGGER.error("Unknown entity type in chunk $pos: ${obj.identifier}") } - } - return@supplyAsync KOptional(objects) + objects + } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt index f28d94d6..86eb6871 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt @@ -259,7 +259,9 @@ class ServerWorld( first = false if (geometry.x.inBoundsChunk(pos.x) && geometry.y.inBoundsChunk(pos.y)) { - ticketLists.add(this@TicketList) + ticketListLock.withLock { + ticketLists.add(this@TicketList) + } val existing = chunkMap[pos]