package ru.dbotthepony.kstarbound.server import com.google.gson.JsonObject import io.netty.channel.ChannelHandlerContext import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.future.await import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.io.ByteKey import ru.dbotthepony.kommons.util.Either import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kommons.vector.Vector2i import ru.dbotthepony.kommons.vector.Vector3i import ru.dbotthepony.kstarbound.Globals import ru.dbotthepony.kstarbound.defs.WarpAction import ru.dbotthepony.kstarbound.defs.WarpAlias import ru.dbotthepony.kstarbound.defs.WarpMode import ru.dbotthepony.kstarbound.defs.WorldID import ru.dbotthepony.kstarbound.defs.world.CelestialParameters import ru.dbotthepony.kstarbound.defs.world.VisitableWorldParameters import ru.dbotthepony.kstarbound.network.Connection import ru.dbotthepony.kstarbound.network.ConnectionSide import ru.dbotthepony.kstarbound.network.ConnectionType import ru.dbotthepony.kstarbound.network.IServerPacket import ru.dbotthepony.kstarbound.network.packets.ClientContextUpdatePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.CelestialResponsePacket import ru.dbotthepony.kstarbound.network.packets.clientbound.PlayerWarpResultPacket import ru.dbotthepony.kstarbound.network.packets.clientbound.ServerDisconnectPacket import ru.dbotthepony.kstarbound.server.world.ServerWorldTracker import ru.dbotthepony.kstarbound.server.world.WorldStorage import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage import ru.dbotthepony.kstarbound.server.world.ServerWorld import ru.dbotthepony.kstarbound.world.SystemWorldLocation import ru.dbotthepony.kstarbound.world.UniversePos import java.util.HashMap import java.util.UUID import kotlin.properties.Delegates // serverside part of connection class ServerConnection(val server: StarboundServer, type: ConnectionType) : Connection(ConnectionSide.SERVER, type) { var tracker: ServerWorldTracker? = null var worldStartAcknowledged = false var returnWarp: WarpAction? = null val world: ServerWorld? get() = tracker?.world // packets which interact with world must be // executed on world's thread fun enqueue(task: ServerWorld.() -> Unit) { return tracker?.enqueue(task) ?: throw IllegalStateException("Not in world.") } lateinit var shipWorld: ServerWorld private set var uuid: UUID? = null init { connectionID = server.channels.nextConnectionID() rpc.add("team.fetchTeamStatus") { JsonObject() } } override fun toString(): String { val channel = if (hasChannel) channel.remoteAddress().toString() else "" val world = tracker?.world?.toString() ?: "" return "ServerConnection[$nickname $uuid ID=$connectionID channel=$channel / $world]" } fun alias(): String { return "$nickname <$connectionID/$uuid>" } private val shipChunks = HashMap>() private val modifiedShipChunks = ObjectOpenHashSet() var shipChunkSource by Delegates.notNull() private set override fun setupLegacy() { super.setupLegacy() shipChunkSource = LegacyWorldStorage.memory(shipChunks) } override fun setupNative() { super.setupNative() shipChunkSource = WorldStorage.EMPTY } fun receiveShipChunks(chunks: Map>) { check(shipChunks.isEmpty()) { "Already has ship chunks" } shipChunks.putAll(chunks) } private var remoteVersion = 0L override fun flush() { if (isConnected) { val entries = rpc.write() if (entries != null || modifiedShipChunks.isNotEmpty() || server2clientGroup.upstream.hasChangedSince(remoteVersion)) { val (data, version) = server2clientGroup.write(remoteVersion, isLegacy) remoteVersion = version channel.write(ClientContextUpdatePacket( entries ?: listOf(), KOptional(modifiedShipChunks.associateWith { shipChunks[it]!! }), KOptional(data))) modifiedShipChunks.clear() } } super.flush() } override fun onChannelClosed() { playerEntity = null super.onChannelClosed() warpQueue.close() server.channels.freeConnectionID(connectionID) server.channels.connections.remove(this) server.freeNickname(nickname) announceDisconnect("Connection to remote host is lost.") if (::shipWorld.isInitialized) { shipWorld.close() } if (countedTowardsPlayerCount) { countedTowardsPlayerCount = false server.channels.decrementPlayerCount() } } private val warpQueue = Channel>(capacity = 10) private suspend fun warpEventLoop() { while (true) { var (request, deploy) = warpQueue.receive() if (request is WarpAlias) request = request.remap(this) LOGGER.info("Trying to warp $this to $request") val resolve = request.resolve(this) if (resolve.isLimbo) { send(PlayerWarpResultPacket(false, request, true)) } else if (tracker?.world?.worldID == resolve) { LOGGER.info("$this tried to warp into world they are already in.") send(PlayerWarpResultPacket(true, request, false)) } else { val world = server.worlds[resolve] if (world == null) { send(PlayerWarpResultPacket(false, request, false)) continue } try { world.acceptClient(this, request).await() if (resolve is WorldID.ShipWorld) { val connection = server.channels.connectionByUUID(resolve.uuid) if (connection != null) { orbitalWarpAction = connection.currentOrbitalWarpAction } } else { orbitalWarpAction = KOptional() } } catch (err: Throwable) { send(PlayerWarpResultPacket(false, request, false)) if (world == shipWorld) { disconnect("ShipWorld refused to accept its owner: $err") } else { enqueueWarp(returnWarp ?: WarpAlias.OwnShip) } } } } } private val flyShipQueue = Channel>(capacity = 40) fun flyShip(system: Vector3i, location: SystemWorldLocation) { flyShipQueue.trySend(system to location) } private var currentOrbitalWarpAction = KOptional>() // coordinates ship flight private suspend fun shipFlightEventLoop() { shipWorld.sky.startFlying(true, true) var visited = 0 LOGGER.info("Finding starter world for ${alias()}...") val params = Globals.universeServer.findStarterWorldParameters // visit all since randomly trying to find specific world is not healthy performance wise val found = server.universe.findRandomWorld(params.tries, params.range, visitAll = true, predicate = { if (++visited % 600 == 0) { LOGGER.info("Still finding starter world for ${alias()}...") } val parameters = server.universe.parameters(it) ?: return@findRandomWorld false if (parameters.visitableParameters == null) return@findRandomWorld false if (!params.starterWorld.test(parameters.visitableParameters!!)) return@findRandomWorld false val children = ArrayList() for (child in server.universe.children(it.system())) { val p = server.universe.parameters(child) if (p?.visitableParameters != null) { children.add(p.visitableParameters!!) } for (child2 in server.universe.children(child)) { val p2 = server.universe.parameters(child2) if (p2?.visitableParameters != null) { children.add(p2.visitableParameters!!) } } } params.requiredSystemWorlds.all { predicate -> children.any { predicate.test(it) } } }) if (found == null) { LOGGER.fatal("Unable to find starter world for $this!") disconnect("Unable to find starter world") return } LOGGER.info("Found appropriate starter world at $found for ${alias()}") var world = server.loadSystemWorld(found.location).await() var ship = world.addClient(this, location = SystemWorldLocation.Celestial(found)).await() shipWorld.sky.stopFlyingAt(ship.location.skyParameters(world)) shipCoordinate = found run { val action = ship.location.orbitalAction(world) orbitalWarpAction = action for (client in shipWorld.clients) { client.client.orbitalWarpAction = action } } var currentFlightJob: Job? = null while (true) { val (system, location) = flyShipQueue.receive() if (system == world.location) { // fly ship in current system currentFlightJob?.cancel() val flight = world.flyShip(this, location) shipWorld.mailbox.execute { shipWorld.sky.startFlying(false) } currentFlightJob = scope.launch { val coords = flight.await() val action = coords.orbitalAction(world) currentOrbitalWarpAction = action for (client in shipWorld.clients) { client.client.orbitalWarpAction = action } val sky = coords.skyParameters(world) shipWorld.mailbox.execute { shipWorld.sky.stopFlyingAt(sky) } } currentOrbitalWarpAction = KOptional() for (client in shipWorld.clients) { client.client.orbitalWarpAction = KOptional() } } else { // we need to travel to other system val exists = server.universe.parameters(UniversePos(system)) != null if (!exists) continue currentFlightJob?.cancel() world.removeClient(this) shipWorld.mailbox.execute { shipWorld.sky.startFlying(true) } LOGGER.info("${alias()} is flying to new system: ${UniversePos(system)}") val newSystem = server.loadSystemWorld(system) shipCoordinate = UniversePos(system) currentOrbitalWarpAction = KOptional() for (client in shipWorld.clients) { client.client.orbitalWarpAction = KOptional() } world = newSystem.await() ship = world.addClient(this).await() val newParams = ship.location.skyParameters(world) shipWorld.mailbox.execute { shipWorld.sky.stopFlyingAt(newParams) } // this seems to be way too big isn't it? delay((Globals.universeServer.queuedFlightWaitTime * 1000.0).toLong()) } } } val celestialRequestQueue = Channel>(capacity = 100) fun pushCelestialRequests(requests: Iterable>) { requests.forEach { celestialRequestQueue.trySend(it) } } private suspend fun handleCelestialRequests(requests: Collection>) { val responses = ArrayList>() for (request in requests) { if (request.isLeft) { val chunkPos = request.left() responses.add(Either.left(server.universe.getChunk(chunkPos)?.toNetwork() ?: continue)) } else { val systemPos = UniversePos(request.right()) val map = HashMap() for (planet in server.universe.children(systemPos)) { val planetData = server.universe.parameters(planet) ?: continue val children = HashMap() for (satellite in server.universe.children(planet)) { children[satellite.satelliteOrbit] = server.universe.parameters(satellite) ?: continue } map[planet.planetOrbit] = CelestialResponsePacket.PlanetData(planetData, children) } responses.add(Either.right(CelestialResponsePacket.SystemData(systemPos.location, map))) } } send(CelestialResponsePacket(responses)) } // protects from malicious actors private suspend fun celestialRequestsHandler() { while (true) { val next = celestialRequestQueue.receive() val requests = ArrayList>() requests.add(next) while (true) { val tryNext = celestialRequestQueue.tryReceive().getOrNull() ?: break requests.add(tryNext) } handleCelestialRequests(requests) } } fun enqueueWarp(destination: WarpAction, deploy: Boolean = false) { warpQueue.trySend(destination to deploy) } fun tick() { if (!isConnected || !channel.isOpen) return flush() } private var announcedDisconnect = true private fun announceDisconnect(reason: String) { if (!announcedDisconnect && nickname.isNotBlank()) { if (reason.isBlank()) { server.chat.systemMessage("Player '$nickname' disconnected") } else { server.chat.systemMessage("Player '$nickname' disconnected ($reason)") } announcedDisconnect = true } } override fun disconnect(reason: String) { announceDisconnect(reason) if (channel.isOpen) { // send pending updates flush() } tracker?.remove() tracker = null if (::shipWorld.isInitialized) { shipWorld.close() } if (channel.isOpen) { // say goodbye channel.write(ServerDisconnectPacket(reason)) channel.flush() channel.close() } } override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { if (msg is IServerPacket) { try { msg.play(this) } catch (err: Throwable) { LOGGER.error("Failed to handle serverbound packet $msg", err) disconnect("Incoming packet caused an exception: $err") } } else { LOGGER.error("Unknown serverbound packet type $msg") disconnect("Unknown serverbound packet type $msg") } } private var countedTowardsPlayerCount = false override fun inGame() { announcedDisconnect = false server.chat.systemMessage("Player '$nickname' connected") countedTowardsPlayerCount = true server.channels.incrementPlayerCount() if (isLegacy) { scope.launch { celestialRequestsHandler() } ServerWorld.load(server, shipChunkSource, WorldID.ShipWorld(uuid!!)).thenAccept { if (!isConnected || !channel.isOpen) { LOGGER.warn("$this disconnected before loaded their ShipWorld") it.close() } else { shipWorld = it shipWorld.thread.start() enqueueWarp(WarpAlias.OwnShip) shipUpgrades = shipUpgrades.addCapability("planetTravel") shipUpgrades = shipUpgrades.addCapability("teleport") shipUpgrades = shipUpgrades.copy(maxFuel = 10000, shipLevel = 1) scope.launch { warpEventLoop() } scope.launch { shipFlightEventLoop() } if (server.channels.connections.size > 1) { enqueueWarp(WarpAction.Player(server.channels.connections.first().uuid!!)) } } }.exceptionally { LOGGER.error("Error while initializing shipworld for $this", it) disconnect("Error while initializing shipworld for player: $it") null } } } companion object { private val LOGGER = LogManager.getLogger() } }