Much smarter tick scheduling

This commit is contained in:
DBotThePony 2024-04-24 11:21:23 +07:00
parent d93cc21dcd
commit 0fb5359521
Signed by: DBot
GPG Key ID: DCC23B5715498507
5 changed files with 123 additions and 106 deletions

View File

@ -106,7 +106,7 @@ class Image private constructor(
} }
val data: ByteBuffer val data: ByteBuffer
get() = dataCache.get(source).join() get() = dataFuture.join()
val dataFuture: CompletableFuture<ByteBuffer> val dataFuture: CompletableFuture<ByteBuffer>
get() = dataCache.get(source) get() = dataCache.get(source)
@ -191,11 +191,10 @@ class Image private constructor(
* returns integer in big-endian ABGR format if it is RGB or RGBA picture, * returns integer in big-endian ABGR format if it is RGB or RGBA picture,
* otherwise returns pixels as-is * otherwise returns pixels as-is
*/ */
operator fun get(x: Int, y: Int): Int { operator fun get(x: Int, y: Int, data: ByteBuffer = this@Image.data): Int {
require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" } require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" }
val offset = (this.y + y) * this@Image.width * 4 + (this.x + x) * 4 val offset = (this.y + y) * this@Image.width * 4 + (this.x + x) * 4
val data = data
return data[offset].toInt().and(0xFF) or // red return data[offset].toInt().and(0xFF) or // red
data[offset + 1].toInt().and(0xFF).shl(8) or // green data[offset + 1].toInt().and(0xFF).shl(8) or // green
@ -206,28 +205,30 @@ class Image private constructor(
/** /**
* returns integer in ABGR format * returns integer in ABGR format
*/ */
operator fun get(x: Int, y: Int, flip: Boolean): Int { operator fun get(x: Int, y: Int, flip: Boolean, data: ByteBuffer = this@Image.data): Int {
if (flip) { if (flip) {
require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" } require(x in 0 until width && y in 0 until height) { "Position out of bounds: $x $y" }
return this[width - x - 1, y] return this[width - x - 1, y, data]
} else { } else {
return this[x, y] return this[x, y, data]
} }
} }
fun isTransparent(x: Int, y: Int, flip: Boolean): Boolean { fun isTransparent(x: Int, y: Int, flip: Boolean, data: ByteBuffer = this@Image.data): Boolean {
if (x !in 0 until width) return true if (x !in 0 until width) return true
if (y !in 0 until height) return true if (y !in 0 until height) return true
return this[x, y, flip] and -0x1000000 == 0x0 return this[x, y, flip, data] and -0x1000000 == 0x0
} }
val nonEmptyRegion by lazy { val nonEmptyRegion by lazy {
var x0 = 0 var x0 = 0
var y0 = 0 var y0 = 0
val data = data
search@for (y in 0 until height) { search@for (y in 0 until height) {
for (x in 0 until width) { for (x in 0 until width) {
if (!isTransparent(x, y, false)) { if (!isTransparent(x, y, false, data)) {
x0 = x x0 = x
y0 = y y0 = y
break@search break@search
@ -240,7 +241,7 @@ class Image private constructor(
search@for (y in height - 1 downTo y0) { search@for (y in height - 1 downTo y0) {
for (x in width - 1 downTo x0) { for (x in width - 1 downTo x0) {
if (!isTransparent(x, y, false)) { if (!isTransparent(x, y, false, data)) {
x1 = x x1 = x
y1 = y y1 = y
break@search break@search
@ -272,6 +273,8 @@ class Image private constructor(
// this is weird, but that's how original game handles this // this is weird, but that's how original game handles this
// also we don't cache this info since that's a waste of precious ram // also we don't cache this info since that's a waste of precious ram
val data = data
for (yspace in minY until maxY) { for (yspace in minY until maxY) {
for (xspace in minX until maxX) { for (xspace in minX until maxX) {
var fillRatio = 0.0 var fillRatio = 0.0
@ -288,7 +291,7 @@ class Image private constructor(
if (xpixel !in 0 until width) if (xpixel !in 0 until width)
continue continue
if (!isTransparent(xpixel, height - ypixel - 1, flip)) { if (!isTransparent(xpixel, height - ypixel - 1, flip, data)) {
fillRatio += 1.0 / (PIXELS_IN_STARBOUND_UNIT * PIXELS_IN_STARBOUND_UNIT) fillRatio += 1.0 / (PIXELS_IN_STARBOUND_UNIT * PIXELS_IN_STARBOUND_UNIT)
} }
} }

View File

@ -7,6 +7,7 @@ import java.io.File
class IntegratedStarboundServer(val client: StarboundClient, root: File) : StarboundServer(root) { class IntegratedStarboundServer(val client: StarboundClient, root: File) : StarboundServer(root) {
init { init {
channels.createLocalChannel() channels.createLocalChannel()
start()
} }
override fun tick0(delta: Double) { override fun tick0(delta: Double) {

View File

@ -32,6 +32,7 @@ import ru.dbotthepony.kstarbound.network.packets.ClientContextUpdatePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.CelestialResponsePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.CelestialResponsePacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.PlayerWarpResultPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.PlayerWarpResultPacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.ServerDisconnectPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.ServerDisconnectPacket
import ru.dbotthepony.kstarbound.network.packets.clientbound.UniverseTimeUpdatePacket
import ru.dbotthepony.kstarbound.server.world.ServerWorldTracker import ru.dbotthepony.kstarbound.server.world.ServerWorldTracker
import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.server.world.WorldStorage
import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage
@ -114,6 +115,8 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
private var saveClientContextTask: Future<*>? = null private var saveClientContextTask: Future<*>? = null
override fun onChannelClosed() { override fun onChannelClosed() {
tickTask?.cancel(false)
sendUniverseTimeTask?.cancel(false)
playerEntity = null playerEntity = null
saveClientContextTask?.cancel(false) saveClientContextTask?.cancel(false)
@ -462,7 +465,10 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
warpQueue.trySend(WarpRequest(destination, deploy, ifFailed)) warpQueue.trySend(WarpRequest(destination, deploy, ifFailed))
} }
fun tick() { private var tickTask: Future<*>? = null
private var sendUniverseTimeTask: Future<*>? = null
private fun tick() {
if (isConnected && isReady) { if (isConnected && isReady) {
val entries = rpc.write() val entries = rpc.write()
@ -499,6 +505,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
if (channel.isOpen) { if (channel.isOpen) {
// send pending updates // send pending updates
tick()
channel.flush() channel.flush()
} }
@ -591,6 +598,15 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
countedTowardsPlayerCount = true countedTowardsPlayerCount = true
server.channels.incrementPlayerCount() server.channels.incrementPlayerCount()
tickTask = channel.eventLoop().scheduleWithFixedDelay(Runnable {
tick()
}, Starbound.TIMESTEP_NANOS, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS)
sendUniverseTimeTask = channel.eventLoop().scheduleWithFixedDelay(Runnable {
if (isReady)
send(UniverseTimeUpdatePacket(server.universeClock.time), false)
}, Globals.universeServer.clockUpdatePacketInterval, Globals.universeServer.clockUpdatePacketInterval, TimeUnit.MILLISECONDS)
if (isLegacy) { if (isLegacy) {
scope.launch { celestialRequestsHandler() } scope.launch { celestialRequestsHandler() }

View File

@ -1,12 +1,8 @@
package ru.dbotthepony.kstarbound.server package ru.dbotthepony.kstarbound.server
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.gson.JsonElement import com.google.gson.JsonElement
import com.google.gson.JsonObject import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive import com.google.gson.JsonPrimitive
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import it.unimi.dsi.fastutil.objects.ObjectArrayList import it.unimi.dsi.fastutil.objects.ObjectArrayList
import it.unimi.dsi.fastutil.objects.ObjectArraySet import it.unimi.dsi.fastutil.objects.ObjectArraySet
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -15,7 +11,6 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.future.asCompletableFuture import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.await import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kstarbound.math.vector.Vector3i import ru.dbotthepony.kstarbound.math.vector.Vector3i
@ -38,13 +33,9 @@ import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld
import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.server.world.WorldStorage
import ru.dbotthepony.kstarbound.util.BlockableEventLoop import ru.dbotthepony.kstarbound.util.BlockableEventLoop
import ru.dbotthepony.kstarbound.util.JVMClock import ru.dbotthepony.kstarbound.util.JVMClock
import ru.dbotthepony.kstarbound.util.asStringOrNull
import ru.dbotthepony.kstarbound.util.random.random import ru.dbotthepony.kstarbound.util.random.random
import ru.dbotthepony.kstarbound.util.toStarboundString import ru.dbotthepony.kstarbound.util.toStarboundString
import ru.dbotthepony.kstarbound.util.uuidFromStarboundString import ru.dbotthepony.kstarbound.util.uuidFromStarboundString
import ru.dbotthepony.kstarbound.world.UniversePos
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.File import java.io.File
import java.sql.DriverManager import java.sql.DriverManager
import java.util.UUID import java.util.UUID
@ -149,6 +140,10 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
private val systemWorlds = HashMap<Vector3i, CompletableFuture<ServerSystemWorld?>>() private val systemWorlds = HashMap<Vector3i, CompletableFuture<ServerSystemWorld?>>()
fun notifySystemWorldUnloaded(pos: Vector3i) {
execute { systemWorlds.remove(pos) }
}
private suspend fun loadSystemWorld0(location: Vector3i): ServerSystemWorld? { private suspend fun loadSystemWorld0(location: Vector3i): ServerSystemWorld? {
try { try {
return ServerSystemWorld.create(this, location) return ServerSystemWorld.create(this, location)
@ -304,26 +299,17 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
} }
init { init {
scheduleAtFixedRate(Runnable {
channels.broadcast(UniverseTimeUpdatePacket(universeClock.time), false)
setMetadata("universe_clock", JsonPrimitive(universeClock.time))
}, Globals.universeServer.clockUpdatePacketInterval, Globals.universeServer.clockUpdatePacketInterval, TimeUnit.MILLISECONDS)
scheduleWithFixedDelay(Runnable { scheduleWithFixedDelay(Runnable {
setMetadata("universe_clock", JsonPrimitive(universeClock.time))
database.commit() database.commit()
universe.flush() universe.flush()
}, Globals.universeServer.universeStorageInterval, Globals.universeServer.universeStorageInterval, TimeUnit.MILLISECONDS) }, Globals.universeServer.universeStorageInterval, Globals.universeServer.universeStorageInterval, TimeUnit.MILLISECONDS)
scheduleAtFixedRate(Runnable { scheduleAtFixedRate(Runnable {
tickNormal(Starbound.TIMESTEP) tick(Starbound.SYSTEM_WORLD_TIMESTEP)
}, Starbound.TIMESTEP_NANOS, Starbound.TIMESTEP_NANOS, TimeUnit.NANOSECONDS)
scheduleAtFixedRate(Runnable {
tickSystemWorlds(Starbound.SYSTEM_WORLD_TIMESTEP)
}, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, TimeUnit.NANOSECONDS) }, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, Starbound.SYSTEM_WORLD_TIMESTEP_NANOS, TimeUnit.NANOSECONDS)
isDaemon = false isDaemon = false
start()
} }
private val occupiedNicknames = ObjectArraySet<String>() private val occupiedNicknames = ObjectArraySet<String>()
@ -358,50 +344,8 @@ sealed class StarboundServer(val root: File) : BlockableEventLoop("Server thread
protected abstract fun close0() protected abstract fun close0()
protected abstract fun tick0(delta: Double) protected abstract fun tick0(delta: Double)
private fun tickSystemWorlds(delta: Double) { private fun tick(delta: Double) {
systemWorlds.values.removeIf {
if (it.isCompletedExceptionally) {
return@removeIf true
}
if (!it.isDone) {
return@removeIf false
}
if (it.get() == null) {
return@removeIf true
}
scope.launch {
try {
it.get()!!.tick(delta)
} catch (err: Throwable) {
LOGGER.fatal("Exception in system world $it event loop", err)
}
}
if (it.get()!!.shouldClose()) {
LOGGER.info("Stopping idling ${it.get()}")
return@removeIf true
}
return@removeIf false
}
}
private fun tickNormal(delta: Double) {
try { try {
// universeClock.nanos += Starbound.TIMESTEP_NANOS
channels.connections.forEach {
try {
it.tick()
} catch (err: Throwable) {
LOGGER.error("Exception while ticking client connection", err)
it.disconnect("Exception while ticking client connection: $err")
}
}
tick0(delta) tick0(delta)
} catch (err: Throwable) { } catch (err: Throwable) {
LOGGER.fatal("Exception in main server event loop", err) LOGGER.fatal("Exception in main server event loop", err)

View File

@ -6,6 +6,14 @@ import com.google.gson.JsonObject
import it.unimi.dsi.fastutil.bytes.ByteArrayList import it.unimi.dsi.fastutil.bytes.ByteArrayList
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.io.writeBinaryString import ru.dbotthepony.kommons.io.writeBinaryString
import ru.dbotthepony.kommons.io.writeUUID import ru.dbotthepony.kommons.io.writeUUID
@ -36,6 +44,8 @@ import java.io.DataOutputStream
import java.util.UUID import java.util.UUID
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.function.Supplier import java.util.function.Supplier
import kotlin.math.PI import kotlin.math.PI
import kotlin.math.cos import kotlin.math.cos
@ -65,8 +75,6 @@ class ServerSystemWorld : SystemWorld {
} }
} }
private val tasks = ConcurrentLinkedQueue<Task<*>>()
override fun toString(): String { override fun toString(): String {
return "ServerSystemWorld at $systemLocation" return "ServerSystemWorld at $systemLocation"
} }
@ -119,26 +127,25 @@ class ServerSystemWorld : SystemWorld {
} }
fun addClient(client: ServerConnection, shipSpeed: Double = Globals.systemWorld.clientShip.speed, location: SystemWorldLocation = SystemWorldLocation.Transit): CompletableFuture<ServerShip> { fun addClient(client: ServerConnection, shipSpeed: Double = Globals.systemWorld.clientShip.speed, location: SystemWorldLocation = SystemWorldLocation.Transit): CompletableFuture<ServerShip> {
val task = Task { addClient0(client, shipSpeed, location) } return scope.async { addClient0(client, shipSpeed, location) }.asCompletableFuture()
tasks.add(task)
return task.future
} }
fun removeClient(client: ServerConnection): CompletableFuture<Unit> { fun removeClient(client: ServerConnection): CompletableFuture<Unit> {
val task = Task { removeClient0(client) } return scope.async { removeClient0(client) }.asCompletableFuture()
tasks.add(task)
return task.future
}
private fun flyShip0(client: ServerConnection, location: SystemWorldLocation, future: CompletableFuture<SystemWorldLocation>) {
val ship = ships[client.uuid] ?: throw IllegalStateException("No client $client in $this!")
ship.destination(location, future)
} }
fun flyShip(client: ServerConnection, location: SystemWorldLocation): CompletableFuture<SystemWorldLocation> { fun flyShip(client: ServerConnection, location: SystemWorldLocation): CompletableFuture<SystemWorldLocation> {
val future = CompletableFuture<SystemWorldLocation>() val future = CompletableFuture<SystemWorldLocation>()
val task = Task { flyShip0(client, location, future) }
tasks.add(task) scope.launch {
val ship = ships[client.uuid] ?: throw IllegalStateException("No client $client in $this!")
ship.destination(location, future)
}.invokeOnCompletion {
if (it != null) {
future.completeExceptionally(it)
}
}
return future return future
} }
@ -148,10 +155,13 @@ class ServerSystemWorld : SystemWorld {
var lastSpawn = 0.0 var lastSpawn = 0.0
private set private set
private val scope: CoroutineScope
private constructor(server: StarboundServer, location: Vector3i) : super(location, server.universeClock, server.universe) { private constructor(server: StarboundServer, location: Vector3i) : super(location, server.universeClock, server.universe) {
this.server = server this.server = server
this.lastSpawn = clock.time - Globals.systemWorld.objectSpawnCycle this.lastSpawn = clock.time - Globals.systemWorld.objectSpawnCycle
objectSpawnTime = random.nextRange(Globals.systemWorld.objectSpawnInterval) objectSpawnTime = random.nextRange(Globals.systemWorld.objectSpawnInterval)
scope = CoroutineScope(server.coroutines + SupervisorJob())
} }
private constructor(server: StarboundServer, data: JsonData) : super(data.location, server.universeClock, server.universe) { private constructor(server: StarboundServer, data: JsonData) : super(data.location, server.universeClock, server.universe) {
@ -163,6 +173,7 @@ class ServerSystemWorld : SystemWorld {
} }
this.lastSpawn = data.lastSpawn this.lastSpawn = data.lastSpawn
scope = CoroutineScope(server.coroutines + SupervisorJob())
} }
private suspend fun spawnInitialObjects() { private suspend fun spawnInitialObjects() {
@ -249,21 +260,42 @@ class ServerSystemWorld : SystemWorld {
} }
private var ticksWithoutPlayers = 0 private var ticksWithoutPlayers = 0
private val tickSignal = Channel<Double>(120)
private var tickSignaler: Future<*>? = null
fun shouldClose(): Boolean { // system worlds are very lightweight, launching separate threads for them
return ticksWithoutPlayers > 1800 // is overkill; launch tick loop inside main server's thread
// However, if this proves to be a problem, then it can be otherwise dispatched to
// common ForkJoinPool in conjunction of CarriedExecutor + ExecutorWithScheduler
fun launchTickLoop() {
check(tickSignaler == null) { "Already ticking" }
tickSignaler = server.scheduleAtFixedRate(
Runnable { tickSignal.trySend(Starbound.SYSTEM_WORLD_TIMESTEP) },
Starbound.SYSTEM_WORLD_TIMESTEP_NANOS,
Starbound.SYSTEM_WORLD_TIMESTEP_NANOS,
TimeUnit.NANOSECONDS)
scope.launch {
while (ticksWithoutPlayers < 600) {
try {
tick(tickSignal.receive())
} catch (err: Throwable) {
if (err !is CancellationException)
LOGGER.fatal("Exception in System world tick loop", err)
// TODO: We should probably kick client ships out of system, but to where?
break
}
}
LOGGER.info("Stopping system world at $location")
tickSignaler?.cancel(false)
server.notifySystemWorldUnloaded(location)
}
} }
// in original engine, ticking happens at 20 updates per second private suspend fun tick(delta: Double) {
// Since there is no Lua driven code, we can tick as fast as we want
suspend fun tick(delta: Double) {
var next = tasks.poll()
while (next != null) {
next.run()
next = tasks.poll()
}
// safeguard for cases when client wasn't removed properly // safeguard for cases when client wasn't removed properly
ships.values.removeIf { ships.values.removeIf {
if (it.shouldRemove()) { if (it.shouldRemove()) {
@ -300,7 +332,7 @@ class ServerSystemWorld : SystemWorld {
return@removeIf false return@removeIf false
} }
// spawnObjects() spawnObjects()
ships.values.forEach { it.sendUpdates() } ships.values.forEach { it.sendUpdates() }
@ -530,17 +562,38 @@ class ServerSystemWorld : SystemWorld {
} else { } else {
LOGGER.info("Creating new System World at $location") LOGGER.info("Creating new System World at $location")
val world = ServerSystemWorld(server, location) val world = ServerSystemWorld(server, location)
world.spawnInitialObjects()
world.spawnObjects() try {
world.spawnInitialObjects()
world.spawnObjects()
world.launchTickLoop()
} catch (err: Throwable) {
world.tickSignaler?.cancel(false)
world.scope.cancel()
world.tickSignal.close()
throw err
}
return world return world
} }
} }
suspend fun load(server: StarboundServer, data: JsonElement): ServerSystemWorld { suspend fun load(server: StarboundServer, data: JsonElement): ServerSystemWorld {
val load = Starbound.gson.fromJson(data, JsonData::class.java) val load = Starbound.gson.fromJson(data, JsonData::class.java)
LOGGER.info("Loading System World at ${load.location}") LOGGER.info("Loading System World at ${load.location}")
val world = ServerSystemWorld(server, load) val world = ServerSystemWorld(server, load)
world.spawnObjects()
try {
world.spawnObjects()
world.launchTickLoop()
} catch (err: Throwable) {
world.tickSignaler?.cancel(false)
world.scope.cancel()
world.tickSignal.close()
throw err
}
return world return world
} }
} }