BTreeDB gone async

DBotThePony 2024-02-12 17:27:04 +07:00
parent fd233b9ab5
commit f6d138400f
Signed by: DBot
GPG Key ID: DCC23B5715498507
11 changed files with 330 additions and 238 deletions

View File

@@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.vector.Vector2d
 import ru.dbotthepony.kommons.vector.Vector2i
 import ru.dbotthepony.kstarbound.client.StarboundClient
 import ru.dbotthepony.kstarbound.io.BTreeDB
+import ru.dbotthepony.kstarbound.io.ByteKey
 import ru.dbotthepony.kstarbound.server.IntegratedStarboundServer
 import ru.dbotthepony.kstarbound.server.world.LegacyChunkSource
 import ru.dbotthepony.kstarbound.server.world.ServerWorld
@@ -33,7 +34,7 @@ fun main() {
 	val db = BTreeDB(File("F:\\SteamLibrary\\steamapps\\common\\Starbound - Unstable\\storage\\universe\\389760395_938904237_-238610574_5.world"))
 	//val db = BTreeDB(File("world.world"))
-	val meta = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(db.read(byteArrayOf(0, 0, 0, 0, 0))), Inflater())))
+	val meta = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(db.read(ByteKey(0, 0, 0, 0, 0)).get().get()), Inflater())))
 	println(meta.readInt())
 	println(meta.readInt())
@@ -83,8 +84,8 @@ fun main() {
 		//item.movement.applyVelocity(Vector2d(rand.nextDouble() * 1000.0 - 500.0, rand.nextDouble() * 1000.0 - 500.0))
 	}
-	//client.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID())
-	client.connectToRemoteServer(InetSocketAddress("127.0.0.1", 21025), UUID.randomUUID())
+	client.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID())
+	//client.connectToRemoteServer(InetSocketAddress("127.0.0.1", 21025), UUID.randomUUID())
 	//client2.connectToLocalServer(server.channels.createLocalChannel(), UUID.randomUUID())
 	server.channels.createChannel(InetSocketAddress(21060))
 }
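
Editor's note: since BTreeDB is now asynchronous, read() returns a CompletableFuture<KOptional<ByteArray>> instead of a nullable ByteArray, which is why the test entrypoint above calls .get().get(): once to block on the future and once to unwrap the KOptional. A minimal sketch of both consumption styles, assuming a placeholder world file path; the only KOptional accessors visible in this commit are get() and map(), so those are what the sketch uses.

import java.io.File

fun btreeDemo(worldFile: File) {
	val db = BTreeDB(worldFile)
	val metadataKey = ByteKey(0, 0, 0, 0, 0)

	// Blocking style, as in the test code above: the first get() waits for the
	// carrier thread to finish the read, the second unwraps the KOptional
	// (and throws if the key is absent).
	val bytes: ByteArray = db.read(metadataKey).get().get()
	println("metadata blob is ${bytes.size} bytes")

	// Non-blocking style: the continuation runs once the read completes,
	// and map() simply does nothing when the key was not found.
	db.read(metadataKey).thenAccept { maybe ->
		maybe.map { blob -> println("metadata blob is ${blob.size} bytes") }
	}

	db.close()
}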

View File

@@ -20,8 +20,9 @@ import java.util.*
 // client -> server
 class ClientConnection(val client: StarboundClient, type: ConnectionType, uuid: UUID) : Connection(ConnectionSide.CLIENT, type, uuid) {
 	private fun sendHello() {
-		isLegacy = true
-		sendAndFlush(ProtocolRequestPacket(Starbound.LEGACY_PROTOCOL_VERSION))
+		isLegacy = false
+		//sendAndFlush(ProtocolRequestPacket(Starbound.LEGACY_PROTOCOL_VERSION))
+		sendAndFlush(ProtocolRequestPacket(Starbound.NATIVE_PROTOCOL_VERSION))
 	}

 	var connectionID: Int = -1

View File

@@ -945,7 +945,8 @@ class StarboundClient private constructor(val clientID: Int) : Closeable {
 		val activeConnection = activeConnection
-		//activeConnection?.send(TrackedPositionPacket(camera.pos))
+		if (activeConnection != null && !activeConnection.isLegacy && activeConnection.isConnected)
+			activeConnection.send(TrackedPositionPacket(camera.pos))

 		uberShaderPrograms.forValidRefs { it.viewMatrix = viewportMatrixScreen }
 		fontShaderPrograms.forValidRefs { it.viewMatrix = viewportMatrixScreen }

View File

@@ -4,17 +4,29 @@ import it.unimi.dsi.fastutil.ints.IntArraySet
 import ru.dbotthepony.kommons.io.readString
 import ru.dbotthepony.kommons.io.readVarInt
 import ru.dbotthepony.kommons.io.readVarIntInfo
+import ru.dbotthepony.kommons.util.KOptional
 import java.io.*
 import java.util.*
-import java.util.concurrent.locks.ReentrantLock
-import kotlin.concurrent.withLock
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.Executor
+import java.util.concurrent.SynchronousQueue
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Supplier
+import kotlin.concurrent.read

 private fun readHeader(reader: RandomAccessFile, required: Char) {
 	val read = reader.read()
 	require(read.toChar() == required) { "Bad Starbound Pak header, expected ${required.code}, got $read" }
 }

-enum class TreeBlockType(val identity: String) {
+private enum class TreeBlockType(val identity: String) {
 	INDEX("II"),
 	LEAF("LL"),
 	FREE("FF");
@@ -31,29 +43,14 @@ enum class TreeBlockType(val identity: String) {
 	}
 }

-private operator fun ByteArray.compareTo(b: ByteArray): Int {
-	require(size == b.size) { "Keys are not of same size (${size} vs ${b.size})" }
-
-	for (i in indices) {
-		if (this[i].toInt() and 0xFF > b[i].toInt() and 0xFF) {
-			return 1
-		} else if (this[i].toInt() and 0xFF < b[i].toInt() and 0xFF) {
-			return -1
-		}
-	}
-
-	return 0
-}
-
-/**
- * A class that allows reading and writing Starbound BTreeDB 5 files
- *
- * Big credit to https://github.com/blixt/py-starbound/blob/master/FORMATS.md#btreedb5 !
- */
-class BTreeDB(val path: File) {
-	val reader = RandomAccessFile(path, "r")
-	private val lock = ReentrantLock()
+class BTreeDB(val path: File) : Closeable {
+	private val reader = RandomAccessFile(path, "r")
+	private val carrier = CarrierExecutor()
+
+	override fun close() {
+		carrier.execute { reader.close() }
+	}

 	init {
 		readHeader(reader, 'B')
 		readHeader(reader, 'T')
@@ -92,174 +89,177 @@ class BTreeDB(val path: File) {
 	val rootNodeIndex get() = if (useNodeTwo) rootNode2Index else rootNode1Index
 	val rootNodeIsLeaf get() = if (useNodeTwo) rootNode2IsLeaf else rootNode1IsLeaf

-	fun readBlockType() = lock.withLock { TreeBlockType[reader.readString(2)] }
-
-	fun findAllKeys(index: Long = rootNodeIndex): List<ByteArray> {
-		lock.withLock {
-			seekBlock(index)
-
-			val list = ArrayList<ByteArray>()
-			val type = readBlockType()
-
-			if (type == TreeBlockType.LEAF) {
-				val keyAmount = reader.readInt()
-
-				// offset inside the leaf, in bytes
-				var offset = 6
-
-				for (i in 0 until keyAmount) {
-					// read the key
-					list.add(ByteArray(indexKeySize).also { reader.read(it) })
-					offset += indexKeySize
-
-					// read the length of the data stored under this key
-					var (dataLength, readBytes) = reader.readVarIntInfo()
-					offset += readBytes
-
-					while (true) {
-						// if the data ends within the current block, stop
-						if (offset + dataLength <= blockSize - 4) {
-							reader.skipBytes(dataLength)
-							offset += dataLength
-							break
-						}
-
-						// otherwise, look for the next block
-						// skip the remaining data and jump to the boundary of the current leaf block
-						val delta = (blockSize - 4 - offset)
-						reader.skipBytes(delta)
-						// find the next block containing our data
-						val nextBlockIndex = reader.readInt()
-						seekBlock(nextBlockIndex.toLong())
-						// make sure we actually landed in a leaf block
-						check(readBlockType() == TreeBlockType.LEAF) { "Did not hit leaf block" }
-						offset = 2
-						dataLength -= delta
-					}
-				}
-			} else if (type == TreeBlockType.INDEX) {
-				reader.skipBytes(1)
-				val keyAmount = reader.readInt()
-				val blockList = IntArraySet()
-				blockList.add(reader.readInt())
-
-				for (i in 0 until keyAmount) {
-					// key
-					reader.skipBytes(indexKeySize)
-					// pointer to a block
-					blockList.add(reader.readInt())
-				}
-
-				// read keys from all child blocks
-				for (block in blockList.intIterator()) {
-					for (key in findAllKeys(block.toLong())) {
-						list.add(key)
-					}
-				}
-			}
-
-			return list
-		}
-	}
-
-	fun read(key: ByteArray): ByteArray? {
-		require(key.size == indexKeySize) { "Key provided is ${key.size} in size, while $indexKeySize is required" }
-
-		lock.withLock {
-			seekBlock(rootNodeIndex)
-			var type = readBlockType()
-			var iterations = 1000
-			val keyLoader = ByteArray(indexKeySize)
-
-			// index scan
-			while (iterations-- > 0 && type != TreeBlockType.LEAF) {
-				if (type == TreeBlockType.FREE) {
-					throw IllegalStateException("Hit free block while scanning index for ${key.joinToString(", ")}")
-				}
-
-				reader.skipBytes(1)
-				val keyCount = reader.readInt()
-
-				// if keyAmount == 4 then
-				// B a B b B c B d B
-				val readKeys = ByteArray((keyCount + 1) * 4 + keyCount * indexKeySize)
-				reader.readFully(readKeys)
-				val stream = DataInputStream(ByteArrayInputStream(readKeys))
-				var read = false
-
-				// B a
-				// B b
-				// B c
-				// B d
-				for (keyIndex in 0 until keyCount) {
-					// pointer to the left block
-					val pointer = stream.readInt()
-					// left key; everything smaller than it lives in the left block
-					stream.readFully(keyLoader)
-
-					// the key we need is smaller than the very first key, so it is somewhere in the left block
-					if (key < keyLoader) {
-						seekBlock(pointer.toLong())
-						type = readBlockType()
-						read = true
-						break
-					}
-				}
-
-				if (!read) {
-					// ... B
-					seekBlock(stream.readInt().toLong())
-					type = readBlockType()
-				}
-			}
-
-			// we reached a leaf, now do a straightforward search through the linked list
-			val leafStream = DataInputStream(BufferedInputStream(LeafInputStream(2)))
-			val keyCount = leafStream.readInt()
-
-			for (keyIndex in 0 until keyCount) {
-				// read the key
-				leafStream.read(keyLoader)
-				// read the data length
-				val dataLength = leafStream.readVarInt()
-
-				// this is our block
-				if (keyLoader.contentEquals(key)) {
-					val binary = ByteArray(dataLength)
-
-					if (dataLength == 0) {
-						// no data (?)
-						return binary
-					}
-
-					leafStream.readFully(binary)
-					return binary
-				} else {
-					leafStream.skipBytes(dataLength)
-				}
-			}
-
-			return null
-		}
-	}
-
-	fun seekBlock(id: Long) {
-		require(id >= 0) { "Negative id $id" }
-		require(id * blockSize + blocksOffsetStart < reader.length()) { "Tried to seek block with $id, but it is outside of file's bounds (file size ${reader.length()} bytes, seeking ${id * blockSize + blocksOffsetStart})! (does not exist)" }
-
-		lock.withLock {
-			reader.seek(id * blockSize + blocksOffsetStart)
-		}
-	}
+	private fun readBlockType() = TreeBlockType[reader.readString(2)]
+
+	private fun doFindAllKeys(index: Long): List<ByteKey> {
+		seekBlock(index)
+
+		val list = ArrayList<ByteKey>()
+		val type = readBlockType()
+
+		if (type == TreeBlockType.LEAF) {
+			val keyAmount = reader.readInt()
+
+			// offset inside the leaf, in bytes
+			var offset = 6
+
+			for (i in 0 until keyAmount) {
+				// read the key
+				list.add(ByteKey(*ByteArray(indexKeySize).also { reader.read(it) }))
+				offset += indexKeySize
+
+				// read the length of the data stored under this key
+				var (dataLength, readBytes) = reader.readVarIntInfo()
+				offset += readBytes
+
+				while (true) {
+					// if the data ends within the current block, stop
+					if (offset + dataLength <= blockSize - 4) {
+						reader.skipBytes(dataLength)
+						offset += dataLength
+						break
+					}
+
+					// otherwise, look for the next block
+					// skip the remaining data and jump to the boundary of the current leaf block
+					val delta = (blockSize - 4 - offset)
+					reader.skipBytes(delta)
+					// find the next block containing our data
+					val nextBlockIndex = reader.readInt()
+					seekBlock(nextBlockIndex.toLong())
+					// make sure we actually landed in a leaf block
+					check(readBlockType() == TreeBlockType.LEAF) { "Did not hit leaf block" }
+					offset = 2
+					dataLength -= delta
+				}
+			}
+		} else if (type == TreeBlockType.INDEX) {
+			reader.skipBytes(1)
+			val keyAmount = reader.readInt()
+			val blockList = IntArraySet()
+			blockList.add(reader.readInt())
+
+			for (i in 0 until keyAmount) {
+				// key
+				reader.skipBytes(indexKeySize)
+				// pointer to a block
+				blockList.add(reader.readInt())
+			}
+
+			// read keys from all child blocks
+			for (block in blockList.intIterator()) {
+				for (key in doFindAllKeys(block.toLong())) {
+					list.add(key)
+				}
+			}
+		}
+
+		return list
+	}
+
+	fun findAllKeys(index: Long = rootNodeIndex): CompletableFuture<List<ByteKey>> {
+		return CompletableFuture.supplyAsync(Supplier {
+			doFindAllKeys(index)
+		}, carrier)
+	}
+
+	private fun doRead(key: ByteKey): KOptional<ByteArray> {
+		require(key.size == indexKeySize) { "Key provided is ${key.size} in size, while $indexKeySize is required" }
+
+		seekBlock(rootNodeIndex)
+		var type = readBlockType()
+		var iterations = 1000
+
+		// index scan
+		while (iterations-- > 0 && type != TreeBlockType.LEAF) {
+			if (type == TreeBlockType.FREE) {
+				throw IllegalStateException("Hit free block while scanning index for $key")
+			}
+
+			reader.skipBytes(1)
+			val keyCount = reader.readInt()
+
+			// if keyAmount == 4 then
+			// B a B b B c B d B
+			val readKeys = ByteArray((keyCount + 1) * 4 + keyCount * indexKeySize)
+			reader.readFully(readKeys)
+			val stream = DataInputStream(ByteArrayInputStream(readKeys))
+			var read = false
+
+			// B a
+			// B b
+			// B c
+			// B d
+			for (keyIndex in 0 until keyCount) {
+				// pointer to the left block
+				val pointer = stream.readInt()
+				// left key; everything smaller than it lives in the left block
+				val seekKey = stream.readByteKeyRaw(indexKeySize)
+
+				// the key we need is smaller than the very first key, so it is somewhere in the left block
+				if (key < seekKey) {
+					seekBlock(pointer.toLong())
+					type = readBlockType()
+					read = true
+					break
+				}
+			}
+
+			if (!read) {
+				// ... B
+				seekBlock(stream.readInt().toLong())
+				type = readBlockType()
+			}
+		}
+
+		// we reached a leaf, now do a straightforward search through the linked list
+		val leafStream = DataInputStream(BufferedInputStream(LeafInputStream(2)))
+		val keyCount = leafStream.readInt()
+
+		for (keyIndex in 0 until keyCount) {
+			// read the key
+			val seekKey = leafStream.readByteKeyRaw(indexKeySize)
+			// read the data length
+			val dataLength = leafStream.readVarInt()
+
+			// this is our block
+			if (seekKey == key) {
+				val binary = ByteArray(dataLength)
+
+				if (dataLength == 0) {
+					// no data (?)
+					return KOptional(binary)
+				}
+
+				leafStream.readFully(binary)
+				return KOptional(binary)
+			} else {
+				leafStream.skipBytes(dataLength)
+			}
+		}
+
+		return KOptional.empty()
+	}
+
+	fun read(key: ByteKey): CompletableFuture<KOptional<ByteArray>> {
+		return CompletableFuture.supplyAsync(Supplier {
+			doRead(key)
+		}, carrier)
+	}
+
+	private fun seekBlock(id: Long) {
+		require(id >= 0) { "Negative id $id" }
+		require(id * blockSize + blocksOffsetStart < reader.length()) { "Tried to seek block with $id, but it is outside of file's bounds (file size ${reader.length()} bytes, seeking ${id * blockSize + blocksOffsetStart})! (does not exist)" }
+		reader.seek(id * blockSize + blocksOffsetStart)
+	}

 	private inner class LeafInputStream(private var offset: Int) : InputStream() {
@@ -324,4 +324,38 @@ class BTreeDB(val path: File) {
 			return true
 		}
 	}
+
+	private class CarrierExecutor : Runnable, Executor {
+		private val isCarried = AtomicBoolean()
+		private val queue = ConcurrentLinkedQueue<Runnable>()
+
+		override fun execute(command: Runnable) {
+			queue.add(command)
+
+			if (isCarried.compareAndSet(false, true))
+				pool.execute(this)
+		}
+
+		override fun run() {
+			var next = queue.poll()
+
+			while (next != null) {
+				next.run()
+				next = queue.poll()
+			}
+
+			isCarried.set(false)
+		}
+
+		companion object {
+			private val poolCounter = AtomicInteger()
+
+			private val pool = ThreadPoolExecutor(0, Int.MAX_VALUE, 30L, TimeUnit.SECONDS, SynchronousQueue(), ThreadFactory {
+				val thread = Thread(it, "BTreeDB IO ${poolCounter.getAndIncrement()}")
+				thread.isDaemon = true
+				thread.priority = Thread.MIN_PRIORITY
+				return@ThreadFactory thread
+			})
+		}
+	}
 }
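
Editor's note: the CarrierExecutor above is what lets the old ReentrantLock go away. Callers enqueue work onto a ConcurrentLinkedQueue, and only the caller that wins the compareAndSet on isCarried schedules a drain on the shared pool, so at most one task touches the RandomAccessFile at any moment while calling threads never block on the file themselves. Below is a standalone sketch of the same single-consumer pattern with illustrative names; the extra re-check before going idle is a detail of this sketch, not of the committed code.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

// Tasks may be submitted from any thread, but they run strictly one at a
// time, in submission order, on the backing pool.
class SerializedExecutor(private val pool: Executor) : Executor, Runnable {
	private val queue = ConcurrentLinkedQueue<Runnable>()
	private val scheduled = AtomicBoolean()

	override fun execute(command: Runnable) {
		queue.add(command)

		// Only the submitter that flips false -> true schedules a drain,
		// so two pool threads never drain the queue concurrently.
		if (scheduled.compareAndSet(false, true))
			pool.execute(this)
	}

	override fun run() {
		do {
			var next = queue.poll()

			while (next != null) {
				next.run()
				next = queue.poll()
			}

			scheduled.set(false)
			// Re-check: a task enqueued between the last poll() and clearing
			// the flag must not be left stranded until the next submission.
		} while (queue.isNotEmpty() && scheduled.compareAndSet(false, true))
	}
}

fun main() {
	val executor = SerializedExecutor(Executors.newCachedThreadPool())
	repeat(5) { i -> executor.execute { println("task $i runs in order, one at a time") } }
}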

View File

@@ -5,16 +5,27 @@ import ru.dbotthepony.kommons.io.writeVarInt
 import java.io.InputStream
 import java.io.OutputStream

-class ByteKey(private vararg val bytes: Byte) {
+class ByteKey(private vararg val bytes: Byte) : Comparable<ByteKey> {
 	override fun equals(other: Any?): Boolean {
 		return this === other || other is ByteKey && other.bytes.contentEquals(bytes)
 	}

+	val size: Int
+		get() = bytes.size
+
+	operator fun get(index: Int): Byte {
+		return bytes[index]
+	}
+
 	fun write(stream: OutputStream) {
 		stream.writeVarInt(bytes.size)
 		stream.write(bytes)
 	}

+	fun writeRaw(stream: OutputStream) {
+		stream.write(bytes)
+	}
+
 	override fun hashCode(): Int {
 		return bytes.contentHashCode()
 	}
@@ -22,6 +33,21 @@ class ByteKey(private vararg val bytes: Byte) {
 	override fun toString(): String {
 		return "ByteKey[${bytes.joinToString(", ")}]"
 	}
+
+	override fun compareTo(other: ByteKey): Int {
+		val cmp = size.compareTo(other.size)
+		if (cmp != 0) return cmp
+
+		for (i in bytes.indices) {
+			if (bytes[i].toInt() and 0xFF > other.bytes[i].toInt() and 0xFF) {
+				return 1
+			} else if (bytes[i].toInt() and 0xFF < other.bytes[i].toInt() and 0xFF) {
+				return -1
+			}
+		}
+
+		return 0
+	}
 }

 fun InputStream.readByteKey(): ByteKey {
@@ -31,3 +57,11 @@ fun InputStream.readByteKey(): ByteKey {
 fun OutputStream.writeByteKey(key: ByteKey) {
 	key.write(this)
 }
+
+fun InputStream.readByteKeyRaw(size: Int): ByteKey {
+	return ByteKey(*ByteArray(size).also { read(it) })
+}
+
+fun OutputStream.writeRawByteKey(key: ByteKey) {
+	key.writeRaw(this)
+}
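
Editor's note: ByteKey's new compareTo orders keys first by length and then by unsigned byte value, which is what the B-tree index scan relies on when it evaluates key < seekKey: 0xFF must compare greater than 0x01 even though Kotlin's Byte is signed. A small sketch of what that ordering implies; the values are chosen purely for illustration.

fun main() {
	val a = ByteKey(0, 1)
	val b = ByteKey(0, -1)      // -1 is the signed representation of the unsigned byte 0xFF
	val c = ByteKey(0, 0, 0)

	println(a < b)              // true: 0x01 < 0xFF when bytes are compared as unsigned
	println(b < c)              // true: a shorter key always sorts before a longer one
	println(a == ByteKey(0, 1)) // true: equality and hashCode are content-based
}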

View File

@@ -25,6 +25,9 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va
 	var isLegacy: Boolean = true
 		protected set

+	var isConnected: Boolean = false
+		protected set
+
 	private val handshakeValidator = PacketRegistry.HANDSHAKE.Validator(side)
 	private val handshakeSerializer = PacketRegistry.HANDSHAKE.Serializer(side)
@@ -46,6 +49,7 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va
 		}

 		isLegacy = true
+		isConnected = true
 	}

 	fun setupNative() {
@@ -60,10 +64,10 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va
 		}

 		isLegacy = false
-	}
-
-	var disconnectionReason: String? = null
-		private set
+		isConnected = true
+		inGame()
+	}

 	fun bind(channel: Channel) {
 		channel.config().setOption(ChannelOption.TCP_NODELAY, true)
@@ -77,6 +81,7 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va
 		channel.pipeline().addLast(this)

 		channel.closeFuture().addListener {
+			isConnected = false
 			LOGGER.info("Connection to ${channel.remoteAddress()} is closed")
 		}
 	}
@@ -97,12 +102,8 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType, va
 	}

 	fun disconnect(reason: String) {
-		if (side == ConnectionSide.CLIENT) {
-			channel.close()
-		} else {
-			channel.flush()
-			channel.close()
-		}
+		channel.flush()
+		channel.close()
 	}

 	override fun close() {

View File

@@ -13,16 +13,19 @@ import ru.dbotthepony.kstarbound.network.ConnectionType
 import java.io.Closeable
 import java.net.SocketAddress
 import java.util.*
+import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantLock
 import kotlin.concurrent.withLock

 class ServerChannels(val server: StarboundServer) : Closeable {
-	private val channels = Collections.synchronizedList(ArrayList<ChannelFuture>())
-	private val connections = Collections.synchronizedList(ArrayList<ServerConnection>())
+	private val channels = CopyOnWriteArrayList<ChannelFuture>()
+	private val connections = CopyOnWriteArrayList<ServerConnection>()
 	private var localChannel: Channel? = null
 	private val lock = ReentrantLock()
 	private var isClosed = false

+	val connectionsView: List<ServerConnection> = Collections.unmodifiableList(connections)
+
 	@Suppress("name_shadowing")
 	fun createLocalChannel(): Channel {
 		val localChannel = localChannel
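
Editor's note: swapping Collections.synchronizedList for CopyOnWriteArrayList matters because the server loop now iterates connectionsView every tick (to flush connected clients) while Netty threads add and remove entries concurrently; a copy-on-write list gives lock-free snapshot iteration and never throws ConcurrentModificationException. A tiny illustration of that property, with illustrative names.

import java.util.Collections
import java.util.concurrent.CopyOnWriteArrayList

fun main() {
	val connections = CopyOnWriteArrayList(listOf("alice", "bob", "carol"))
	val view: List<String> = Collections.unmodifiableList(connections)

	// The iterator works on a snapshot taken when iteration starts, so
	// mutating the backing list mid-loop is safe and does not affect the loop.
	for (name in view) {
		if (name == "bob") connections.remove("bob")
		println(name)    // prints alice, bob, carol
	}

	println(view)        // [alice, carol] once iteration is done
}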

View File

@@ -114,7 +114,6 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
 	}

 	fun tick() {
-		channel.flush()
 		val world = world

 		if (world == null) {

View File

@@ -1,7 +1,10 @@
 package ru.dbotthepony.kstarbound.server

+import org.apache.logging.log4j.LogManager
 import ru.dbotthepony.kommons.util.MailboxExecutorService
+import ru.dbotthepony.kstarbound.Starbound
 import ru.dbotthepony.kstarbound.server.world.ServerWorld
+import ru.dbotthepony.kstarbound.util.ExecutionSpinner
 import java.io.Closeable
 import java.io.File
 import java.util.Collections
@@ -22,18 +25,22 @@ sealed class StarboundServer(val root: File) : Closeable {
 	val worlds: MutableList<ServerWorld> = Collections.synchronizedList(ArrayList<ServerWorld>())
 	val serverID = threadCounter.getAndIncrement()
-	val thread = Thread(::runThread, "Starbound Server $serverID")
-	val mailbox = MailboxExecutorService(thread)
+	val mailbox = MailboxExecutorService()
+	val spinner = ExecutionSpinner(mailbox, ::spin, Starbound.TICK_TIME_ADVANCE_NANOS)
+	val thread = Thread(spinner, "Starbound Server $serverID")
 	val settings = ServerSettings()
 	val channels = ServerChannels(this)
 	val lock = ReentrantLock()

+	@Volatile
 	var isClosed = false
 		private set

 	init {
+		thread.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, e ->
+			LOGGER.fatal("Unexpected exception in server execution loop, shutting down", e)
+			actuallyClose()
+		}
+
 		thread.isDaemon = this is IntegratedStarboundServer
 		thread.start()
 	}
@@ -45,25 +52,31 @@ sealed class StarboundServer(val root: File) : Closeable {
 	protected abstract fun close0()

-	private fun runThread() {
-		while (!isClosed) {
-			mailbox.executeQueuedTasks()
-			LockSupport.park()
-		}
-	}
+	private fun spin(): Boolean {
+		if (isClosed) return false
+		channels.connectionsView.forEach { if (it.isConnected) it.flush() }
+		return !isClosed
+	}
+
+	private fun actuallyClose() {
+		if (isClosed) return
+		isClosed = true
+
+		channels.close()
+		worlds.forEach { it.close() }
+		close0()
+	}

 	final override fun close() {
-		lock.withLock {
-			if (isClosed) return
-
-			channels.close()
-			worlds.forEach { it.close() }
-			close0()
-
-			LockSupport.unpark(thread)
+		if (Thread.currentThread() == thread) {
+			actuallyClose()
+		} else {
+			mailbox.execute { actuallyClose() }
 		}
 	}

 	companion object {
 		private val threadCounter = AtomicInteger()
+		private val LOGGER = LogManager.getLogger()
 	}
 }
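
Editor's note: close() is now thread-affine: if the caller is already on the server thread it tears everything down immediately, otherwise the request is posted to the mailbox and executed by the spinner on the owning thread, so channels and worlds are only ever closed from the thread that drives them. A minimal sketch of the same owner-thread confinement idiom, using a plain blocking queue in place of the project's MailboxExecutorService and ExecutionSpinner; all names here are illustrative.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative owner-thread confinement: state is mutated only on `thread`;
// other callers hand their work over through the mailbox queue.
class OwnedResource {
	private val mailbox = LinkedBlockingQueue<Runnable>()
	@Volatile private var closed = false

	private val thread = Thread({
		while (!closed) {
			mailbox.take().run()   // drain one task at a time on the owner thread
		}
	}, "owner-thread").also { it.isDaemon = true; it.start() }

	private fun actuallyClose() {
		if (closed) return
		closed = true
		println("resources released on ${Thread.currentThread().name}")
	}

	fun close() {
		if (Thread.currentThread() == thread) actuallyClose()
		else mailbox.put(Runnable { actuallyClose() })
	}
}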

View File

@@ -7,6 +7,7 @@ import ru.dbotthepony.kommons.io.readVarInt
 import ru.dbotthepony.kommons.vector.Vector2i
 import ru.dbotthepony.kstarbound.Starbound
 import ru.dbotthepony.kstarbound.io.BTreeDB
+import ru.dbotthepony.kstarbound.io.ByteKey
 import ru.dbotthepony.kstarbound.json.VersionedJson
 import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
 import ru.dbotthepony.kstarbound.world.ChunkPos
@@ -23,53 +24,55 @@ import java.util.zip.InflaterInputStream
 class LegacyChunkSource(val db: BTreeDB) : IChunkSource {
 	override fun getTiles(pos: ChunkPos): CompletableFuture<KOptional<Object2DArray<out AbstractCell>>> {
-		return CompletableFuture.supplyAsync {
-			val chunkX = pos.x
-			val chunkY = pos.y
-			val key = byteArrayOf(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
-
-			val data = db.read(key) ?: return@supplyAsync KOptional.empty()
-			val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data), Inflater())))
-			reader.skipBytes(3)
-
-			val result = Object2DArray.nulls<MutableCell>(CHUNK_SIZE, CHUNK_SIZE)
-
-			for (y in 0 until CHUNK_SIZE) {
-				for (x in 0 until CHUNK_SIZE) {
-					result[x, y] = MutableCell().read(reader)
-				}
-			}
-
-			KOptional(result as Object2DArray<out AbstractCell>)
-		}
+		val chunkX = pos.x
+		val chunkY = pos.y
+		val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
+
+		return db.read(key).thenApplyAsync {
+			it.map {
+				val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it), Inflater())))
+				reader.skipBytes(3)
+
+				val result = Object2DArray.nulls<MutableCell>(CHUNK_SIZE, CHUNK_SIZE)
+
+				for (y in 0 until CHUNK_SIZE) {
+					for (x in 0 until CHUNK_SIZE) {
+						result[x, y] = MutableCell().read(reader)
+					}
+				}
+
+				result as Object2DArray<out AbstractCell>
+			}
+		}
 	}

 	override fun getEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
-		return CompletableFuture.supplyAsync {
-			val chunkX = pos.x
-			val chunkY = pos.y
-			val key = byteArrayOf(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
-
-			val data = db.read(key) ?: return@supplyAsync KOptional.empty()
-			val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data), Inflater())))
-			val i = reader.readVarInt()
-			val objects = ArrayList<AbstractEntity>()
-
-			for (i2 in 0 until i) {
-				val obj = VersionedJson(reader)
-
-				if (obj.identifier == "ObjectEntity") {
-					try {
-						objects.add(WorldObject.fromJson(obj.content.asJsonObject))
-					} catch (err: Throwable) {
-						LOGGER.error("Unable to deserialize entity in chunk $pos", err)
-					}
-				} else {
-					LOGGER.error("Unknown entity type in chunk $pos: ${obj.identifier}")
-				}
-			}
-
-			return@supplyAsync KOptional(objects)
-		}
+		val chunkX = pos.x
+		val chunkY = pos.y
+		val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
+
+		return db.read(key).thenApplyAsync {
+			it.map {
+				val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it), Inflater())))
+				val i = reader.readVarInt()
+				val objects = ArrayList<AbstractEntity>()
+
+				for (i2 in 0 until i) {
+					val obj = VersionedJson(reader)
+
+					if (obj.identifier == "ObjectEntity") {
+						try {
+							objects.add(WorldObject.fromJson(obj.content.asJsonObject))
+						} catch (err: Throwable) {
+							LOGGER.error("Unable to deserialize entity in chunk $pos", err)
+						}
+					} else {
+						LOGGER.error("Unknown entity type in chunk $pos: ${obj.identifier}")
+					}
+				}
+
+				objects
+			}
+		}
 	}
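
Editor's note: the chunk source no longer wraps a blocking read in its own supplyAsync; it chains on the future returned by BTreeDB.read and uses KOptional.map, so a missing key flows through as an empty optional and decompression and parsing only run when data was actually found. A minimal sketch of the same composition against a made-up record type; ChunkHeader and its layout are hypothetical, only db.read, KOptional.map and thenApplyAsync come from this commit.

import java.io.ByteArrayInputStream
import java.io.DataInputStream
import java.util.concurrent.CompletableFuture
import ru.dbotthepony.kommons.util.KOptional

// Hypothetical parsed value, purely to show the shape of the pipeline.
data class ChunkHeader(val version: Int, val tileCount: Int)

fun readHeader(db: BTreeDB, key: ByteKey): CompletableFuture<KOptional<ChunkHeader>> {
	// db.read() does its I/O on the database's carrier thread; thenApplyAsync
	// moves parsing onto the common pool, and map() skips it entirely when
	// the key was not present in the BTreeDB file.
	return db.read(key).thenApplyAsync { maybeBytes ->
		maybeBytes.map { bytes ->
			val stream = DataInputStream(ByteArrayInputStream(bytes))
			ChunkHeader(version = stream.readInt(), tileCount = stream.readInt())
		}
	}
}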

View File

@@ -259,7 +259,9 @@ class ServerWorld(
 			first = false

 			if (geometry.x.inBoundsChunk(pos.x) && geometry.y.inBoundsChunk(pos.y)) {
-				ticketLists.add(this@TicketList)
+				ticketListLock.withLock {
+					ticketLists.add(this@TicketList)
+				}

 				val existing = chunkMap[pos]