From 0fb5359521776fdb9aff99d72ef5d7621174f013 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Wed, 24 Apr 2024 11:21:23 +0700 Subject: [PATCH] Much smarter tick scheduling --- .../kstarbound/defs/image/Image.kt | 25 ++-- .../server/IntegratedStarboundServer.kt | 1 + .../kstarbound/server/ServerConnection.kt | 18 ++- .../kstarbound/server/StarboundServer.kt | 70 ++--------- .../server/world/ServerSystemWorld.kt | 115 +++++++++++++----- 5 files changed, 123 insertions(+), 106 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/defs/image/Image.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/defs/image/Image.kt index 9396c8a1..98258dab 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/defs/image/Image.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/defs/image/Image.kt @@ -106,7 +106,7 @@ class Image private constructor( } val data: ByteBuffer - get() = dataCache.get(source).join() + get() = dataFuture.join() val dataFuture: CompletableFuture get() = dataCache.get(source) @@ -191,11 +191,10 @@ class Image private constructor( * returns integer in big-endian ABGR format if it is RGB or RGBA picture, * otherwise returns pixels as-is */ - operator fun get(x: Int, y: Int): Int { + operator fun get(x: Int, y: Int, data: ByteBuffer = this@Image.data): Int { require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" } val offset = (this.y + y) * this@Image.width * 4 + (this.x + x) * 4 - val data = data return data[offset].toInt().and(0xFF) or // red data[offset + 1].toInt().and(0xFF).shl(8) or // green @@ -206,28 +205,30 @@ class Image private constructor( /** * returns integer in ABGR format */ - operator fun get(x: Int, y: Int, flip: Boolean): Int { + operator fun get(x: Int, y: Int, flip: Boolean, data: ByteBuffer = this@Image.data): Int { if (flip) { require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" } - return this[width - x - 1, y] + return this[width - x - 1, y, data] } else { - return this[x, y] + return this[x, y, data] } } - fun isTransparent(x: Int, y: Int, flip: Boolean): Boolean { + fun isTransparent(x: Int, y: Int, flip: Boolean, data: ByteBuffer = this@Image.data): Boolean { if (x !in 0 until width) return true if (y !in 0 until height) return true - return this[x, y, flip] and -0x1000000 == 0x0 + return this[x, y, flip, data] and -0x1000000 == 0x0 } val nonEmptyRegion by lazy { var x0 = 0 var y0 = 0 + val data = data + search@for (y in 0 until height) { for (x in 0 until width) { - if (!isTransparent(x, y, false)) { + if (!isTransparent(x, y, false, data)) { x0 = x y0 = y break@search @@ -240,7 +241,7 @@ class Image private constructor( search@for (y in height - 1 downTo y0) { for (x in width - 1 downTo x0) { - if (!isTransparent(x, y, false)) { + if (!isTransparent(x, y, false, data)) { x1 = x y1 = y break@search @@ -272,6 +273,8 @@ class Image private constructor( // this is weird, but that's how original game handles this // also we don't cache this info since that's a waste of precious ram + val data = data + for (yspace in minY until maxY) { for (xspace in minX until maxX) { var fillRatio = 0.0 @@ -288,7 +291,7 @@ class Image private constructor( if (xpixel !in 0 until width) continue - if (!isTransparent(xpixel, height - ypixel - 1, flip)) { + if (!isTransparent(xpixel, height - ypixel - 1, flip, data)) { fillRatio += 1.0 / (PIXELS_IN_STARBOUND_UNIT * PIXELS_IN_STARBOUND_UNIT) } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/IntegratedStarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/IntegratedStarboundServer.kt index dbdc855e..5c8ba08a 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/IntegratedStarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/IntegratedStarboundServer.kt @@ -7,6 +7,7 @@ import java.io.File class IntegratedStarboundServer(val client: StarboundClient, root: File) : StarboundServer(root) { init { channels.createLocalChannel() + start() } override fun tick0(delta: Double) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt index 79c83d41..ec18e59e 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt @@ -32,6 +32,7 @@ import ru.dbotthepony.kstarbound.network.packets.ClientContextUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.CelestialResponsePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.PlayerWarpResultPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.ServerDisconnectPacket +import ru.dbotthepony.kstarbound.network.packets.clientbound.UniverseTimeUpdatePacket import ru.dbotthepony.kstarbound.server.world.ServerWorldTracker import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage @@ -114,6 +115,8 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn private var saveClientContextTask: Future<*>? = null override fun onChannelClosed() { + tickTask?.cancel(false) + sendUniverseTimeTask?.cancel(false) playerEntity = null saveClientContextTask?.cancel(false) @@ -462,7 +465,10 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn warpQueue.trySend(WarpRequest(destination, deploy, ifFailed)) } - fun tick() { + private var tickTask: Future<*>? = null + private var sendUniverseTimeTask: Future<*>? = null + + private fun tick() { if (isConnected && isReady) { val entries = rpc.write() @@ -499,6 +505,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn if (channel.isOpen) { // send pending updates + tick() channel.flush() } @@ -591,6 +598,15 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn countedTowardsPlayerCount = true server.channels.incrementPlayerCount() + tickTask = channel.eventLoop().scheduleWithFixedDelay(Runnable { + tick() + }, Starbound.TIMESTEP_NANOS, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS) + + sendUniverseTimeTask = channel.eventLoop().scheduleWithFixedDelay(Runnable { + if (isReady) + send(UniverseTimeUpdatePacket(server.universeClock.time), false) + }, Globals.universeServer.clockUpdatePacketInterval, Globals.universeServer.clockUpdatePacketInterval, TimeUnit.MILLISECONDS) + if (isLegacy) { scope.launch { celestialRequestsHandler() } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt index f33780f8..146b921b 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/StarboundServer.kt @@ -1,12 +1,8 @@ package ru.dbotthepony.kstarbound.server -import com.github.benmanes.caffeine.cache.CacheLoader -import com.github.benmanes.caffeine.cache.Caffeine import com.google.gson.JsonElement import com.google.gson.JsonObject import com.google.gson.JsonPrimitive -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.objects.ObjectArrayList import it.unimi.dsi.fastutil.objects.ObjectArraySet import kotlinx.coroutines.CoroutineScope @@ -15,7 +11,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.future.asCompletableFuture import kotlinx.coroutines.future.await -import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kstarbound.math.vector.Vector3i @@ -38,13 +33,9 @@ import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.util.BlockableEventLoop import ru.dbotthepony.kstarbound.util.JVMClock -import ru.dbotthepony.kstarbound.util.asStringOrNull import ru.dbotthepony.kstarbound.util.random.random import ru.dbotthepony.kstarbound.util.toStarboundString import ru.dbotthepony.kstarbound.util.uuidFromStarboundString -import ru.dbotthepony.kstarbound.world.UniversePos -import java.io.DataInputStream -import java.io.DataOutputStream import java.io.File import java.sql.DriverManager import java.util.UUID @@ -149,6 +140,10 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread private val systemWorlds = HashMap>() + fun notifySystemWorldUnloaded(pos: Vector3i) { + execute { systemWorlds.remove(pos) } + } + private suspend fun loadSystemWorld0(location: Vector3i): ServerSystemWorld? { try { return ServerSystemWorld.create(this, location) @@ -304,26 +299,17 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread } init { - scheduleAtFixedRate(Runnable { - channels.broadcast(UniverseTimeUpdatePacket(universeClock.time), false) - setMetadata("universe_clock", JsonPrimitive(universeClock.time)) - }, Globals.universeServer.clockUpdatePacketInterval, Globals.universeServer.clockUpdatePacketInterval, TimeUnit.MILLISECONDS) - scheduleWithFixedDelay(Runnable { + setMetadata("universe_clock", JsonPrimitive(universeClock.time)) database.commit() universe.flush() }, Globals.universeServer.universeStorageInterval, Globals.universeServer.universeStorageInterval, TimeUnit.MILLISECONDS) scheduleAtFixedRate(Runnable { - tickNormal(Starbound.TIMESTEP) - }, Starbound.TIMESTEP_NANOS, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS) - - scheduleAtFixedRate(Runnable { - tickSystemWorlds(Starbound.SYSTEM_WORLD_TIMESTEP) + tick(Starbound.SYSTEM_WORLD_TIMESTEP) }, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, TimeUnit.NANOSECONDS) isDaemon = false - start() } private val occupiedNicknames = ObjectArraySet() @@ -358,50 +344,8 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread protected abstract fun close0() protected abstract fun tick0(delta: Double) - private fun tickSystemWorlds(delta: Double) { - systemWorlds.values.removeIf { - if (it.isCompletedExceptionally) { - return@removeIf true - } - - if (!it.isDone) { - return@removeIf false - } - - if (it.get() == null) { - return@removeIf true - } - - scope.launch { - try { - it.get()!!.tick(delta) - } catch (err: Throwable) { - LOGGER.fatal("Exception in system world $it event loop", err) - } - } - - if (it.get()!!.shouldClose()) { - LOGGER.info("Stopping idling ${it.get()}") - return@removeIf true - } - - return@removeIf false - } - } - - private fun tickNormal(delta: Double) { + private fun tick(delta: Double) { try { - // universeClock.nanos += Starbound.TIMESTEP_NANOS - - channels.connections.forEach { - try { - it.tick() - } catch (err: Throwable) { - LOGGER.error("Exception while ticking client connection", err) - it.disconnect("Exception while ticking client connection: $err") - } - } - tick0(delta) } catch (err: Throwable) { LOGGER.fatal("Exception in main server event loop", err) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerSystemWorld.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerSystemWorld.kt index 506c4186..f400c9e5 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerSystemWorld.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerSystemWorld.kt @@ -6,6 +6,14 @@ import com.google.gson.JsonObject import it.unimi.dsi.fastutil.bytes.ByteArrayList import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.future.asCompletableFuture +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.io.writeBinaryString import ru.dbotthepony.kommons.io.writeUUID @@ -36,6 +44,8 @@ import java.io.DataOutputStream import java.util.UUID import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit import java.util.function.Supplier import kotlin.math.PI import kotlin.math.cos @@ -65,8 +75,6 @@ class ServerSystemWorld : SystemWorld { } } - private val tasks = ConcurrentLinkedQueue>() - override fun toString(): String { return "ServerSystemWorld at $systemLocation" } @@ -119,26 +127,25 @@ class ServerSystemWorld : SystemWorld { } fun addClient(client: ServerConnection, shipSpeed: Double = Globals.systemWorld.clientShip.speed, location: SystemWorldLocation = SystemWorldLocation.Transit): CompletableFuture { - val task = Task { addClient0(client, shipSpeed, location) } - tasks.add(task) - return task.future + return scope.async { addClient0(client, shipSpeed, location) }.asCompletableFuture() } fun removeClient(client: ServerConnection): CompletableFuture { - val task = Task { removeClient0(client) } - tasks.add(task) - return task.future - } - - private fun flyShip0(client: ServerConnection, location: SystemWorldLocation, future: CompletableFuture) { - val ship = ships[client.uuid] ?: throw IllegalStateException("No client $client in $this!") - ship.destination(location, future) + return scope.async { removeClient0(client) }.asCompletableFuture() } fun flyShip(client: ServerConnection, location: SystemWorldLocation): CompletableFuture { val future = CompletableFuture() - val task = Task { flyShip0(client, location, future) } - tasks.add(task) + + scope.launch { + val ship = ships[client.uuid] ?: throw IllegalStateException("No client $client in $this!") + ship.destination(location, future) + }.invokeOnCompletion { + if (it != null) { + future.completeExceptionally(it) + } + } + return future } @@ -148,10 +155,13 @@ class ServerSystemWorld : SystemWorld { var lastSpawn = 0.0 private set + private val scope: CoroutineScope + private constructor(server: StarboundServer, location: Vector3i) : super(location, server.universeClock, server.universe) { this.server = server this.lastSpawn = clock.time - Globals.systemWorld.objectSpawnCycle objectSpawnTime = random.nextRange(Globals.systemWorld.objectSpawnInterval) + scope = CoroutineScope(server.coroutines + SupervisorJob()) } private constructor(server: StarboundServer, data: JsonData) : super(data.location, server.universeClock, server.universe) { @@ -163,6 +173,7 @@ class ServerSystemWorld : SystemWorld { } this.lastSpawn = data.lastSpawn + scope = CoroutineScope(server.coroutines + SupervisorJob()) } private suspend fun spawnInitialObjects() { @@ -249,21 +260,42 @@ class ServerSystemWorld : SystemWorld { } private var ticksWithoutPlayers = 0 + private val tickSignal = Channel(120) + private var tickSignaler: Future<*>? = null - fun shouldClose(): Boolean { - return ticksWithoutPlayers > 1800 + // system worlds are very lightweight, launching separate threads for them + // is overkill; launch tick loop inside main server's thread + // However, if this proves to be a problem, then it can be otherwise dispatched to + // common ForkJoinPool in conjunction of CarriedExecutor + ExecutorWithScheduler + fun launchTickLoop() { + check(tickSignaler == null) { "Already ticking" } + + tickSignaler = server.scheduleAtFixedRate( + Runnable { tickSignal.trySend(Starbound.SYSTEM_WORLD_TIMESTEP) }, + Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, + Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, + TimeUnit.NANOSECONDS) + + scope.launch { + while (ticksWithoutPlayers < 600) { + try { + tick(tickSignal.receive()) + } catch (err: Throwable) { + if (err !is CancellationException) + LOGGER.fatal("Exception in System world tick loop", err) + + // TODO: We should probably kick client ships out of system, but to where? + break + } + } + + LOGGER.info("Stopping system world at $location") + tickSignaler?.cancel(false) + server.notifySystemWorldUnloaded(location) + } } - // in original engine, ticking happens at 20 updates per second - // Since there is no Lua driven code, we can tick as fast as we want - suspend fun tick(delta: Double) { - var next = tasks.poll() - - while (next != null) { - next.run() - next = tasks.poll() - } - + private suspend fun tick(delta: Double) { // safeguard for cases when client wasn't removed properly ships.values.removeIf { if (it.shouldRemove()) { @@ -300,7 +332,7 @@ class ServerSystemWorld : SystemWorld { return@removeIf false } - // spawnObjects() + spawnObjects() ships.values.forEach { it.sendUpdates() } @@ -530,17 +562,38 @@ class ServerSystemWorld : SystemWorld { } else { LOGGER.info("Creating new System World at $location") val world = ServerSystemWorld(server, location) - world.spawnInitialObjects() - world.spawnObjects() + + try { + world.spawnInitialObjects() + world.spawnObjects() + world.launchTickLoop() + } catch (err: Throwable) { + world.tickSignaler?.cancel(false) + world.scope.cancel() + world.tickSignal.close() + throw err + } + return world } } suspend fun load(server: StarboundServer, data: JsonElement): ServerSystemWorld { val load = Starbound.gson.fromJson(data, JsonData::class.java) + LOGGER.info("Loading System World at ${load.location}") val world = ServerSystemWorld(server, load) - world.spawnObjects() + + try { + world.spawnObjects() + world.launchTickLoop() + } catch (err: Throwable) { + world.tickSignaler?.cancel(false) + world.scope.cancel() + world.tickSignal.close() + throw err + } + return world } }