diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/DamageTileGroupPacket.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/DamageTileGroupPacket.kt index 5c79dddc..ed25533f 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/DamageTileGroupPacket.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/DamageTileGroupPacket.kt @@ -42,8 +42,6 @@ class DamageTileGroupPacket(val tiles: Collection, val isBackground: B } override fun play(connection: ServerConnection) { - connection.enqueue { - damageTiles(tiles, isBackground, sourcePosition, damage) - } + connection.tracker?.damageTiles(tiles, isBackground, sourcePosition, damage) } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/ModifyTileListPacket.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/ModifyTileListPacket.kt index 79dc00ae..848c523d 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/ModifyTileListPacket.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/packets/serverbound/ModifyTileListPacket.kt @@ -21,13 +21,7 @@ class ModifyTileListPacket(val modifications: Collection, isBackground: Boolean, sourcePosition: Vector2d, damage: TileDamage, source: AbstractEntity? = null): TileDamageResult { + /** + * this method does not block if pacer is null (safe to use with runBlocking {}) + */ + suspend fun damageTiles(positions: Collection, isBackground: Boolean, sourcePosition: Vector2d, damage: TileDamage, source: AbstractEntity? = null, pacer: Pacer? = null): TileDamageResult { if (damage.amount <= 0.0) return TileDamageResult.NONE @@ -188,6 +193,7 @@ class ServerWorld private constructor( .filter { p -> actualPositions.any { it.first == p } } .toList() + pacer?.consume(10) val broken = entity.damage(occupySpaces, sourcePosition, damage) if (source != null && broken) { @@ -205,6 +211,7 @@ class ServerWorld private constructor( // entity. if (tileEntityResult == TileDamageResult.NONE || damage.type.isPenetrating) { chunk ?: continue + pacer?.consume() val (result, health, tile) = chunk.damageTile(pos - chunk.pos.tile, isBackground, sourcePosition, damage, source) topMost = topMost.coerceAtLeast(result) @@ -225,6 +232,10 @@ class ServerWorld private constructor( } override fun applyTileModifications(modifications: Collection>, allowEntityOverlap: Boolean, ignoreTileProtection: Boolean): List> { + return runBlocking { applyTileModifications(modifications, allowEntityOverlap, ignoreTileProtection, null) } + } + + suspend fun applyTileModifications(modifications: Collection>, allowEntityOverlap: Boolean, ignoreTileProtection: Boolean = false, pacer: Pacer?): List> { val unapplied = ArrayList(modifications) var size: Int @@ -239,6 +250,7 @@ class ServerWorld private constructor( continue if (modification.allowed(this, pos, allowEntityOverlap)) { + pacer?.consume() modification.apply(this, pos, allowEntityOverlap) itr.remove() } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt index 223e9ad3..d9a0d566 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorldTracker.kt @@ -3,13 +3,17 @@ package ru.dbotthepony.kstarbound.server.world import it.unimi.dsi.fastutil.bytes.ByteArrayList import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap import it.unimi.dsi.fastutil.ints.Int2ObjectFunction -import it.unimi.dsi.fastutil.ints.Int2ObjectMaps import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap import it.unimi.dsi.fastutil.ints.IntArrayList import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet -import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet -import kotlinx.coroutines.future.await +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.math.vector.Vector2i @@ -18,6 +22,7 @@ import ru.dbotthepony.kstarbound.client.network.packets.ChunkCellsPacket import ru.dbotthepony.kstarbound.defs.SpawnTarget import ru.dbotthepony.kstarbound.defs.WarpAction import ru.dbotthepony.kstarbound.defs.WorldID +import ru.dbotthepony.kstarbound.defs.tile.TileDamage import ru.dbotthepony.kstarbound.defs.world.FlyingType import ru.dbotthepony.kstarbound.network.IPacket import ru.dbotthepony.kstarbound.network.packets.EntityCreatePacket @@ -28,12 +33,15 @@ import ru.dbotthepony.kstarbound.network.packets.clientbound.EnvironmentUpdatePa import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileArrayUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.LegacyTileUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.TileDamageUpdatePacket +import ru.dbotthepony.kstarbound.network.packets.clientbound.TileModificationFailurePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.WorldStartPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.WorldStopPacket import ru.dbotthepony.kstarbound.server.ServerConnection +import ru.dbotthepony.kstarbound.util.Pacer import ru.dbotthepony.kstarbound.world.ChunkPos import ru.dbotthepony.kstarbound.world.IChunkListener import ru.dbotthepony.kstarbound.world.TileHealth +import ru.dbotthepony.kstarbound.world.TileModification import ru.dbotthepony.kstarbound.world.api.ImmutableCell import ru.dbotthepony.kstarbound.world.entities.AbstractEntity import ru.dbotthepony.kstarbound.world.entities.player.PlayerEntity @@ -55,6 +63,7 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p private val tickets = HashMap() private val tasks = ConcurrentLinkedQueue Unit>() private val entityVersions = Int2LongOpenHashMap() + private val scope = CoroutineScope(world.eventLoop.coroutines + SupervisorJob()) init { entityVersions.defaultReturnValue(-1L) @@ -66,6 +75,47 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p client.worldID = world.worldID } + private data class DamageTileEntry(val positions: Collection, val isBackground: Boolean, val sourcePosition: Vector2d, val damage: TileDamage, val source: AbstractEntity? = null) + private val damageTilesQueue = Channel(64) // 64 pending tile damages should be enough + private val tileModificationBudget = Pacer.actionsPerSecond(actions = 512, handicap = 2048) // TODO: make this configurable + private val modifyTilesQueue = Channel>, Boolean>>(64) + + private suspend fun damageTilesLoop() { + while (true) { + val (positions, isBackground, sourcePosition, damage, source) = damageTilesQueue.receive() + world.damageTiles(positions, isBackground, sourcePosition, damage, source, tileModificationBudget) + } + } + + fun damageTiles(positions: Collection, isBackground: Boolean, sourcePosition: Vector2d, damage: TileDamage, source: AbstractEntity? = null) { + damageTilesQueue.trySend(DamageTileEntry(positions, isBackground, sourcePosition, damage, source)) + } + + private suspend fun modifyTilesLoop() { + while (true) { + val (modifications, allowEntityOverlap) = modifyTilesQueue.receive() + + try { + val unapplied = world.applyTileModifications(modifications, allowEntityOverlap, pacer = tileModificationBudget) + + if (unapplied.isNotEmpty()) { + client.send(TileModificationFailurePacket(unapplied)) + } + } catch (err: CancellationException) { + client.send(TileModificationFailurePacket(modifications)) + } + } + } + + fun modifyTiles(modifications: Collection>, allowEntityOverlap: Boolean) { + modifyTilesQueue.trySend(modifications to allowEntityOverlap) + } + + init { + scope.launch { damageTilesLoop() } + scope.launch { modifyTilesLoop() } + } + fun send(packet: IPacket) = client.send(packet) // packets which interact with world must be @@ -301,6 +351,10 @@ class ServerWorldTracker(val world: ServerWorld, val client: ServerConnection, p // this handles case where player is removed from world and // instantly added back because new world rejected us world.eventLoop.execute { remove0() } + + damageTilesQueue.close() + modifyTilesQueue.close() + scope.cancel() } } diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/Pacer.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/Pacer.kt new file mode 100644 index 00000000..58a6622f --- /dev/null +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/Pacer.kt @@ -0,0 +1,31 @@ +package ru.dbotthepony.kstarbound.util + +import kotlinx.coroutines.delay + +/** + * Allows to perform up to [maxForward] actions per given time window, + * otherwise pauses execution + */ +class Pacer(val maxForward: Int, val delayBetween: Long) { + private val maxForwardNanos = maxForward * delayBetween + private var currentTime = System.nanoTime() - maxForwardNanos + + suspend fun consume(actions: Int = 1) { + require(actions >= 1) { "Invalid amount of actions to consume: $actions" } + val time = System.nanoTime() + + if (time - currentTime > maxForwardNanos) + currentTime = time - maxForwardNanos + + currentTime += delayBetween * (actions - 1) + val diff = (currentTime - time) / 1_000_000L + currentTime += delayBetween + if (diff > 0L) delay(diff) + } + + companion object { + fun actionsPerSecond(actions: Int, handicap: Int = 0): Pacer { + return Pacer(handicap, 1_000_000_000L / actions) + } + } +}