Now, at long last, unique entity index and messages

This commit is contained in:
DBotThePony 2024-05-03 16:13:16 +07:00
parent 23dab02cc5
commit 639aafce50
Signed by: DBot
GPG Key ID: DCC23B5715498507
21 changed files with 625 additions and 282 deletions

View File

@ -1,6 +1,9 @@
package ru.dbotthepony.kstarbound.client.world
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.base.Supplier
import com.google.gson.JsonArray
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.longs.Long2ObjectFunction
@ -9,10 +12,12 @@ import it.unimi.dsi.fastutil.longs.LongArraySet
import it.unimi.dsi.fastutil.objects.ReferenceArraySet
import ru.dbotthepony.kommons.util.IStruct2i
import ru.dbotthepony.kommons.math.RGBAColor
import ru.dbotthepony.kommons.util.Either
import ru.dbotthepony.kstarbound.math.AABB
import ru.dbotthepony.kstarbound.math.vector.Vector2f
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.Registry
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.client.StarboundClient
import ru.dbotthepony.kstarbound.client.render.ConfiguredMesh
import ru.dbotthepony.kstarbound.client.render.LayeredRenderer
@ -22,11 +27,14 @@ import ru.dbotthepony.kstarbound.defs.tile.LiquidDefinition
import ru.dbotthepony.kstarbound.defs.world.WorldTemplate
import ru.dbotthepony.kstarbound.math.roundTowardsNegativeInfinity
import ru.dbotthepony.kstarbound.math.roundTowardsPositiveInfinity
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.network.Connection
import ru.dbotthepony.kstarbound.network.packets.DamageNotificationPacket
import ru.dbotthepony.kstarbound.network.packets.DamageRequestPacket
import ru.dbotthepony.kstarbound.network.packets.EntityMessagePacket
import ru.dbotthepony.kstarbound.network.packets.HitRequestPacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.UpdateWorldPropertiesPacket
import ru.dbotthepony.kstarbound.network.packets.serverbound.FindUniqueEntityPacket
import ru.dbotthepony.kstarbound.util.BlockableEventLoop
import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
import ru.dbotthepony.kstarbound.world.ChunkPos
@ -36,10 +44,13 @@ import ru.dbotthepony.kstarbound.world.api.ITileAccess
import ru.dbotthepony.kstarbound.world.api.OffsetCellAccess
import ru.dbotthepony.kstarbound.world.api.TileView
import ru.dbotthepony.kstarbound.world.positiveModulo
import java.time.Duration
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import kotlin.collections.ArrayList
class ClientWorld(
val client: StarboundClient,
@ -346,6 +357,49 @@ class ClientWorld(
return client.activeConnection
}
val pendingUniqueEntityRequests: Cache<String, CompletableFuture<Vector2d?>> = Caffeine.newBuilder()
.maximumSize(4096L) // why would you even need to request this many unique entities anyway?
// who knows! modders probably know!
.expireAfterWrite(Duration.ofMinutes(1))
.scheduler(Starbound)
.executor(Starbound.EXECUTOR)
.evictionListener<String, CompletableFuture<Vector2d?>> { key, value, cause -> if (cause.wasEvicted()) value!!.complete(null) }
.build()
override fun findUniqueEntity(id: String): CompletableFuture<Vector2d?> {
val loaded = uniqueEntities[id]
if (loaded != null) {
return CompletableFuture.completedFuture(loaded.position)
}
val connection = client.activeConnection ?: return CompletableFuture.completedFuture(null)
return pendingUniqueEntityRequests.get(id) {
val future = CompletableFuture<Vector2d?>()
connection.send(FindUniqueEntityPacket(id))
future
}
}
override fun dispatchEntityMessage(
sourceConnection: Int,
entityID: String,
message: String,
arguments: JsonArray
): CompletableFuture<JsonElement> {
val loaded = uniqueEntities[entityID]
if (loaded != null) {
return loaded.dispatchMessage(sourceConnection, message, arguments)
}
val connection = client.activeConnection ?: return CompletableFuture.failedFuture(MessageCallException("No active connection"))
val (future, packet) = createRemoteEntityMessageFuture(sourceConnection, Either.right(entityID), message, arguments)
connection.send(packet)
return future
}
companion object {
val ring = listOf(
Vector2i(0, 0),

View File

@ -63,7 +63,7 @@ sealed class SpawnTarget {
}
override suspend fun resolve(world: ServerWorld): Vector2d? {
return world.entities.values.firstOrNull { it.uniqueID.get() == id }?.position
return world.uniqueEntities[id]?.position
}
override fun toString(): String {

View File

@ -550,7 +550,12 @@ fun provideWorldEntitiesBindings(self: World<*, *>, callbacks: Table, lua: LuaEn
returnBuffer.setToContentsOf(entity.callScript(function, *it.copyRemaining()))
}
callbacks["findUniqueEntity"] = luaStub("findUniqueEntity")
callbacks["findUniqueEntity"] = luaFunction { id: ByteString ->
returnBuffer.setTo(LuaFuture(
future = self.findUniqueEntity(id.decode()).thenApply { from(it) },
isLocal = self.isServer
))
}
callbacks["sendEntityMessage"] = luaFunctionN("sendEntityMessage") {
val id = it.nextAny()

View File

@ -44,26 +44,13 @@ class EntityMessagePacket(val entity: Either<Int, String>, val message: String,
stream.writeShort(sourceConnection)
}
private fun handle(connection: Connection, world: World<*, *>) {
val future = if (entity.isLeft) {
world.dispatchEntityMessage(connection.connectionID, entity.left(), message, arguments)
} else {
world.dispatchEntityMessage(connection.connectionID, entity.right(), message, arguments)
}
future
.thenAccept(Consumer {
connection.send(EntityMessageResponsePacket(Either.right(it), id))
})
.exceptionally(Function {
connection.send(EntityMessageResponsePacket(Either.left(it.message ?: "Internal server error"), id))
null
})
}
override fun play(connection: ServerConnection) {
connection.enqueue {
handle(connection, this)
val tracker = connection.tracker
if (tracker == null) {
connection.send(EntityMessageResponsePacket(Either.left("Not in world"), this@EntityMessagePacket.id))
} else {
tracker.handleEntityMessage(this)
}
}
@ -72,7 +59,20 @@ class EntityMessagePacket(val entity: Either<Int, String>, val message: String,
val world = world
if (world != null) {
handle(connection, world)
val future = if (entity.isLeft) {
world.dispatchEntityMessage(connection.connectionID, entity.left(), message, arguments)
} else {
world.dispatchEntityMessage(connection.connectionID, entity.right(), message, arguments)
}
future
.thenAccept(Consumer {
connection.send(EntityMessageResponsePacket(Either.right(it), this@EntityMessagePacket.id))
})
.exceptionally(Function {
connection.send(EntityMessageResponsePacket(Either.left(it.message ?: "Internal server error"), this@EntityMessagePacket.id))
null
})
}
}
}

View File

@ -1,13 +1,11 @@
package ru.dbotthepony.kstarbound.network.packets.clientbound
import ru.dbotthepony.kommons.io.readBinaryString
import ru.dbotthepony.kstarbound.io.readVector2d
import ru.dbotthepony.kstarbound.io.readVector2f
import ru.dbotthepony.kommons.io.writeBinaryString
import ru.dbotthepony.kommons.io.writeStruct2d
import ru.dbotthepony.kommons.io.writeStruct2f
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.client.ClientConnection
import ru.dbotthepony.kstarbound.io.readInternedString
import ru.dbotthepony.kstarbound.io.readVector2d
import ru.dbotthepony.kstarbound.io.writeStruct2d
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.network.IClientPacket
import java.io.DataInputStream
import java.io.DataOutputStream
@ -17,36 +15,25 @@ class FindUniqueEntityResponsePacket(val name: String, val position: Vector2d?)
stream.writeBinaryString(name)
stream.writeBoolean(position != null)
if (isLegacy) {
if (position != null)
stream.writeStruct2f(position.toFloatVector())
} else {
if (position != null)
stream.writeStruct2d(position)
}
if (position != null)
stream.writeStruct2d(position, isLegacy)
}
override fun play(connection: ClientConnection) {
TODO("Not yet implemented")
connection.enqueue {
world?.pendingUniqueEntityRequests?.asMap()?.remove(name)?.complete(position)
}
}
companion object {
fun read(stream: DataInputStream, isLegacy: Boolean): FindUniqueEntityResponsePacket {
val name = stream.readBinaryString()
val name = stream.readInternedString()
val position: Vector2d?
if (isLegacy) {
if (stream.readBoolean()) {
position = stream.readVector2f().toDoubleVector()
} else {
position = null
}
if (stream.readBoolean()) {
position = stream.readVector2d(isLegacy)
} else {
if (stream.readBoolean()) {
position = stream.readVector2d()
} else {
position = null
}
position = null
}
return FindUniqueEntityResponsePacket(name, position)

View File

@ -1,7 +1,7 @@
package ru.dbotthepony.kstarbound.network.packets.serverbound
import ru.dbotthepony.kommons.io.readBinaryString
import ru.dbotthepony.kommons.io.writeBinaryString
import ru.dbotthepony.kstarbound.io.readInternedString
import ru.dbotthepony.kstarbound.network.IServerPacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.FindUniqueEntityResponsePacket
import ru.dbotthepony.kstarbound.server.ServerConnection
@ -9,15 +9,18 @@ import java.io.DataInputStream
import java.io.DataOutputStream
class FindUniqueEntityPacket(val name: String) : IServerPacket {
constructor(stream: DataInputStream, isLegacy: Boolean) : this(stream.readBinaryString())
constructor(stream: DataInputStream, isLegacy: Boolean) : this(stream.readInternedString())
override fun write(stream: DataOutputStream, isLegacy: Boolean) {
stream.writeBinaryString(name)
}
override fun play(connection: ServerConnection) {
connection.enqueue {
// Do something
val tracker = connection.tracker
if (tracker != null) {
tracker.findUniqueEntity(name)
} else {
connection.send(FindUniqueEntityResponsePacket(name, null))
}
}

View File

@ -5,34 +5,59 @@ import ru.dbotthepony.kommons.util.Listenable
import ru.dbotthepony.kommons.util.ListenableDelegate
import java.io.DataInputStream
import java.io.DataOutputStream
import java.util.concurrent.CopyOnWriteArrayList
import java.util.function.Consumer
open class BasicNetworkedElement<TYPE, LEGACY>(private var value: TYPE, protected val codec: StreamCodec<TYPE>, protected val legacyCodec: StreamCodec<LEGACY>, protected val toLegacy: (TYPE) -> LEGACY, protected val fromLegacy: (LEGACY) -> TYPE) : NetworkedElement(), ListenableDelegate<TYPE> {
protected val valueListeners = Listenable.Impl<TYPE>()
protected val queue = InterpolationQueue<TYPE> { this.value = it; valueListeners.accept(value) }
protected val valueListeners = CopyOnWriteArrayList<Listener>()
protected val queue = InterpolationQueue<TYPE> {
val old = this.value
this.value = it
valueListeners.forEach { c -> c.callable.invoke(it, old) }
}
protected var isInterpolating = false
protected var currentTime = 0.0
protected inner class Listener(val callable: (TYPE, TYPE) -> Unit) : Listenable.L {
constructor(listener: Consumer<TYPE>) : this({ v, _ -> listener.accept(v) })
constructor(listener: Runnable) : this({ _, _ -> listener.run() })
init {
valueListeners.add(this)
}
override fun remove() {
valueListeners.remove(this)
}
}
override fun toString(): String {
return "BasicNetworkedElement[$value, isInterpolating=$isInterpolating, currentTime=$currentTime]"
}
override fun accept(t: TYPE) {
if (t != value) {
val old = value
value = t
queue.clear()
bumpVersion()
valueListeners.accept(t)
valueListeners.forEach { it.callable.invoke(t, old) }
}
}
override fun addListener(listener: Consumer<TYPE>): Listenable.L {
return valueListeners.addListener(listener)
return Listener(listener)
}
fun addListener(listener: (new: TYPE, old: TYPE) -> Unit): Listenable.L {
return Listener(listener)
}
@Deprecated("Internal API")
override fun listen(listener: Runnable): Listenable.L {
return valueListeners.addListener(listener)
return Listener(listener)
}
override fun get(): TYPE {
@ -46,7 +71,7 @@ open class BasicNetworkedElement<TYPE, LEGACY>(private var value: TYPE, protecte
bumpVersion()
if (value != old) {
valueListeners.accept(value)
valueListeners.forEach { it.callable.invoke(value, old) }
}
}
@ -65,8 +90,9 @@ open class BasicNetworkedElement<TYPE, LEGACY>(private var value: TYPE, protecte
if (isInterpolating) {
queue.push(read, interpolationDelay)
} else {
val old = value
value = read
valueListeners.accept(read)
valueListeners.forEach { it.callable.invoke(read, old) }
}
}

View File

@ -38,7 +38,7 @@ class EventCounterElement : BasicNetworkedElement<Long, Long>(0L, UnsignedVarLon
}
init {
valueListeners.addListener(Consumer {
addListener(Consumer {
if (it < pulled)
pulled = it
})

View File

@ -60,7 +60,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
// packets which interact with world must be
// executed on world's thread
fun enqueue(task: ServerWorld.() -> Unit): Boolean {
fun enqueue(task: ServerWorld.(ServerWorldTracker) -> Unit): Boolean {
val isInWorld = tracker?.enqueue(task) != null
if (!isInWorld) {
@ -105,7 +105,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
override fun setupNative() {
super.setupNative()
shipChunkSource = WorldStorage.NULL
shipChunkSource = LegacyWorldStorage.Memory({ shipChunks[it]?.orNull() }, { key, value -> shipChunks[key] = KOptional(value) })
}
fun receiveShipChunks(chunks: Map<ByteKey, KOptional<ByteArray>>) {

View File

@ -269,7 +269,7 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
visitable.disableDeathDrops = config.disableDeathDrops
val template = WorldTemplate(visitable, config.skyParameters, random)
val world = ServerWorld.create(this, template, WorldStorage.NULL, location)
val world = ServerWorld.create(this, template, LegacyWorldStorage.memory(), location)
try {
world.setProperty("ephemeral", JsonPrimitive(!config.persistent))

View File

@ -1,22 +1,35 @@
package ru.dbotthepony.kstarbound.server.world
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.arrays.Object2DArray
import ru.dbotthepony.kommons.io.BTreeDB6
import ru.dbotthepony.kommons.io.ByteKey
import ru.dbotthepony.kommons.io.readBinaryString
import ru.dbotthepony.kommons.io.readCollection
import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeBinaryString
import ru.dbotthepony.kommons.io.writeCollection
import ru.dbotthepony.kommons.io.writeStruct2f
import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kommons.util.xxhash32
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.defs.tile.BuiltinMetaMaterials
import ru.dbotthepony.kstarbound.defs.tile.isNullTile
import ru.dbotthepony.kstarbound.VersionRegistry
import ru.dbotthepony.kstarbound.io.BTreeDB5
import ru.dbotthepony.kstarbound.io.readInternedString
import ru.dbotthepony.kstarbound.io.readVector2f
import ru.dbotthepony.kstarbound.json.VersionedJson
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.util.CarriedExecutor
import ru.dbotthepony.kstarbound.util.supplyAsync
import ru.dbotthepony.kstarbound.util.ScheduledCoroutineExecutor
import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
import ru.dbotthepony.kstarbound.world.ChunkPos
import ru.dbotthepony.kstarbound.world.ChunkState
@ -25,7 +38,6 @@ import ru.dbotthepony.kstarbound.world.api.AbstractCell
import ru.dbotthepony.kstarbound.world.api.ImmutableCell
import ru.dbotthepony.kstarbound.world.api.MutableCell
import ru.dbotthepony.kstarbound.world.entities.AbstractEntity
import ru.dbotthepony.kstarbound.world.entities.tile.WorldObject
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.ByteArrayInputStream
@ -33,106 +45,256 @@ import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.File
import java.lang.ref.Cleaner
import java.lang.ref.WeakReference
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.function.Function
import java.util.function.Supplier
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
import kotlin.concurrent.withLock
sealed class LegacyWorldStorage() : WorldStorage() {
protected abstract fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>>
protected abstract fun load(at: ByteKey): CompletableFuture<ByteArray?>
protected abstract fun write(at: ByteKey, value: ByteArray)
protected abstract val executor: Executor
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
protected val scope by lazy {
CoroutineScope(ScheduledCoroutineExecutor(executor) + SupervisorJob())
}
private val uniqueIndexBlockCache = Caffeine.newBuilder()
.maximumSize(1024L)
.scheduler(Starbound)
.executor(Starbound.EXECUTOR)
.evictionListener<ByteKey, HashMap<String, Pair<ChunkPos, Vector2d>>> { key, value, cause -> writeUniqueEntityIndex(key!!, value!!) }
.buildAsync(AsyncCacheLoader<ByteKey, HashMap<String, Pair<ChunkPos, Vector2d>>> { key, executor ->
load(key).thenApply {
parseUniqueEntityIndex(it ?: return@thenApply HashMap())
}
})
private fun uniqueEntityIndex(identifier: String): ByteKey {
val hash = xxhash32(identifier)
return ByteKey(3, hash[3], hash[2], hash[1], hash[0])
}
private fun parseUniqueEntityIndex(data: ByteArray): HashMap<String, Pair<ChunkPos, Vector2d>> {
val inflater = Inflater()
try {
// why is index zlib compressed?
// I know you guys don't have much of brain, but holy shit, man, why in the world would you
// compress
// AN INDEX
val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(data), inflater, 0x10000), 0x40000))
val keys = stream.readVarInt()
val map = HashMap<String, Pair<ChunkPos, Vector2d>>(keys)
for (i in 0 until keys) {
val key = stream.readInternedString()
val x = stream.readUnsignedShort()
val y = stream.readUnsignedShort()
val pos = stream.readVector2f().toDoubleVector()
map[key] = ChunkPos(x, y) to pos
}
return map
} finally {
inflater.end()
}
}
private fun writeUniqueEntityIndex(key: ByteKey, data: HashMap<String, Pair<ChunkPos, Vector2d>>) {
executor.execute {
val deflater = Deflater()
val buffer = FastByteArrayOutputStream()
// aaaaaaaaaaaaaaaaagrh
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffer, deflater, 0x10000), 0x40000))
try {
stream.writeVarInt(data.size)
for ((id, value) in data) {
val (pos, epos) = value
stream.writeBinaryString(id)
stream.writeShort(pos.x)
stream.writeShort(pos.y)
stream.writeStruct2f(epos.toFloatVector())
}
stream.close()
write(key, buffer.array.copyOf(buffer.length))
} finally {
deflater.end()
}
}
}
override fun findUniqueEntity(identifier: String): CompletableFuture<UniqueEntitySearchResult?> {
return uniqueIndexBlockCache[uniqueEntityIndex(identifier)].thenApply {
it[identifier]?.let { UniqueEntitySearchResult(it.first, it.second) }
}
}
override fun loadCells(pos: ChunkPos): CompletableFuture<ChunkCells?> {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(1, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
return load(key).thenApplyAsync(Function {
it.map {
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it))))
val generationLevel = reader.readVarInt()
val tileSerializationVersion = reader.readVarInt()
it ?: return@Function null
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(it))))
val generationLevel = reader.readVarInt()
val tileSerializationVersion = reader.readVarInt()
val state = when (generationLevel) {
0 -> ChunkState.EMPTY
1 -> ChunkState.TERRAIN
2 -> ChunkState.MICRO_DUNGEONS
3 -> ChunkState.CAVE_LIQUID
4 -> ChunkState.FULL
else -> ChunkState.FULL
}
val result = Object2DArray.nulls<ImmutableCell>(CHUNK_SIZE, CHUNK_SIZE)
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val read = MutableCell().readLegacy(reader, tileSerializationVersion)
result[x, y] = read.immutable()
}
}
reader.close()
result as Object2DArray<out AbstractCell> to state
val state = when (generationLevel) {
0 -> ChunkState.EMPTY
1 -> ChunkState.TERRAIN
2 -> ChunkState.MICRO_DUNGEONS
3 -> ChunkState.CAVE_LIQUID
4 -> ChunkState.FULL
else -> ChunkState.FULL
}
val result = Object2DArray.nulls<ImmutableCell>(CHUNK_SIZE, CHUNK_SIZE)
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val read = MutableCell().readLegacy(reader, tileSerializationVersion)
result[x, y] = read.immutable()
}
}
reader.close()
return@Function ChunkCells(result as Object2DArray<out AbstractCell>, state)
}, Starbound.EXECUTOR)
}
override fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
override fun loadEntities(pos: ChunkPos): CompletableFuture<Collection<AbstractEntity>> {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
return load(key).thenApplyAsync(Function {
it.map { readEntities(pos, it) }
readEntities(pos, it ?: return@Function listOf())
}, Starbound.EXECUTOR)
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
override fun loadMetadata(): CompletableFuture<Metadata?> {
return load(metadataKey).thenApplyAsync(Function {
it.flatMap {
val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it))))
it ?: return@Function null
val stream = DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(it))))
val width = stream.readInt()
val height = stream.readInt()
val width = stream.readInt()
val height = stream.readInt()
val json = VersionedJson(stream)
val json = VersionedJson(stream)
KOptional(Metadata(WorldGeometry(Vector2i(width, height), true, false), json))
}
Metadata(WorldGeometry(Vector2i(width, height), true, false), json)
}, Starbound.EXECUTOR)
}
override fun saveEntities(pos: ChunkPos, data: Collection<AbstractEntity>, now: Boolean): Boolean {
executor.execute {
override fun saveEntities(pos: ChunkPos, entities: Collection<AbstractEntity>) {
scope.launch {
val chunkX = pos.x
val chunkY = pos.y
val key = ByteKey(2, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
val uniquesKey = ByteKey(4, (chunkX shr 8).toByte(), chunkX.toByte(), (chunkY shr 8).toByte(), chunkY.toByte())
write(key, writeEntities(data))
val buffEntities = FastByteArrayOutputStream()
val buffUniques = FastByteArrayOutputStream()
val streamEntities = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffEntities)))
val streamUniques = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buffUniques)))
val uniques = HashMap<String, Vector2d>()
try {
streamEntities.writeVarInt(entities.size)
for (entity in entities) {
Starbound.storeJson {
val data = JsonObject()
entity.serialize(data)
VersionRegistry.make(entity.type.storeName, data).write(streamEntities)
}
val uniqueID = entity.uniqueID.get()
if (uniqueID != null) {
uniques[uniqueID] = entity.position
}
}
streamEntities.close()
val readUniques = load(uniquesKey).await()
val existingUniques: Set<String>
if (readUniques == null) {
existingUniques = setOf()
} else {
DataInputStream(BufferedInputStream(InflaterInputStream(FastByteArrayInputStream(readUniques)))).use {
existingUniques = it.readCollection({ readBinaryString() }, ::HashSet)
}
}
// FIXME: eviction can happen in between these calls... not good, since that might leave
// unique index and actual uniques in inconsistent state
// Lets try our best to avoid that!
// TODO: we don't account for unique ID collisions in legacy world storage,
// which might corrupt legacy client shipworlds if there is malicious actor / broken mod
val touchedUniqueKeys = HashMap<ByteKey, HashMap<String, Pair<ChunkPos, Vector2d>>>()
for (existing in existingUniques) {
if (existing !in uniques) {
val indexKey = uniqueEntityIndex(existing)
val load = touchedUniqueKeys[indexKey] ?: uniqueIndexBlockCache[indexKey].await()
touchedUniqueKeys[indexKey] = load
load.remove(existing)
}
}
for ((newKey, newValue) in uniques) {
if (newKey !in existingUniques) {
val indexKey = uniqueEntityIndex(newKey)
val load = touchedUniqueKeys[indexKey] ?: uniqueIndexBlockCache[indexKey].await()
touchedUniqueKeys[indexKey] = load
load[newKey] = pos to newValue
}
}
streamUniques.writeCollection(uniques.keys) { writeBinaryString(it) }
streamUniques.close()
// non atomic updates, god damn it
write(key, buffEntities.array.copyOf(buffEntities.length))
write(uniquesKey, buffUniques.array.copyOf(buffUniques.length))
for ((index, data) in touchedUniqueKeys) {
writeUniqueEntityIndex(index, data)
}
} catch (err: Throwable) {
streamEntities.close()
streamUniques.close()
throw err
}
}
return true
}
override fun saveCells(pos: ChunkPos, data: Object2DArray<out AbstractCell>, state: ChunkState): Boolean {
override fun saveCells(pos: ChunkPos, data: ChunkCells) {
executor.execute {
val buff = FastByteArrayOutputStream()
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff)))
stream.writeVarInt(
when (state) {
when (data.state) {
ChunkState.FRESH -> 0
ChunkState.EMPTY -> 0
ChunkState.TERRAIN -> 1
@ -146,7 +308,7 @@ sealed class LegacyWorldStorage() : WorldStorage() {
for (y in 0 until CHUNK_SIZE) {
for (x in 0 until CHUNK_SIZE) {
val cell = data.getOrNull(x, y) ?: AbstractCell.NULL
val cell = data.cells.getOrNull(x, y) ?: AbstractCell.NULL
cell.writeLegacy(stream)
}
}
@ -158,11 +320,9 @@ sealed class LegacyWorldStorage() : WorldStorage() {
stream.close()
write(key, buff.array.copyOf(buff.length))
}
return true
}
override fun saveMetadata(data: Metadata): Boolean {
override fun saveMetadata(data: Metadata) {
val buff = FastByteArrayOutputStream()
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff)))
@ -172,15 +332,13 @@ sealed class LegacyWorldStorage() : WorldStorage() {
stream.close()
write(metadataKey, buff.array.copyOf(buff.length))
return true
}
class Memory(private val get: (ByteKey) -> ByteArray?, private val set: (ByteKey, ByteArray) -> Unit) : LegacyWorldStorage() {
override val executor: Executor = Executor { it.run() }
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.completedFuture(KOptional.ofNullable(get(at)))
override fun load(at: ByteKey): CompletableFuture<ByteArray?> {
return CompletableFuture.completedFuture(get(at))
}
override fun write(at: ByteKey, value: ByteArray) {
@ -193,8 +351,8 @@ sealed class LegacyWorldStorage() : WorldStorage() {
class DB5(private val database: BTreeDB5) : LegacyWorldStorage() {
override val executor = CarriedExecutor(Starbound.IO_EXECUTOR)
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
return CompletableFuture.supplyAsync(Supplier { database.read(at) }, executor)
override fun load(at: ByteKey): CompletableFuture<ByteArray?> {
return CompletableFuture.supplyAsync(Supplier { database.read(at).orNull() }, executor)
}
override fun write(at: ByteKey, value: ByteArray) {
@ -236,16 +394,15 @@ sealed class LegacyWorldStorage() : WorldStorage() {
private val loader = connection.prepareStatement("SELECT `value` FROM `data` WHERE `key` = ? LIMIT 1")
private val writer = connection.prepareStatement("REPLACE INTO `data` (`key`, `value`) VALUES (?, ?)")
override fun load(at: ByteKey): CompletableFuture<KOptional<ByteArray>> {
override fun load(at: ByteKey): CompletableFuture<ByteArray?> {
return CompletableFuture.supplyAsync(Supplier {
loader.setBytes(1, at.toByteArray())
loader.executeQuery().use {
if (it.next()) {
val blob = it.getBytes(1)
KOptional(blob)
it.getBytes(1)
} else {
KOptional()
null
}
}
}, executor)
@ -269,5 +426,10 @@ sealed class LegacyWorldStorage() : WorldStorage() {
companion object {
private val LOGGER = LogManager.getLogger()
private val metadataKey = ByteKey(0, 0, 0, 0, 0)
fun memory(): Memory {
val map = HashMap<ByteKey, ByteArray>()
return Memory(map::get, map::set)
}
}
}

View File

@ -11,28 +11,32 @@ import java.io.Closeable
import java.util.concurrent.CompletableFuture
class NativeWorldStorage() : WorldStorage() {
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
override fun loadCells(pos: ChunkPos): CompletableFuture<ChunkCells?> {
TODO("Not yet implemented")
}
override fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
override fun loadEntities(pos: ChunkPos): CompletableFuture<Collection<AbstractEntity>> {
TODO("Not yet implemented")
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
override fun loadMetadata(): CompletableFuture<Metadata?> {
TODO("Not yet implemented")
}
override fun saveEntities(pos: ChunkPos, data: Collection<AbstractEntity>, now: Boolean): Boolean {
return super.saveEntities(pos, data, now)
override fun saveEntities(pos: ChunkPos, entities: Collection<AbstractEntity>) {
TODO("Not yet implemented")
}
override fun saveCells(pos: ChunkPos, data: Object2DArray<out AbstractCell>, state: ChunkState): Boolean {
return super.saveCells(pos, data, state)
override fun saveCells(pos: ChunkPos, data: ChunkCells) {
TODO("Not yet implemented")
}
override fun saveMetadata(data: Metadata): Boolean {
return super.saveMetadata(data)
override fun saveMetadata(data: Metadata) {
TODO("Not yet implemented")
}
override fun findUniqueEntity(identifier: String): CompletableFuture<UniqueEntitySearchResult?> {
TODO("Not yet implemented")
}
override fun close() {

View File

@ -212,26 +212,28 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk<ServerWorld, Server
isBusy = false
}
private var isLoadingFromDisk = false
private suspend fun loadChunk() {
try {
val cells = world.storage.loadCells(pos).await()
// very good.
if (cells.isPresent) {
val (cellData, state) = cells.value
if (cells != null) {
isLoadingFromDisk = true
val (cellData, state) = cells
loadCells(cellData)
// bumping state while loading chunk might have
// undesired consequences, such as if chunk requester
// is pessimistic and want "fully loaded chunk or chunk generated to at least X stage"
// bumpState(State.CAVE_LIQUID)
world.storage.loadEntities(pos).await().ifPresent {
for (obj in it) {
obj.joinWorld(world)
}
for (obj in world.storage.loadEntities(pos).await()) {
obj.joinWorld(world)
}
bumpState(state)
isLoadingFromDisk = false
if (state < ChunkState.FULL) {
// we have loaded partially generated chunk from disk
@ -387,6 +389,7 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk<ServerWorld, Server
private fun bumpState(newState: ChunkState) {
if (newState == state) return
require(newState >= state) { "Tried to downgrade $this state from $state to $newState" }
this.savedCellState = -1
this.state = newState
permanent.forEach { if (it.targetState <= state) it.chunk.complete(this) }
@ -533,6 +536,8 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk<ServerWorld, Server
}
}
private var savedCellState = -1
private fun writeToStorage(): Collection<AbstractEntity> {
if (!cells.isInitialized() || state <= ChunkState.EMPTY)
return emptyList()
@ -542,8 +547,12 @@ class ServerChunk(world: ServerWorld, pos: ChunkPos) : Chunk<ServerWorld, Server
aabbd,
filter = Predicate { !it.isRemote && aabbd.isInside(it.position) })
world.storage.saveCells(pos, copyCells(), state)
world.storage.saveEntities(pos, unloadable.filter { it.isPersistent })
if (!isLoadingFromDisk) {
if (savedCellState != cellChangeset)
world.storage.saveCells(pos, WorldStorage.ChunkCells(copyCells(), state))
world.storage.saveEntities(pos, unloadable.filter { it.isPersistent })
}
return unloadable
}

View File

@ -124,7 +124,8 @@ class ServerUniverse(folder: File? = null) : Universe(), Closeable {
database.autoCommit = false
}
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR)
// don't allow execution in place because of computeIfAbsent of ConcurrentHashMap
private val carrier = CarriedExecutor(Starbound.IO_EXECUTOR, allowExecutionInPlace = false)
private val scope = CoroutineScope(ScheduledCoroutineExecutor(carrier) + SupervisorJob())
private val selectChunk = database.prepareStatement("SELECT `systems`, `constellations` FROM `chunk` WHERE `x` = ? AND `y` = ?")

View File

@ -1,5 +1,6 @@
package ru.dbotthepony.kstarbound.server.world
import com.google.gson.JsonArray
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.ints.IntArraySet
@ -649,6 +650,51 @@ class ServerWorld private constructor(
return server.channels.connectionByID(connectionID)
}
override fun findUniqueEntity(id: String): CompletableFuture<Vector2d?> {
	// Fast path: the entity is already resident in this world — answer immediately.
	uniqueEntities[id]?.let { resident ->
		return CompletableFuture.completedFuture(resident.position)
	}

	// Slow path: consult the persistent unique-entity index; null result means
	// the identifier is unknown to storage as well.
	return storage.findUniqueEntity(id).thenApply { found -> found?.pos }
}
/**
 * Dispatches a message to a unique (symbolically named) entity.
 *
 * If the entity is currently loaded, the message is delivered directly. Otherwise
 * the chunk that contains it (per the storage index) is force-loaded under a
 * permanent ticket, the message is delivered, and the ticket is released.
 *
 * The returned future completes with the entity's reply, or exceptionally with
 * [MessageCallException] when the entity does not exist (or vanished during load).
 */
override fun dispatchEntityMessage(
	sourceConnection: Int,
	entityID: String,
	message: String,
	arguments: JsonArray
): CompletableFuture<JsonElement> {
	val loaded = uniqueEntities[entityID]

	if (loaded != null) {
		return loaded.dispatchMessage(sourceConnection, message, arguments)
	}

	// very well.
	// I accept the requirement to load the chunk that contains aforementioned entity
	return eventLoop.scope.async {
		val (chunk) = storage.findUniqueEntity(entityID).await() ?: throw MessageCallException("No such entity $entityID")
		val ticket = permanentChunkTicket(chunk).await() ?: throw MessageCallException("Internal server error")

		try {
			ticket.chunk.await()
			// re-query after the chunk finished loading; use a fresh local val so
			// Kotlin smart-casts it (the original captured outer `var` required `!!`)
			val resident = uniqueEntities[entityID]

			if (resident == null) {
				// How?
				LOGGER.warn("Expected unique entity $entityID to be present inside $chunk, but after loading said chunk required entity is missing; world storage might be in corrupt state after unclean shutdown")
				throw MessageCallException("No such entity $entityID")
			}

			resident.dispatchMessage(sourceConnection, message, arguments).await()
		} finally {
			// always release the load ticket, even when dispatch fails
			ticket.cancel()
		}
	}.asCompletableFuture()
}
@JsonFactory
data class MetadataJson(
val playerStart: Vector2d,
@ -695,9 +741,10 @@ class ServerWorld private constructor(
LOGGER.info("Attempting to load world at $worldID")
return storage.loadMetadata().thenApply {
it ?: throw NoSuchElementException("No world metadata is present")
LOGGER.info("Loading world at $worldID")
AssetPathStack("/") { _ ->
val meta = it.map { Starbound.gson.fromJson(it.data.content, MetadataJson::class.java) }.orThrow { NoSuchElementException("No world metadata is present") }
val meta = Starbound.gson.fromJson(it.data.content, MetadataJson::class.java)
val world = ServerWorld(server, WorldTemplate.fromJson(meta.worldTemplate), storage, worldID)
world.playerSpawnPosition = meta.playerStart

View File

@ -12,8 +12,10 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.util.Either
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.math.vector.Vector2i
import ru.dbotthepony.kstarbound.Starbound
@ -27,9 +29,12 @@ import ru.dbotthepony.kstarbound.math.AABBi
import ru.dbotthepony.kstarbound.network.IPacket
import ru.dbotthepony.kstarbound.network.packets.EntityCreatePacket
import ru.dbotthepony.kstarbound.network.packets.EntityDestroyPacket
import ru.dbotthepony.kstarbound.network.packets.EntityMessagePacket
import ru.dbotthepony.kstarbound.network.packets.EntityMessageResponsePacket
import ru.dbotthepony.kstarbound.network.packets.EntityUpdateSetPacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.CentralStructureUpdatePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.EnvironmentUpdatePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.FindUniqueEntityResponsePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileArrayUpdatePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileUpdatePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.TileDamageUpdatePacket
@ -50,6 +55,8 @@ import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import java.util.function.Function
import kotlin.collections.ArrayList
import kotlin.math.roundToInt
@ -85,6 +92,7 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p
private val damageTilesQueue = Channel<DamageTileEntry>(64) // 64 pending tile group damage requests should be more than enough
private val tileModificationBudget = ActionPacer(actions = 512, handicap = 2048) // TODO: make this configurable
private val modifyTilesQueue = Channel<Pair<Collection<Pair<Vector2i, TileModification>>, Boolean>>(64)
private val findUniqueEntityQueue = Channel<String>(1024)
private suspend fun damageTilesLoop() {
while (true) {
@ -127,21 +135,77 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p
modifyTilesQueue.trySend(modifications to allowEntityOverlap)
}
// Drains the unique-entity lookup queue forever: for each queued identifier,
// resolves its position (possibly hitting world storage) and replies to the client.
private suspend fun findUniqueEntityLoop() {
	while (true) {
		val identifier = findUniqueEntityQueue.receive()
		val position = world.findUniqueEntity(identifier).await()
		client.send(FindUniqueEntityResponsePacket(identifier, position))
	}
}
// Queues a unique-entity position lookup for this client; answered asynchronously
// by findUniqueEntityLoop() with a FindUniqueEntityResponsePacket.
// NOTE(review): trySend result is ignored — if the queue (capacity 1024) is full
// the request is silently dropped and the client gets no response; confirm intended.
fun findUniqueEntity(name: String) {
	findUniqueEntityQueue.trySend(name)
}
init {
scope.launch { damageTilesLoop() }
scope.launch { modifyTilesLoop() }
scope.launch { findUniqueEntityLoop() }
}
// max 4096 pending messages
private val entityMessageQueue = Channel<EntityMessagePacket>(4096)
// handle up to 512 messages per second
private val messageQueuePacer = ActionPacer(actions = 512, handicap = 512)
init {
// up to 256 messages can be in "unprocessed" state
// this should be a non-issue,
// unless entities owned by two different clients with high ping are actively
// exchanging messages with each other
for (i in 0 until 256) {
scope.launch { handleEntityMessagesLoop() }
}
}
// Worker loop: takes queued entity messages, rate-limits them, dispatches to the
// target entity (by numeric ID when Either.left, by unique string ID when right),
// and relays the result — or the failure message — back to the client.
private suspend fun handleEntityMessagesLoop() {
	while (true) {
		val packet = entityMessageQueue.receive()
		messageQueuePacer.consume()

		val future = if (packet.entity.isLeft) {
			world.dispatchEntityMessage(client.connectionID, packet.entity.left(), packet.message, packet.arguments)
		} else {
			world.dispatchEntityMessage(client.connectionID, packet.entity.right(), packet.message, packet.arguments)
		}

		try {
			val result = future.await()
			client.send(EntityMessageResponsePacket(Either.right(result), packet.id))
		} catch (err: kotlinx.coroutines.CancellationException) {
			// must not swallow cancellation — rethrow so the coroutine actually stops
			// when its scope is cancelled (catching bare Throwable here broke that)
			throw err
		} catch (err: Throwable) {
			// report dispatch failure to the client instead of crashing the loop
			client.send(EntityMessageResponsePacket(Either.left(err.message ?: "Internal server error"), packet.id))
		}
	}
}
// Enqueues an inbound entity message for asynchronous processing; when the
// per-client queue (capacity 4096) is saturated, immediately answers with an error.
fun handleEntityMessage(packet: EntityMessagePacket) {
	if (entityMessageQueue.trySend(packet).isFailure) {
		client.send(EntityMessageResponsePacket(Either.left("Your client is sending too many entity messages!"), packet.id))
	}
}
fun send(packet: IPacket) = client.send(packet)
// packets which interact with world must be
// executed on world's thread
fun enqueue(task: ServerWorld.() -> Unit) {
fun enqueue(task: ServerWorld.(ServerWorldTracker) -> Unit) {
if (!isRemoved.get()) {
tasksQueue.add(world.eventLoop.supplyAsync {
if (!isRemoved.get()) {
try {
task(world)
task(world, this)
} catch (err: Throwable) {
LOGGER.error("Exception executing queued player task", err)
client.disconnect("Exception executing queued player task: $err")

View File

@ -1,40 +1,41 @@
package ru.dbotthepony.kstarbound.server.world
import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.arrays.Object2DArray
import ru.dbotthepony.kommons.collect.chainOptionalFutures
import ru.dbotthepony.kommons.io.readVarInt
import ru.dbotthepony.kommons.io.writeVarInt
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.VersionRegistry
import ru.dbotthepony.kstarbound.defs.EntityType
import ru.dbotthepony.kstarbound.json.VersionedJson
import ru.dbotthepony.kstarbound.world.CHUNK_SIZE
import ru.dbotthepony.kstarbound.math.vector.Vector2d
import ru.dbotthepony.kstarbound.world.ChunkPos
import ru.dbotthepony.kstarbound.world.ChunkState
import ru.dbotthepony.kstarbound.world.WorldGeometry
import ru.dbotthepony.kstarbound.world.api.AbstractCell
import ru.dbotthepony.kstarbound.world.api.ImmutableCell
import ru.dbotthepony.kstarbound.world.entities.AbstractEntity
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.ByteArrayInputStream
import java.io.Closeable
import java.io.DataInputStream
import java.io.DataOutputStream
import java.util.concurrent.CompletableFuture
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
abstract class WorldStorage : Closeable {
data class Metadata(val geometry: WorldGeometry, val data: VersionedJson)
data class ChunkCells(val cells: Object2DArray<out AbstractCell>, val state: ChunkState)
abstract fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>>
abstract fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>>
abstract fun loadMetadata(): CompletableFuture<KOptional<Metadata>>
abstract fun loadCells(pos: ChunkPos): CompletableFuture<ChunkCells?>
abstract fun loadEntities(pos: ChunkPos): CompletableFuture<Collection<AbstractEntity>>
abstract fun loadMetadata(): CompletableFuture<Metadata?>
abstract fun saveEntities(pos: ChunkPos, entities: Collection<AbstractEntity>)
abstract fun saveCells(pos: ChunkPos, data: ChunkCells)
abstract fun saveMetadata(data: Metadata)
// original method of storing and indexing unique entities is crude
// good thing we will only ever work with memory backed storage when dealing
// with legacy data storage (I HOPE)
data class UniqueEntitySearchResult(val chunk: ChunkPos, val pos: Vector2d)
abstract fun findUniqueEntity(identifier: String): CompletableFuture<UniqueEntitySearchResult?>
protected fun readEntities(pos: ChunkPos, data: ByteArray): List<AbstractEntity> {
val reader = DataInputStream(BufferedInputStream(InflaterInputStream(ByteArrayInputStream(data))))
@ -61,107 +62,11 @@ abstract class WorldStorage : Closeable {
return objects
}
protected fun writeEntities(entities: Collection<AbstractEntity>): ByteArray {
val buff = FastByteArrayOutputStream()
val stream = DataOutputStream(BufferedOutputStream(DeflaterOutputStream(buff)))
stream.writeVarInt(entities.size)
for (entity in entities) {
Starbound.storeJson {
val data = JsonObject()
entity.serialize(data)
VersionRegistry.make(entity.type.storeName, data).write(stream)
}
}
stream.close()
return buff.array.copyOf(buff.length)
}
open fun saveEntities(pos: ChunkPos, data: Collection<AbstractEntity>, now: Boolean = false): Boolean {
return false
}
open fun saveCells(pos: ChunkPos, data: Object2DArray<out AbstractCell>, state: ChunkState): Boolean {
return false
}
open fun saveMetadata(data: Metadata): Boolean {
return false
}
override fun close() {
}
private class Fixed(private val cell: ImmutableCell) : WorldStorage() {
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
return CompletableFuture.completedFuture(KOptional.of(Object2DArray(CHUNK_SIZE, CHUNK_SIZE, cell) to ChunkState.FULL))
}
override fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
return CompletableFuture.completedFuture(KOptional.of(emptyList()))
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
}
object Nothing : WorldStorage() {
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
return CompletableFuture.completedFuture(KOptional())
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return CompletableFuture.completedFuture(KOptional())
}
}
companion object {
private val LOGGER = LogManager.getLogger()
val NULL: WorldStorage = Fixed(AbstractCell.NULL)
val EMPTY: WorldStorage = Fixed(AbstractCell.EMPTY)
}
class Dispatch(vararg storage: WorldStorage) : WorldStorage() {
private val children = ArrayList<WorldStorage>()
init {
storage.forEach { children.add(it) }
}
override fun loadCells(pos: ChunkPos): CompletableFuture<KOptional<Pair<Object2DArray<out AbstractCell>, ChunkState>>> {
return chainOptionalFutures(children) { it.loadCells(pos) }
}
override fun loadEntities(pos: ChunkPos): CompletableFuture<KOptional<Collection<AbstractEntity>>> {
return chainOptionalFutures(children) { it.loadEntities(pos) }
}
override fun loadMetadata(): CompletableFuture<KOptional<Metadata>> {
return chainOptionalFutures(children) { it.loadMetadata() }
}
override fun saveEntities(pos: ChunkPos, data: Collection<AbstractEntity>, now: Boolean): Boolean {
return children.any { it.saveEntities(pos, data, now) }
}
override fun saveCells(pos: ChunkPos, data: Object2DArray<out AbstractCell>, state: ChunkState): Boolean {
return children.any { it.saveCells(pos, data, state) }
}
override fun saveMetadata(data: Metadata): Boolean {
return children.any { it.saveMetadata(data) }
}
override fun close() {
children.forEach { it.close() }
}
}
}

View File

@ -11,7 +11,7 @@ class ActionPacer(actions: Int, handicap: Int = 0) {
private val maxBackwardNanos = handicap * delayBetween
private var currentTime = System.nanoTime() - maxBackwardNanos
suspend fun consume(actions: Int = 1) {
fun consumeAndReturnDeadline(actions: Int = 1): Long {
require(actions >= 1) { "Invalid amount of actions to consume: $actions" }
val time = System.nanoTime()
@ -21,6 +21,11 @@ class ActionPacer(actions: Int, handicap: Int = 0) {
currentTime += delayBetween * (actions - 1)
val diff = (currentTime - time) / 1_000_000L
currentTime += delayBetween
return diff
}
suspend fun consume(actions: Int = 1) {
val diff = consumeAndReturnDeadline(actions)
if (diff > 0L) delay(diff)
}
}

View File

@ -7,17 +7,31 @@ import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.LockSupport
class CarriedExecutor(private val parent: Executor) : Executor, Runnable {
class CarriedExecutor(private val parent: Executor, private val allowExecutionInPlace: Boolean = true) : Executor, Runnable {
private val queue = ConcurrentLinkedDeque<Runnable>()
private val isCarried = AtomicBoolean()
override fun execute(command: Runnable) {
queue.add(command)
@Volatile
private var carrierThread: Thread? = null
if (isCarried.compareAndSet(false, true)) {
parent.execute(this)
override fun execute(command: Runnable) {
if (allowExecutionInPlace && carrierThread === Thread.currentThread()) {
// execute blocks in place if we are inside execution loop
try {
command.run()
} catch (err: Throwable) {
LOGGER.error("Exception running task", err)
}
} else {
queue.add(command)
if (isCarried.compareAndSet(false, true)) {
parent.execute(this)
}
}
}
@ -31,6 +45,7 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable {
override fun run() {
while (true) {
carrierThread = Thread.currentThread()
var next = queue.poll()
while (next != null) {
@ -43,6 +58,7 @@ class CarriedExecutor(private val parent: Executor) : Executor, Runnable {
next = queue.poll()
}
carrierThread = null
isCarried.set(false)
// spinwait for little

View File

@ -60,6 +60,7 @@ import java.util.function.Predicate
import java.util.random.RandomGenerator
import java.util.stream.Stream
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.math.roundToInt
abstract class World<This : World<This, ChunkType>, ChunkType : Chunk<This, ChunkType>>(val template: WorldTemplate) : ICellAccess {
@ -255,9 +256,29 @@ abstract class World<This : World<This, ChunkType>, ChunkType : Chunk<This, Chun
// generic lock
val lock = ReentrantLock()
/**
* All entities existing in this world, local and remote alike
*/
val entities = Int2ObjectOpenHashMap<AbstractEntity>()
/**
* Entities with set unique (symbolic) id in this world
*/
val uniqueEntities = HashMap<String, AbstractEntity>()
/**
* [entities] values but in fast traversal and copy on write list
*/
val entityList = CopyOnWriteArrayList<AbstractEntity>()
/**
* Entity spatial index
*/
val entityIndex = EntityIndex(geometry)
/**
* Entities with movement controller, to be simulated in parallel
*/
val dynamicEntities = ArrayList<DynamicEntity>()
var playerSpawnPosition = Vector2d.ZERO
@ -588,6 +609,14 @@ abstract class World<This : World<This, ChunkType>, ChunkType : Chunk<This, Chun
// doesn't write stacktrace
class MessageCallException(message: String) : RuntimeException(message, null, true, false)
// Builds the future/packet pair used to dispatch an entity message to a remote
// connection: registers a pending future under a random message UUID so the
// remote's EntityMessageResponsePacket (which carries only that UUID) can be
// matched back to the caller.
protected fun createRemoteEntityMessageFuture(sourceConnection: Int, entityID: Either<Int, String>, message: String, arguments: JsonArray): Pair<CompletableFuture<JsonElement>, EntityMessagePacket> {
	val messageID = UUID(random.nextLong(), random.nextLong())
	val future = CompletableFuture<JsonElement>()
	pendingEntityMessages.put(messageID, future)
	return Pair(future, EntityMessagePacket(entityID, message, arguments, messageID, sourceConnection))
}
fun dispatchEntityMessage(sourceConnection: Int, entityID: Int, message: String, arguments: JsonArray): CompletableFuture<JsonElement> {
val connectionID = Connection.connectionForEntityID(entityID)
@ -596,17 +625,13 @@ abstract class World<This : World<This, ChunkType>, ChunkType : Chunk<This, Chun
return entity.tryHandleMessage(sourceConnection, message, arguments)
} else {
val connection = remote(connectionID) ?: return CompletableFuture.failedFuture(NoSuchElementException("Can't dispatch entity message, no such connection $connectionID"))
val future = CompletableFuture<JsonElement>()
val uuid = UUID(random.nextLong(), random.nextLong())
pendingEntityMessages.put(uuid, future)
connection.send(EntityMessagePacket(Either.left(entityID), message, arguments, uuid, sourceConnection))
val (future, packet) = createRemoteEntityMessageFuture(sourceConnection, Either.left(entityID), message, arguments)
connection.send(packet)
return future
}
}
fun dispatchEntityMessage(sourceConnection: Int, entityID: String, message: String, arguments: JsonArray): CompletableFuture<JsonElement> {
TODO()
}
abstract fun dispatchEntityMessage(sourceConnection: Int, entityID: String, message: String, arguments: JsonArray): CompletableFuture<JsonElement>
// this *could* have been divided into per-entity map and beheaded world's map
// but we can't, because response packets contain only message UUID, and don't contain entity ID
@ -618,6 +643,8 @@ abstract class World<This : World<This, ChunkType>, ChunkType : Chunk<This, Chun
.removalListener<UUID, CompletableFuture<JsonElement>> { key, value, cause -> if (cause.wasEvicted()) value?.completeExceptionally(TimeoutException("Did not receive response from remote in time")) }
.build()
abstract fun findUniqueEntity(id: String): CompletableFuture<Vector2d?>
companion object {
private val LOGGER = LogManager.getLogger()

View File

@ -117,6 +117,25 @@ abstract class AbstractEntity : Comparable<AbstractEntity> {
*/
val uniqueID = networkedData(null, InternedStringCodec.nullable())
init {
	// Keep world.uniqueEntities in sync whenever this entity's unique (symbolic)
	// ID changes while it is in a world. Ordering matters:
	uniqueID.addListener { new, old ->
		if (isInWorld) {
			// 1) reject the change if another entity already owns the new ID —
			//    roll the networked value back BEFORE throwing so observers never
			//    see the conflicting ID
			if (new != null && new in world.uniqueEntities && world.uniqueEntities[new] != this) {
				uniqueID.accept(old) // rollback the change to avoid bad things from happening
				throw IllegalArgumentException("Duplicate unique entity ID: $new")
			}

			// 2) drop the old mapping, but only if it still points at us
			if (old != null) {
				if (world.uniqueEntities[old] == this) {
					world.uniqueEntities.remove(old)
				}
			}

			// 3) claim the new ID (safe: conflict was ruled out in step 1)
			if (new != null) world.uniqueEntities[new] = this
		}
	}
}
var description = ""
/**
@ -185,7 +204,7 @@ abstract class AbstractEntity : Comparable<AbstractEntity> {
fun joinWorld(world: World<*, *>) {
if (innerWorld != null)
throw IllegalStateException("Already spawned (in world $innerWorld)")
throw IllegalStateException("Already spawned")
removalReason = null
@ -198,10 +217,14 @@ abstract class AbstractEntity : Comparable<AbstractEntity> {
}
world.eventLoop.ensureSameThread()
check(!world.entities.containsKey(entityID)) { "Duplicate entity ID: $entityID" }
innerWorld = world
uniqueID.get()?.let {
check(it !in world.uniqueEntities) { "Duplicate unique entity ID: $it" }
world.uniqueEntities[it] = this
}
world.entities[entityID] = this
world.entityList.add(this)
spatialEntry = world.entityIndex.Entry(this)
@ -227,8 +250,13 @@ abstract class AbstractEntity : Comparable<AbstractEntity> {
scheduledTasks.forEach { it.cancel(false) }
scheduledTasks.clear()
check(world.entities.remove(entityID) == this) { "Tried to remove $this from $world, but removed something else!" }
uniqueID.get()?.let {
if (world.uniqueEntities[it] == this)
world.uniqueEntities.remove(it)
}
world.entityList.remove(this)
check(world.entities.remove(entityID) == this) { "Tried to remove $this from $world, but removed something else!" }
try {
onRemove(world, reason)