From 639aafce506e1bc03b6e20b6004c29e26d4a220d Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Fri, 3 May 2024 16:13:16 +0700 Subject: [PATCH] Now, at long last, unique entity index and messages --- .../kstarbound/client/world/ClientWorld.kt | 54 ++++ .../kstarbound/defs/PlayerWarping.kt | 2 +- .../lua/bindings/WorldEntityBindings.kt | 7 +- .../network/packets/EntityMessagePacket.kt | 40 +-- .../FindUniqueEntityResponsePacket.kt | 39 +-- .../serverbound/FindUniqueEntityPacket.kt | 11 +- .../network/syncher/BasicNetworkedElement.kt | 40 ++- .../network/syncher/EventCounterElement.kt | 2 +- .../kstarbound/server/ServerConnection.kt | 4 +- .../kstarbound/server/StarboundServer.kt | 2 +- .../server/world/LegacyWorldStorage.kt | 304 ++++++++++++++---- .../server/world/NativeWorldStorage.kt | 22 +- .../kstarbound/server/world/ServerChunk.kt | 25 +- .../kstarbound/server/world/ServerUniverse.kt | 3 +- .../kstarbound/server/world/ServerWorld.kt | 49 ++- .../server/world/ServerWorldTracker.kt | 68 +++- .../kstarbound/server/world/WorldStorage.kt | 125 +------ .../kstarbound/util/ActionPacer.kt | 7 +- .../kstarbound/util/CarriedExecutor.kt | 26 +- .../ru/dbotthepony/kstarbound/world/World.kt | 41 ++- .../world/entities/AbstractEntity.kt | 36 ++- 21 files changed, 625 insertions(+), 282 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/client/world/ClientWorld.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/client/world/ClientWorld.kt index 8a1e965b..7604db3d 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/client/world/ClientWorld.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/client/world/ClientWorld.kt @@ -1,6 +1,9 @@ package ru.dbotthepony.kstarbound.client.world +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine import com.google.common.base.Supplier +import com.google.gson.JsonArray import com.google.gson.JsonElement import com.google.gson.JsonObject import it.unimi.dsi.fastutil.longs.Long2ObjectFunction @@ -9,10 +12,12 @@ import it.unimi.dsi.fastutil.longs.LongArraySet import it.unimi.dsi.fastutil.objects.ReferenceArraySet import ru.dbotthepony.kommons.util.IStruct2i import ru.dbotthepony.kommons.math.RGBAColor +import ru.dbotthepony.kommons.util.Either import ru.dbotthepony.kstarbound.math.AABB import ru.dbotthepony.kstarbound.math.vector.Vector2f import ru.dbotthepony.kstarbound.math.vector.Vector2i import ru.dbotthepony.kstarbound.Registry +import ru.dbotthepony.kstarbound.Starbound import ru.dbotthepony.kstarbound.client.StarboundClient import ru.dbotthepony.kstarbound.client.render.ConfiguredMesh import ru.dbotthepony.kstarbound.client.render.LayeredRenderer @@ -22,11 +27,14 @@ import ru.dbotthepony.kstarbound.defs.tile.LiquidDefinition import ru.dbotthepony.kstarbound.defs.world.WorldTemplate import ru.dbotthepony.kstarbound.math.roundTowardsNegativeInfinity import ru.dbotthepony.kstarbound.math.roundTowardsPositiveInfinity +import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.network.Connection import ru.dbotthepony.kstarbound.network.packets.DamageNotificationPacket import ru.dbotthepony.kstarbound.network.packets.DamageRequestPacket +import ru.dbotthepony.kstarbound.network.packets.EntityMessagePacket import ru.dbotthepony.kstarbound.network.packets.HitRequestPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.UpdateWorldPropertiesPacket +import ru.dbotthepony.kstarbound.network.packets.serverbound.FindUniqueEntityPacket import ru.dbotthepony.kstarbound.util.BlockableEventLoop import ru.dbotthepony.kstarbound.world.CHUNK_SIZE import ru.dbotthepony.kstarbound.world.ChunkPos @@ -36,10 +44,13 @@ import ru.dbotthepony.kstarbound.world.api.ITileAccess import ru.dbotthepony.kstarbound.world.api.OffsetCellAccess import ru.dbotthepony.kstarbound.world.api.TileView import ru.dbotthepony.kstarbound.world.positiveModulo +import java.time.Duration +import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.function.Consumer +import kotlin.collections.ArrayList class ClientWorld( val client: StarboundClient, @@ -346,6 +357,49 @@ class ClientWorld( return client.activeConnection } + val pendingUniqueEntityRequests: Cache> = Caffeine.newBuilder() + .maximumSize(4096L) // why would you even need to request this many unique entities anyway? + // who knows! modders probably know! + .expireAfterWrite(Duration.ofMinutes(1)) + .scheduler(Starbound) + .executor(Starbound.EXECUTOR) + .evictionListener> { key, value, cause -> if (cause.wasEvicted()) value!!.complete(null) } + .build() + + override fun findUniqueEntity(id: String): CompletableFuture { + val loaded = uniqueEntities[id] + + if (loaded != null) { + return CompletableFuture.completedFuture(loaded.position) + } + + val connection = client.activeConnection ?: return CompletableFuture.completedFuture(null) + + return pendingUniqueEntityRequests.get(id) { + val future = CompletableFuture() + connection.send(FindUniqueEntityPacket(id)) + future + } + } + + override fun dispatchEntityMessage( + sourceConnection: Int, + entityID: String, + message: String, + arguments: JsonArray + ): CompletableFuture { + val loaded = uniqueEntities[entityID] + + if (loaded != null) { + return loaded.dispatchMessage(sourceConnection, message, arguments) + } + + val connection = client.activeConnection ?: return CompletableFuture.failedFuture(MessageCallException("No active connection")) + val (future, packet) = createRemoteEntityMessageFuture(sourceConnection, Either.right(entityID), message, arguments) + connection.send(packet) + return future + } + companion object { val ring = listOf( Vector2i(0, 0), diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/defs/PlayerWarping.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/defs/PlayerWarping.kt index ad7b1a0a..e73110e9 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/defs/PlayerWarping.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/defs/PlayerWarping.kt @@ -63,7 +63,7 @@ sealed class SpawnTarget { } override suspend fun resolve(world: ServerWorld): Vector2d? { - return world.entities.values.firstOrNull { it.uniqueID.get() == id }?.position + return world.uniqueEntities[id]?.position } override fun toString(): String { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/lua/bindings/WorldEntityBindings.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/lua/bindings/WorldEntityBindings.kt index c1c93ad4..f308621a 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/lua/bindings/WorldEntityBindings.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/lua/bindings/WorldEntityBindings.kt @@ -550,7 +550,12 @@ fun provideWorldEntitiesBindings(self: World<*, *>, callbacks: Table, lua: LuaEn returnBuffer.setToContentsOf(entity.callScript(function, *it.copyRemaining())) } - callbacks["findUniqueEntity"] = luaStub("findUniqueEntity") + callbacks["findUniqueEntity"] = luaFunction { id: ByteString -> + returnBuffer.setTo(LuaFuture( + future = self.findUniqueEntity(id.decode()).thenApply { from(it) }, + isLocal = self.isServer + )) + } callbacks["sendEntityMessage"] = luaFunctionN("sendEntityMessage") { val id = it.nextAny() diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/EntityMessagePacket.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/EntityMessagePacket.kt index 48216d4e..bf2b3fab 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/EntityMessagePacket.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/EntityMessagePacket.kt @@ -44,26 +44,13 @@ class EntityMessagePacket(val entity: Either, val message: String, stream.writeShort(sourceConnection) } - private fun handle(connection: Connection, world: World<*, *>) { - val future = if (entity.isLeft) { - world.dispatchEntityMessage(connection.connectionID, entity.left(), message, arguments) - } else { - world.dispatchEntityMessage(connection.connectionID, entity.right(), message, arguments) - } - - future - .thenAccept(Consumer { - connection.send(EntityMessageResponsePacket(Either.right(it), id)) - }) - .exceptionally(Function { - connection.send(EntityMessageResponsePacket(Either.left(it.message ?: "Internal server error"), id)) - null - }) - } - override fun play(connection: ServerConnection) { - connection.enqueue { - handle(connection, this) + val tracker = connection.tracker + + if (tracker == null) { + connection.send(EntityMessageResponsePacket(Either.left("Not in world"), this@EntityMessagePacket.id)) + } else { + tracker.handleEntityMessage(this) } } @@ -72,7 +59,20 @@ class EntityMessagePacket(val entity: Either, val message: String, val world = world if (world != null) { - handle(connection, world) + val future = if (entity.isLeft) { + world.dispatchEntityMessage(connection.connectionID, entity.left(), message, arguments) + } else { + world.dispatchEntityMessage(connection.connectionID, entity.right(), message, arguments) + } + + future + .thenAccept(Consumer { + connection.send(EntityMessageResponsePacket(Either.right(it), this@EntityMessagePacket.id)) + }) + .exceptionally(Function { + connection.send(EntityMessageResponsePacket(Either.left(it.message ?: "Internal server error"), this@EntityMessagePacket.id)) + null + }) } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/clientbound/FindUniqueEntityResponsePacket.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/clientbound/FindUniqueEntityResponsePacket.kt index 74831415..702691e6 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/clientbound/FindUniqueEntityResponsePacket.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/clientbound/FindUniqueEntityResponsePacket.kt @@ -1,13 +1,11 @@ package ru.dbotthepony.kstarbound.network.packets.clientbound -import ru.dbotthepony.kommons.io.readBinaryString -import ru.dbotthepony.kstarbound.io.readVector2d -import ru.dbotthepony.kstarbound.io.readVector2f import ru.dbotthepony.kommons.io.writeBinaryString -import ru.dbotthepony.kommons.io.writeStruct2d -import ru.dbotthepony.kommons.io.writeStruct2f -import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.client.ClientConnection +import ru.dbotthepony.kstarbound.io.readInternedString +import ru.dbotthepony.kstarbound.io.readVector2d +import ru.dbotthepony.kstarbound.io.writeStruct2d +import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.network.IClientPacket import java.io.DataInputStream import java.io.DataOutputStream @@ -17,36 +15,25 @@ class FindUniqueEntityResponsePacket(val name: String, val position: Vector2d?) stream.writeBinaryString(name) stream.writeBoolean(position != null) - if (isLegacy) { - if (position != null) - stream.writeStruct2f(position.toFloatVector()) - } else { - if (position != null) - stream.writeStruct2d(position) - } + if (position != null) + stream.writeStruct2d(position, isLegacy) } override fun play(connection: ClientConnection) { - TODO("Not yet implemented") + connection.enqueue { + world?.pendingUniqueEntityRequests?.asMap()?.remove(name)?.complete(position) + } } companion object { fun read(stream: DataInputStream, isLegacy: Boolean): FindUniqueEntityResponsePacket { - val name = stream.readBinaryString() + val name = stream.readInternedString() val position: Vector2d? - if (isLegacy) { - if (stream.readBoolean()) { - position = stream.readVector2f().toDoubleVector() - } else { - position = null - } + if (stream.readBoolean()) { + position = stream.readVector2d(isLegacy) } else { - if (stream.readBoolean()) { - position = stream.readVector2d() - } else { - position = null - } + position = null } return FindUniqueEntityResponsePacket(name, position) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/FindUniqueEntityPacket.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/FindUniqueEntityPacket.kt index f0f8eff4..7e0969de 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/FindUniqueEntityPacket.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/FindUniqueEntityPacket.kt @@ -1,7 +1,7 @@ package ru.dbotthepony.kstarbound.network.packets.serverbound -import ru.dbotthepony.kommons.io.readBinaryString import ru.dbotthepony.kommons.io.writeBinaryString +import ru.dbotthepony.kstarbound.io.readInternedString import ru.dbotthepony.kstarbound.network.IServerPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.FindUniqueEntityResponsePacket import ru.dbotthepony.kstarbound.server.ServerConnection @@ -9,15 +9,18 @@ import java.io.DataInputStream import java.io.DataOutputStream class FindUniqueEntityPacket(val name: String) : IServerPacket { - constructor(stream: DataInputStream, isLegacy: Boolean) : this(stream.readBinaryString()) + constructor(stream: DataInputStream, isLegacy: Boolean) : this(stream.readInternedString()) override fun write(stream: DataOutputStream, isLegacy: Boolean) { stream.writeBinaryString(name) } override fun play(connection: ServerConnection) { - connection.enqueue { - // Do something + val tracker = connection.tracker + + if (tracker != null) { + tracker.findUniqueEntity(name) + } else { connection.send(FindUniqueEntityResponsePacket(name, null)) } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/BasicNetworkedElement.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/BasicNetworkedElement.kt index d0bfda16..bbcccffa 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/BasicNetworkedElement.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/BasicNetworkedElement.kt @@ -5,34 +5,59 @@ import ru.dbotthepony.kommons.util.Listenable import ru.dbotthepony.kommons.util.ListenableDelegate import java.io.DataInputStream import java.io.DataOutputStream +import java.util.concurrent.CopyOnWriteArrayList import java.util.function.Consumer open class BasicNetworkedElement(private var value: TYPE, protected val codec: StreamCodec, protected val legacyCodec: StreamCodec, protected val toLegacy: (TYPE) -> LEGACY, protected val fromLegacy: (LEGACY) -> TYPE) : NetworkedElement(), ListenableDelegate { - protected val valueListeners = Listenable.Impl() - protected val queue = InterpolationQueue { this.value = it; valueListeners.accept(value) } + protected val valueListeners = CopyOnWriteArrayList() + + protected val queue = InterpolationQueue { + val old = this.value + this.value = it + valueListeners.forEach { c -> c.callable.invoke(it, old) } + } + protected var isInterpolating = false protected var currentTime = 0.0 + protected inner class Listener(val callable: (TYPE, TYPE) -> Unit) : Listenable.L { + constructor(listener: Consumer) : this({ v, _ -> listener.accept(v) }) + constructor(listener: Runnable) : this({ _, _ -> listener.run() }) + + init { + valueListeners.add(this) + } + + override fun remove() { + valueListeners.remove(this) + } + } + override fun toString(): String { return "BasicNetworkedElement[$value, isInterpolating=$isInterpolating, currentTime=$currentTime]" } override fun accept(t: TYPE) { if (t != value) { + val old = value value = t queue.clear() bumpVersion() - valueListeners.accept(t) + valueListeners.forEach { it.callable.invoke(t, old) } } } override fun addListener(listener: Consumer): Listenable.L { - return valueListeners.addListener(listener) + return Listener(listener) + } + + fun addListener(listener: (new: TYPE, old: TYPE) -> Unit): Listenable.L { + return Listener(listener) } @Deprecated("Internal API") override fun listen(listener: Runnable): Listenable.L { - return valueListeners.addListener(listener) + return Listener(listener) } override fun get(): TYPE { @@ -46,7 +71,7 @@ open class BasicNetworkedElement(private var value: TYPE, protecte bumpVersion() if (value != old) { - valueListeners.accept(value) + valueListeners.forEach { it.callable.invoke(value, old) } } } @@ -65,8 +90,9 @@ open class BasicNetworkedElement(private var value: TYPE, protecte if (isInterpolating) { queue.push(read, interpolationDelay) } else { + val old = value value = read - valueListeners.accept(read) + valueListeners.forEach { it.callable.invoke(read, old) } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/EventCounterElement.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/EventCounterElement.kt index 2cc349ff..ecb943ea 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/EventCounterElement.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/syncher/EventCounterElement.kt @@ -38,7 +38,7 @@ class EventCounterElement : BasicNetworkedElement(0L, UnsignedVarLon } init { - valueListeners.addListener(Consumer { + addListener(Consumer { if (it < pulled) pulled = it }) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt index c14fc197..1d8370dc 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt @@ -60,7 +60,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn // packets which interact with world must be // executed on world's thread - fun enqueue(task: ServerWorld.() -> Unit): Boolean { + fun enqueue(task: ServerWorld.(ServerWorldTracker) -> Unit): Boolean { val isInWorld = tracker?.enqueue(task) != null if (!isInWorld) { @@ -105,7 +105,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn override fun setupNative() { super.setupNative() - shipChunkSource = WorldStorage.NULL + shipChunkSource = LegacyWorldStorage.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) }) } fun receiveShipChunks(chunks: Map>) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt index 88b20422..4c578cee 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt @@ -269,7 +269,7 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread visitable.disableDeathDrops = config.disableDeathDrops val template = WorldTemplate(visitable, config.skyParameters, random) - val world = ServerWorld.create(this, template, WorldStorage.NULL, location) + val world = ServerWorld.create(this, template, LegacyWorldStorage.memory(), location) try { world.setProperty("ephemeral", JsonPrimitive(!config.persistent)) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt index 94dbe27d..febe075a 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/LegacyWorldStorage.kt @@ -1,22 +1,35 @@ package ru.dbotthepony.kstarbound.server.world +import com.github.benmanes.caffeine.cache.AsyncCacheLoader +import com.github.benmanes.caffeine.cache.Caffeine +import com.google.gson.JsonObject import it.unimi.dsi.fastutil.io.FastByteArrayInputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.future.await +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.arrays.Object2DArray -import ru.dbotthepony.kommons.io.BTreeDB6 import ru.dbotthepony.kommons.io.ByteKey +import ru.dbotthepony.kommons.io.readBinaryString +import ru.dbotthepony.kommons.io.readCollection import ru.dbotthepony.kommons.io.readVarInt +import ru.dbotthepony.kommons.io.writeBinaryString +import ru.dbotthepony.kommons.io.writeCollection +import ru.dbotthepony.kommons.io.writeStruct2f import ru.dbotthepony.kommons.io.writeVarInt -import ru.dbotthepony.kommons.util.KOptional +import ru.dbotthepony.kommons.util.xxhash32 import ru.dbotthepony.kstarbound.Starbound -import ru.dbotthepony.kstarbound.defs.tile.BuiltinMetaMaterials -import ru.dbotthepony.kstarbound.defs.tile.isNullTile +import ru.dbotthepony.kstarbound.VersionRegistry import ru.dbotthepony.kstarbound.io.BTreeDB5 +import ru.dbotthepony.kstarbound.io.readInternedString +import ru.dbotthepony.kstarbound.io.readVector2f import ru.dbotthepony.kstarbound.json.VersionedJson +import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.math.vector.Vector2i import ru.dbotthepony.kstarbound.util.CarriedExecutor -import ru.dbotthepony.kstarbound.util.supplyAsync +import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor import ru.dbotthepony.kstarbound.world.CHUNK_SIZE import ru.dbotthepony.kstarbound.world.ChunkPos import ru.dbotthepony.kstarbound.world.ChunkState @@ -25,7 +38,6 @@ import ru.dbotthepony.kstarbound.world.api.AbstractCell import ru.dbotthepony.kstarbound.world.api.ImmutableCell import ru.dbotthepony.kstarbound.world.api.MutableCell import ru.dbotthepony.kstarbound.world.entities.AbstractEntity -import ru.dbotthepony.kstarbound.world.entities.tile.WorldObject import java.io.BufferedInputStream import java.io.BufferedOutputStream import java.io.ByteArrayInputStream @@ -33,106 +45,256 @@ import java.io.DataInputStream import java.io.DataOutputStream import java.io.File import java.lang.ref.Cleaner -import java.lang.ref.WeakReference -import java.sql.Connection import java.sql.DriverManager -import java.sql.PreparedStatement +import java.time.Duration import java.util.concurrent.CompletableFuture -import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.Executor import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.ReentrantLock import java.util.function.Function import java.util.function.Supplier +import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream +import java.util.zip.Inflater import java.util.zip.InflaterInputStream -import kotlin.concurrent.withLock sealed class LegacyWorldStorage() : WorldStorage() { - protected abstract fun load(at: ByteKey): CompletableFuture> + protected abstract fun load(at: ByteKey): CompletableFuture protected abstract fun write(at: ByteKey, value: ByteArray) - protected abstract val executor: Executor - override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { + protected val scope by lazy { + CoroutineScope(ScheduledCoroutineExecutor(executor) + SupervisorJob()) + } + + private val uniqueIndexBlockCache = Caffeine.newBuilder() + .maximumSize(1024L) + .scheduler(Starbound) + .executor(Starbound.EXECUTOR) + .evictionListener>> { key, value, cause -> writeUniqueEntityIndex(key!!, value!!) } + .buildAsync(AsyncCacheLoader>> { key, executor -> + load(key).thenApply { + parseUniqueEntityIndex(it ?: return@thenApply HashMap()) + } + }) + + private fun uniqueEntityIndex(identifier: String): ByteKey { + val hash = xxhash32(identifier) + return ByteKey(3, hash[3], hash[2], hash[1], hash[0]) + } + + private fun parseUniqueEntityIndex(data: ByteArray): HashMap> { + val inflater = Inflater() + + try { + // why is index zlib compressed? + // I know you guys don't have much of brain, but holy shit, man, why in the world would you + // compress + // AN INDEX + val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(data), inflater, 0x10000), 0x40000)) + + val keys = stream.readVarInt() + + val map = HashMap>(keys) + + for (i in 0 until keys) { + val key = stream.readInternedString() + val x = stream.readUnsignedShort() + val y = stream.readUnsignedShort() + val pos = stream.readVector2f().toDoubleVector() + map[key] = ChunkPos(x, y) to pos + } + + return map + } finally { + inflater.end() + } + } + + private fun writeUniqueEntityIndex(key: ByteKey, data: HashMap>) { + executor.execute { + val deflater = Deflater() + val buffer = FastByteArrayOutputStream() + // aaaaaaaaaaaaaaaaagrh + val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffer, deflater, 0x10000), 0x40000)) + + try { + stream.writeVarInt(data.size) + + for ((id, value) in data) { + val (pos, epos) = value + + stream.writeBinaryString(id) + stream.writeShort(pos.x) + stream.writeShort(pos.y) + stream.writeStruct2f(epos.toFloatVector()) + } + + stream.close() + write(key, buffer.array.copyOf(buffer.length)) + } finally { + deflater.end() + } + } + } + + override fun findUniqueEntity(identifier: String): CompletableFuture { + return uniqueIndexBlockCache[uniqueEntityIndex(identifier)].thenApply { + it[identifier]?.let { UniqueEntitySearchResult(it.first, it.second) } + } + } + + override fun loadCells(pos: ChunkPos): CompletableFuture { val chunkX = pos.x val chunkY = pos.y val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) return load(key).thenApplyAsync(Function { - it.map { - val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it)))) - val generationLevel = reader.readVarInt() - val tileSerializationVersion = reader.readVarInt() + it ?: return@Function null + val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it)))) + val generationLevel = reader.readVarInt() + val tileSerializationVersion = reader.readVarInt() - val state = when (generationLevel) { - 0 -> ChunkState.EMPTY - 1 -> ChunkState.TERRAIN - 2 -> ChunkState.MICRO_DUNGEONS - 3 -> ChunkState.CAVE_LIQUID - 4 -> ChunkState.FULL - else -> ChunkState.FULL - } - - val result = Object2DArray.nulls(CHUNK_SIZE, CHUNK_SIZE) - - for (y in 0 until CHUNK_SIZE) { - for (x in 0 until CHUNK_SIZE) { - val read = MutableCell().readLegacy(reader, tileSerializationVersion) - result[x, y] = read.immutable() - } - } - - reader.close() - result as Object2DArray to state + val state = when (generationLevel) { + 0 -> ChunkState.EMPTY + 1 -> ChunkState.TERRAIN + 2 -> ChunkState.MICRO_DUNGEONS + 3 -> ChunkState.CAVE_LIQUID + 4 -> ChunkState.FULL + else -> ChunkState.FULL } + + val result = Object2DArray.nulls(CHUNK_SIZE, CHUNK_SIZE) + + for (y in 0 until CHUNK_SIZE) { + for (x in 0 until CHUNK_SIZE) { + val read = MutableCell().readLegacy(reader, tileSerializationVersion) + result[x, y] = read.immutable() + } + } + + reader.close() + return@Function ChunkCells(result as Object2DArray, state) }, Starbound.EXECUTOR) } - override fun loadEntities(pos: ChunkPos): CompletableFuture>> { + override fun loadEntities(pos: ChunkPos): CompletableFuture> { val chunkX = pos.x val chunkY = pos.y val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) return load(key).thenApplyAsync(Function { - it.map { readEntities(pos, it) } + readEntities(pos, it ?: return@Function listOf()) }, Starbound.EXECUTOR) } - override fun loadMetadata(): CompletableFuture> { + override fun loadMetadata(): CompletableFuture { return load(metadataKey).thenApplyAsync(Function { - it.flatMap { - val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it)))) + it ?: return@Function null + val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it)))) - val width = stream.readInt() - val height = stream.readInt() + val width = stream.readInt() + val height = stream.readInt() + val json = VersionedJson(stream) - val json = VersionedJson(stream) - - KOptional(Metadata(WorldGeometry(Vector2i(width, height), true, false), json)) - } + Metadata(WorldGeometry(Vector2i(width, height), true, false), json) }, Starbound.EXECUTOR) } - override fun saveEntities(pos: ChunkPos, data: Collection, now: Boolean): Boolean { - executor.execute { + override fun saveEntities(pos: ChunkPos, entities: Collection) { + scope.launch { val chunkX = pos.x val chunkY = pos.y val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) + val uniquesKey = ByteKey(4, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte()) - write(key, writeEntities(data)) + val buffEntities = FastByteArrayOutputStream() + val buffUniques = FastByteArrayOutputStream() + val streamEntities = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffEntities))) + val streamUniques = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffUniques))) + + val uniques = HashMap() + + try { + streamEntities.writeVarInt(entities.size) + + for (entity in entities) { + Starbound.storeJson { + val data = JsonObject() + entity.serialize(data) + VersionRegistry.make(entity.type.storeName, data).write(streamEntities) + } + + val uniqueID = entity.uniqueID.get() + + if (uniqueID != null) { + uniques[uniqueID] = entity.position + } + } + + streamEntities.close() + + val readUniques = load(uniquesKey).await() + val existingUniques: Set + + if (readUniques == null) { + existingUniques = setOf() + } else { + DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(readUniques)))).use { + existingUniques = it.readCollection({ readBinaryString() }, ::HashSet) + } + } + + // FIXME: eviction can happen in between these calls... not good, since that might leave + // unique index and actual uniques in inconsistent state + // Lets try our best to avoid that! + + // TODO: we don't account for unique ID collisions in legacy world storage, + // which might corrupt legacy client shipworlds if there is malicious actor / broken mod + val touchedUniqueKeys = HashMap>>() + + for (existing in existingUniques) { + if (existing !in uniques) { + val indexKey = uniqueEntityIndex(existing) + val load = touchedUniqueKeys[indexKey] ?: uniqueIndexBlockCache[indexKey].await() + touchedUniqueKeys[indexKey] = load + load.remove(existing) + } + } + + for ((newKey, newValue) in uniques) { + if (newKey !in existingUniques) { + val indexKey = uniqueEntityIndex(newKey) + val load = touchedUniqueKeys[indexKey] ?: uniqueIndexBlockCache[indexKey].await() + touchedUniqueKeys[indexKey] = load + load[newKey] = pos to newValue + } + } + + streamUniques.writeCollection(uniques.keys) { writeBinaryString(it) } + streamUniques.close() + + // non atomic updates, god damn it + write(key, buffEntities.array.copyOf(buffEntities.length)) + write(uniquesKey, buffUniques.array.copyOf(buffUniques.length)) + + for ((index, data) in touchedUniqueKeys) { + writeUniqueEntityIndex(index, data) + } + } catch (err: Throwable) { + streamEntities.close() + streamUniques.close() + throw err + } } - - return true } - override fun saveCells(pos: ChunkPos, data: Object2DArray, state: ChunkState): Boolean { + override fun saveCells(pos: ChunkPos, data: ChunkCells) { executor.execute { val buff = FastByteArrayOutputStream() val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff))) stream.writeVarInt( - when (state) { + when (data.state) { ChunkState.FRESH -> 0 ChunkState.EMPTY -> 0 ChunkState.TERRAIN -> 1 @@ -146,7 +308,7 @@ sealed class LegacyWorldStorage() : WorldStorage() { for (y in 0 until CHUNK_SIZE) { for (x in 0 until CHUNK_SIZE) { - val cell = data.getOrNull(x, y) ?: AbstractCell.NULL + val cell = data.cells.getOrNull(x, y) ?: AbstractCell.NULL cell.writeLegacy(stream) } } @@ -158,11 +320,9 @@ sealed class LegacyWorldStorage() : WorldStorage() { stream.close() write(key, buff.array.copyOf(buff.length)) } - - return true } - override fun saveMetadata(data: Metadata): Boolean { + override fun saveMetadata(data: Metadata) { val buff = FastByteArrayOutputStream() val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff))) @@ -172,15 +332,13 @@ sealed class LegacyWorldStorage() : WorldStorage() { stream.close() write(metadataKey, buff.array.copyOf(buff.length)) - - return true } class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : LegacyWorldStorage() { override val executor: Executor = Executor { it.run() } - override fun load(at: ByteKey): CompletableFuture> { - return CompletableFuture.completedFuture(KOptional.ofNullable(get(at))) + override fun load(at: ByteKey): CompletableFuture { + return CompletableFuture.completedFuture(get(at)) } override fun write(at: ByteKey, value: ByteArray) { @@ -193,8 +351,8 @@ sealed class LegacyWorldStorage() : WorldStorage() { class DB5(private val database: BTreeDB5) : LegacyWorldStorage() { override val executor = CarriedExecutor(Starbound.IO_EXECUTOR) - override fun load(at: ByteKey): CompletableFuture> { - return CompletableFuture.supplyAsync(Supplier { database.read(at) }, executor) + override fun load(at: ByteKey): CompletableFuture { + return CompletableFuture.supplyAsync(Supplier { database.read(at).orNull() }, executor) } override fun write(at: ByteKey, value: ByteArray) { @@ -236,16 +394,15 @@ sealed class LegacyWorldStorage() : WorldStorage() { private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1") private val writer = connection.prepareStatement("REPLACE INTO `data` (`key`, `value`) VALUES (?, ?)") - override fun load(at: ByteKey): CompletableFuture> { + override fun load(at: ByteKey): CompletableFuture { return CompletableFuture.supplyAsync(Supplier { loader.setBytes(1, at.toByteArray()) loader.executeQuery().use { if (it.next()) { - val blob = it.getBytes(1) - KOptional(blob) + it.getBytes(1) } else { - KOptional() + null } } }, executor) @@ -269,5 +426,10 @@ sealed class LegacyWorldStorage() : WorldStorage() { companion object { private val LOGGER = LogManager.getLogger() private val metadataKey = ByteKey(0, 0, 0, 0, 0) + + fun memory(): Memory { + val map = HashMap() + return Memory(map::get, map::set) + } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt index b11c4454..606a4d8e 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/NativeWorldStorage.kt @@ -11,28 +11,32 @@ import java.io.Closeable import java.util.concurrent.CompletableFuture class NativeWorldStorage() : WorldStorage() { - override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { + override fun loadCells(pos: ChunkPos): CompletableFuture { TODO("Not yet implemented") } - override fun loadEntities(pos: ChunkPos): CompletableFuture>> { + override fun loadEntities(pos: ChunkPos): CompletableFuture> { TODO("Not yet implemented") } - override fun loadMetadata(): CompletableFuture> { + override fun loadMetadata(): CompletableFuture { TODO("Not yet implemented") } - override fun saveEntities(pos: ChunkPos, data: Collection, now: Boolean): Boolean { - return super.saveEntities(pos, data, now) + override fun saveEntities(pos: ChunkPos, entities: Collection) { + TODO("Not yet implemented") } - override fun saveCells(pos: ChunkPos, data: Object2DArray, state: ChunkState): Boolean { - return super.saveCells(pos, data, state) + override fun saveCells(pos: ChunkPos, data: ChunkCells) { + TODO("Not yet implemented") } - override fun saveMetadata(data: Metadata): Boolean { - return super.saveMetadata(data) + override fun saveMetadata(data: Metadata) { + TODO("Not yet implemented") + } + + override fun findUniqueEntity(identifier: String): CompletableFuture { + TODO("Not yet implemented") } override fun close() { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt index 0f470867..aee9ae60 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerChunk.kt @@ -212,26 +212,28 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk= state) { "Tried to downgrade $this state from $state to $newState" } + this.savedCellState = -1 this.state = newState permanent.forEach { if (it.targetState <= state) it.chunk.complete(this) } @@ -533,6 +536,8 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk { if (!cells.isInitialized() || state <= ChunkState.EMPTY) return emptyList() @@ -542,8 +547,12 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk { + val loaded = uniqueEntities[id] + + if (loaded != null) { + return CompletableFuture.completedFuture(loaded.position) + } + + return storage.findUniqueEntity(id).thenApply { it?.pos } + } + + override fun dispatchEntityMessage( + sourceConnection: Int, + entityID: String, + message: String, + arguments: JsonArray + ): CompletableFuture { + var loaded = uniqueEntities[entityID] + + if (loaded != null) { + return loaded.dispatchMessage(sourceConnection, message, arguments) + } + + // very well. + // I accept the requirement to load the chunk that contains aforementioned entity + return eventLoop.scope.async { + val (chunk) = storage.findUniqueEntity(entityID).await() ?: throw MessageCallException("No such entity $entityID") + val ticket = permanentChunkTicket(chunk).await() ?: throw MessageCallException("Internal server error") + + try { + ticket.chunk.await() + loaded = uniqueEntities[entityID] + + if (loaded == null) { + // How? + LOGGER.warn("Expected unique entity $entityID to be present inside $chunk, but after loading said chunk required entity is missing; world storage might be in corrupt state after unclean shutdown") + throw MessageCallException("No such entity $entityID") + } + + loaded!!.dispatchMessage(sourceConnection, message, arguments).await() + } finally { + ticket.cancel() + } + }.asCompletableFuture() + } + @JsonFactory data class MetadataJson( val playerStart: Vector2d, @@ -695,9 +741,10 @@ class ServerWorld private constructor( LOGGER.info("Attempting to load world at $worldID") return storage.loadMetadata().thenApply { + it ?: throw NoSuchElementException("No world metadata is present") LOGGER.info("Loading world at $worldID") AssetPathStack("/") { _ -> - val meta = it.map { Starbound.gson.fromJson(it.data.content, MetadataJson::class.java) }.orThrow { NoSuchElementException("No world metadata is present") } + val meta = Starbound.gson.fromJson(it.data.content, MetadataJson::class.java) val world = ServerWorld(server, WorldTemplate.fromJson(meta.worldTemplate), storage, worldID) world.playerSpawnPosition = meta.playerStart diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt index c50132a2..248957b1 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt @@ -12,8 +12,10 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.future.await import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import ru.dbotthepony.kommons.util.Either import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.math.vector.Vector2i import ru.dbotthepony.kstarbound.Starbound @@ -27,9 +29,12 @@ import ru.dbotthepony.kstarbound.math.AABBi import ru.dbotthepony.kstarbound.network.IPacket import ru.dbotthepony.kstarbound.network.packets.EntityCreatePacket import ru.dbotthepony.kstarbound.network.packets.EntityDestroyPacket +import ru.dbotthepony.kstarbound.network.packets.EntityMessagePacket +import ru.dbotthepony.kstarbound.network.packets.EntityMessageResponsePacket import ru.dbotthepony.kstarbound.network.packets.EntityUpdateSetPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.CentralStructureUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.EnvironmentUpdatePacket +import ru.dbotthepony.kstarbound.network.packets.clientbound.FindUniqueEntityResponsePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileArrayUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.TileDamageUpdatePacket @@ -50,6 +55,8 @@ import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer +import java.util.function.Function import kotlin.collections.ArrayList import kotlin.math.roundToInt @@ -85,6 +92,7 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p private val damageTilesQueue = Channel(64) // 64 pending tile group damage requests should be more than enough private val tileModificationBudget = ActionPacer(actions = 512, handicap = 2048) // TODO: make this configurable private val modifyTilesQueue = Channel>, Boolean>>(64) + private val findUniqueEntityQueue = Channel(1024) private suspend fun damageTilesLoop() { while (true) { @@ -127,21 +135,77 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p modifyTilesQueue.trySend(modifications to allowEntityOverlap) } + private suspend fun findUniqueEntityLoop() { + while (true) { + val name = findUniqueEntityQueue.receive() + client.send(FindUniqueEntityResponsePacket(name, world.findUniqueEntity(name).await())) + } + } + + fun findUniqueEntity(name: String) { + findUniqueEntityQueue.trySend(name) + } + init { scope.launch { damageTilesLoop() } scope.launch { modifyTilesLoop() } + scope.launch { findUniqueEntityLoop() } + } + + // max 4096 pending messages + private val entityMessageQueue = Channel(4096) + // handle up to 512 messages per second + private val messageQueuePacer = ActionPacer(actions = 512, handicap = 512) + + init { + // up to 256 messages can be in "unprocessed" state + // this should be a non-issue, + // unless entities owned by two different clients with high ping are actively + // exchanging messages with each other + + for (i in 0 until 256) { + scope.launch { handleEntityMessagesLoop() } + } + } + + private suspend fun handleEntityMessagesLoop() { + while (true) { + val packet = entityMessageQueue.receive() + messageQueuePacer.consume() + + val future = if (packet.entity.isLeft) { + world.dispatchEntityMessage(client.connectionID, packet.entity.left(), packet.message, packet.arguments) + } else { + world.dispatchEntityMessage(client.connectionID, packet.entity.right(), packet.message, packet.arguments) + } + + try { + val result = future.await() + client.send(EntityMessageResponsePacket(Either.right(result), packet.id)) + } catch (err: Throwable) { + client.send(EntityMessageResponsePacket(Either.left(err.message ?: "Internal server error"), packet.id)) + } + } + } + + fun handleEntityMessage(packet: EntityMessagePacket) { + val result = entityMessageQueue.trySend(packet) + + if (result.isFailure) { + client.send(EntityMessageResponsePacket(Either.left("Your client is sending too many entity messages!"), packet.id)) + } } fun send(packet: IPacket) = client.send(packet) // packets which interact with world must be // executed on world's thread - fun enqueue(task: ServerWorld.() -> Unit) { + fun enqueue(task: ServerWorld.(ServerWorldTracker) -> Unit) { if (!isRemoved.get()) { tasksQueue.add(world.eventLoop.supplyAsync { if (!isRemoved.get()) { try { - task(world) + task(world, this) } catch (err: Throwable) { LOGGER.error("Exception executing queued player task", err) client.disconnect("Exception executing queued player task: $err") diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt index 1094e14d..f485c4c6 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/WorldStorage.kt @@ -1,40 +1,41 @@ package ru.dbotthepony.kstarbound.server.world import com.google.gson.JsonObject -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.arrays.Object2DArray -import ru.dbotthepony.kommons.collect.chainOptionalFutures import ru.dbotthepony.kommons.io.readVarInt -import ru.dbotthepony.kommons.io.writeVarInt -import ru.dbotthepony.kommons.util.KOptional -import ru.dbotthepony.kstarbound.Starbound -import ru.dbotthepony.kstarbound.VersionRegistry import ru.dbotthepony.kstarbound.defs.EntityType import ru.dbotthepony.kstarbound.json.VersionedJson -import ru.dbotthepony.kstarbound.world.CHUNK_SIZE +import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.world.ChunkPos import ru.dbotthepony.kstarbound.world.ChunkState import ru.dbotthepony.kstarbound.world.WorldGeometry import ru.dbotthepony.kstarbound.world.api.AbstractCell -import ru.dbotthepony.kstarbound.world.api.ImmutableCell import ru.dbotthepony.kstarbound.world.entities.AbstractEntity import java.io.BufferedInputStream -import java.io.BufferedOutputStream import java.io.ByteArrayInputStream import java.io.Closeable import java.io.DataInputStream -import java.io.DataOutputStream import java.util.concurrent.CompletableFuture -import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterInputStream abstract class WorldStorage : Closeable { data class Metadata(val geometry: WorldGeometry, val data: VersionedJson) + data class ChunkCells(val cells: Object2DArray, val state: ChunkState) - abstract fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> - abstract fun loadEntities(pos: ChunkPos): CompletableFuture>> - abstract fun loadMetadata(): CompletableFuture> + abstract fun loadCells(pos: ChunkPos): CompletableFuture + abstract fun loadEntities(pos: ChunkPos): CompletableFuture> + abstract fun loadMetadata(): CompletableFuture + + abstract fun saveEntities(pos: ChunkPos, entities: Collection) + abstract fun saveCells(pos: ChunkPos, data: ChunkCells) + abstract fun saveMetadata(data: Metadata) + + // original method of storing and indexing unique entities is crude + // good thing we will only ever work with memory backed storage when dealing + // with legacy data storage (I HOPE) + data class UniqueEntitySearchResult(val chunk: ChunkPos, val pos: Vector2d) + abstract fun findUniqueEntity(identifier: String): CompletableFuture protected fun readEntities(pos: ChunkPos, data: ByteArray): List { val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data)))) @@ -61,107 +62,11 @@ abstract class WorldStorage : Closeable { return objects } - protected fun writeEntities(entities: Collection): ByteArray { - val buff = FastByteArrayOutputStream() - val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff))) - - stream.writeVarInt(entities.size) - - for (entity in entities) { - Starbound.storeJson { - val data = JsonObject() - entity.serialize(data) - VersionRegistry.make(entity.type.storeName, data).write(stream) - } - } - - stream.close() - return buff.array.copyOf(buff.length) - } - - open fun saveEntities(pos: ChunkPos, data: Collection, now: Boolean = false): Boolean { - return false - } - - open fun saveCells(pos: ChunkPos, data: Object2DArray, state: ChunkState): Boolean { - return false - } - - open fun saveMetadata(data: Metadata): Boolean { - return false - } - override fun close() { } - private class Fixed(private val cell: ImmutableCell) : WorldStorage() { - override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { - return CompletableFuture.completedFuture(KOptional.of(Object2DArray(CHUNK_SIZE, CHUNK_SIZE, cell) to ChunkState.FULL)) - } - - override fun loadEntities(pos: ChunkPos): CompletableFuture>> { - return CompletableFuture.completedFuture(KOptional.of(emptyList())) - } - - override fun loadMetadata(): CompletableFuture> { - return CompletableFuture.completedFuture(KOptional()) - } - } - - object Nothing : WorldStorage() { - override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { - return CompletableFuture.completedFuture(KOptional()) - } - - override fun loadEntities(pos: ChunkPos): CompletableFuture>> { - return CompletableFuture.completedFuture(KOptional()) - } - - override fun loadMetadata(): CompletableFuture> { - return CompletableFuture.completedFuture(KOptional()) - } - } - companion object { private val LOGGER = LogManager.getLogger() - val NULL: WorldStorage = Fixed(AbstractCell.NULL) - val EMPTY: WorldStorage = Fixed(AbstractCell.EMPTY) - } - - class Dispatch(vararg storage: WorldStorage) : WorldStorage() { - private val children = ArrayList() - - init { - storage.forEach { children.add(it) } - } - - override fun loadCells(pos: ChunkPos): CompletableFuture, ChunkState>>> { - return chainOptionalFutures(children) { it.loadCells(pos) } - } - - override fun loadEntities(pos: ChunkPos): CompletableFuture>> { - return chainOptionalFutures(children) { it.loadEntities(pos) } - } - - override fun loadMetadata(): CompletableFuture> { - return chainOptionalFutures(children) { it.loadMetadata() } - } - - override fun saveEntities(pos: ChunkPos, data: Collection, now: Boolean): Boolean { - return children.any { it.saveEntities(pos, data, now) } - } - - override fun saveCells(pos: ChunkPos, data: Object2DArray, state: ChunkState): Boolean { - return children.any { it.saveCells(pos, data, state) } - } - - override fun saveMetadata(data: Metadata): Boolean { - return children.any { it.saveMetadata(data) } - } - - override fun close() { - children.forEach { it.close() } - } } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ActionPacer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ActionPacer.kt index d4444508..24dce1e9 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ActionPacer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ActionPacer.kt @@ -11,7 +11,7 @@ class ActionPacer(actions: Int, handicap: Int = 0) { private val maxBackwardNanos = handicap * delayBetween private var currentTime = System.nanoTime() - maxBackwardNanos - suspend fun consume(actions: Int = 1) { + fun consumeAndReturnDeadline(actions: Int = 1): Long { require(actions >= 1) { "Invalid amount of actions to consume: $actions" } val time = System.nanoTime() @@ -21,6 +21,11 @@ class ActionPacer(actions: Int, handicap: Int = 0) { currentTime += delayBetween * (actions - 1) val diff = (currentTime - time) / 1_000_000L currentTime += delayBetween + return diff + } + + suspend fun consume(actions: Int = 1) { + val diff = consumeAndReturnDeadline(actions) if (diff > 0L) delay(diff) } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt index 3795e80d..66ae611c 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt @@ -7,17 +7,31 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.Executor import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.LockSupport -class CarriedExecutor(private val parent: Executor) : Executor, Runnable { +class CarriedExecutor(private val parent: Executor, private val allowExecutionInPlace: Boolean = true) : Executor, Runnable { private val queue = ConcurrentLinkedDeque() private val isCarried = AtomicBoolean() - override fun execute(command: Runnable) { - queue.add(command) + @Volatile + private var carrierThread: Thread? = null - if (isCarried.compareAndSet(false, true)) { - parent.execute(this) + override fun execute(command: Runnable) { + if (allowExecutionInPlace && carrierThread === Thread.currentThread()) { + // execute blocks in place if we are inside execution loop + + try { + command.run() + } catch (err: Throwable) { + LOGGER.error("Exception running task", err) + } + } else { + queue.add(command) + + if (isCarried.compareAndSet(false, true)) { + parent.execute(this) + } } } @@ -31,6 +45,7 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable { override fun run() { while (true) { + carrierThread = Thread.currentThread() var next = queue.poll() while (next != null) { @@ -43,6 +58,7 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable { next = queue.poll() } + carrierThread = null isCarried.set(false) // spinwait for little diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/world/World.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/world/World.kt index e17b3a6d..6c2439bb 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/world/World.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/world/World.kt @@ -60,6 +60,7 @@ import java.util.function.Predicate import java.util.random.RandomGenerator import java.util.stream.Stream import kotlin.collections.ArrayList +import kotlin.collections.HashMap import kotlin.math.roundToInt abstract class World, ChunkType : Chunk>(val template: WorldTemplate) : ICellAccess { @@ -255,9 +256,29 @@ abstract class World, ChunkType : Chunk() + + /** + * Entities with set unique (symbolic) id in this world + */ + val uniqueEntities = HashMap() + + /** + * [entities] values but in fast traversal and copy on write list + */ val entityList = CopyOnWriteArrayList() + + /** + * Entity spatial index + */ val entityIndex = EntityIndex(geometry) + + /** + * Entities with movement controller, to be simulated in parallel + */ val dynamicEntities = ArrayList() var playerSpawnPosition = Vector2d.ZERO @@ -588,6 +609,14 @@ abstract class World, ChunkType : Chunk, message: String, arguments: JsonArray): Pair, EntityMessagePacket> { + val future = CompletableFuture() + val uuid = UUID(random.nextLong(), random.nextLong()) + pendingEntityMessages.put(uuid, future) + val packet = EntityMessagePacket(entityID, message, arguments, uuid, sourceConnection) + return future to packet + } + fun dispatchEntityMessage(sourceConnection: Int, entityID: Int, message: String, arguments: JsonArray): CompletableFuture { val connectionID = Connection.connectionForEntityID(entityID) @@ -596,17 +625,13 @@ abstract class World, ChunkType : Chunk() - val uuid = UUID(random.nextLong(), random.nextLong()) - pendingEntityMessages.put(uuid, future) - connection.send(EntityMessagePacket(Either.left(entityID), message, arguments, uuid, sourceConnection)) + val (future, packet) = createRemoteEntityMessageFuture(sourceConnection, Either.left(entityID), message, arguments) + connection.send(packet) return future } } - fun dispatchEntityMessage(sourceConnection: Int, entityID: String, message: String, arguments: JsonArray): CompletableFuture { - TODO() - } + abstract fun dispatchEntityMessage(sourceConnection: Int, entityID: String, message: String, arguments: JsonArray): CompletableFuture // this *could* have been divided into per-entity map and beheaded world's map // but we can't, because response packets contain only message UUID, and don't contain entity ID @@ -618,6 +643,8 @@ abstract class World, ChunkType : Chunk> { key, value, cause -> if (cause.wasEvicted()) value?.completeExceptionally(TimeoutException("Did not receive response from remote in time")) } .build() + abstract fun findUniqueEntity(id: String): CompletableFuture + companion object { private val LOGGER = LogManager.getLogger() diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/world/entities/AbstractEntity.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/world/entities/AbstractEntity.kt index 8774d7c1..e838a0e2 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/world/entities/AbstractEntity.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/world/entities/AbstractEntity.kt @@ -117,6 +117,25 @@ abstract class AbstractEntity : Comparable { */ val uniqueID = networkedData(null, InternedStringCodec.nullable()) + init { + uniqueID.addListener { new, old -> + if (isInWorld) { + if (new != null && new in world.uniqueEntities && world.uniqueEntities[new] != this) { + uniqueID.accept(old) // rollback the change to avoid bad things from happening + throw IllegalArgumentException("Duplicate unique entity ID: $new") + } + + if (old != null) { + if (world.uniqueEntities[old] == this) { + world.uniqueEntities.remove(old) + } + } + + if (new != null) world.uniqueEntities[new] = this + } + } + } + var description = "" /** @@ -185,7 +204,7 @@ abstract class AbstractEntity : Comparable { fun joinWorld(world: World<*, *>) { if (innerWorld != null) - throw IllegalStateException("Already spawned (in world $innerWorld)") + throw IllegalStateException("Already spawned") removalReason = null @@ -198,10 +217,14 @@ abstract class AbstractEntity : Comparable { } world.eventLoop.ensureSameThread() - check(!world.entities.containsKey(entityID)) { "Duplicate entity ID: $entityID" } - innerWorld = world + + uniqueID.get()?.let { + check(it !in world.uniqueEntities) { "Duplicate unique entity ID: $it" } + world.uniqueEntities[it] = this + } + world.entities[entityID] = this world.entityList.add(this) spatialEntry = world.entityIndex.Entry(this) @@ -227,8 +250,13 @@ abstract class AbstractEntity : Comparable { scheduledTasks.forEach { it.cancel(false) } scheduledTasks.clear() - check(world.entities.remove(entityID) == this) { "Tried to remove $this from $world, but removed something else!" } + uniqueID.get()?.let { + if (world.uniqueEntities[it] == this) + world.uniqueEntities.remove(it) + } + world.entityList.remove(this) + check(world.entities.remove(entityID) == this) { "Tried to remove $this from $world, but removed something else!" } try { onRemove(world, reason)