530 lines
15 KiB
Kotlin
530 lines
15 KiB
Kotlin
package ru.dbotthepony.kstarbound.server
|
|
|
|
import com.google.gson.JsonObject
|
|
import io.netty.channel.ChannelHandlerContext
|
|
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet
|
|
import kotlinx.coroutines.Job
|
|
import kotlinx.coroutines.channels.Channel
|
|
import kotlinx.coroutines.delay
|
|
import kotlinx.coroutines.future.await
|
|
import kotlinx.coroutines.launch
|
|
import org.apache.logging.log4j.LogManager
|
|
import ru.dbotthepony.kommons.io.ByteKey
|
|
import ru.dbotthepony.kommons.util.Either
|
|
import ru.dbotthepony.kommons.util.KOptional
|
|
import ru.dbotthepony.kommons.vector.Vector2i
|
|
import ru.dbotthepony.kommons.vector.Vector3i
|
|
import ru.dbotthepony.kstarbound.Globals
|
|
import ru.dbotthepony.kstarbound.defs.WarpAction
|
|
import ru.dbotthepony.kstarbound.defs.WarpAlias
|
|
import ru.dbotthepony.kstarbound.defs.WarpMode
|
|
import ru.dbotthepony.kstarbound.defs.WorldID
|
|
import ru.dbotthepony.kstarbound.defs.world.CelestialParameters
|
|
import ru.dbotthepony.kstarbound.defs.world.SkyType
|
|
import ru.dbotthepony.kstarbound.defs.world.VisitableWorldParameters
|
|
import ru.dbotthepony.kstarbound.network.Connection
|
|
import ru.dbotthepony.kstarbound.network.ConnectionSide
|
|
import ru.dbotthepony.kstarbound.network.ConnectionType
|
|
import ru.dbotthepony.kstarbound.network.IServerPacket
|
|
import ru.dbotthepony.kstarbound.network.packets.ClientContextUpdatePacket
|
|
import ru.dbotthepony.kstarbound.network.packets.clientbound.CelestialResponsePacket
|
|
import ru.dbotthepony.kstarbound.network.packets.clientbound.PlayerWarpResultPacket
|
|
import ru.dbotthepony.kstarbound.network.packets.clientbound.ServerDisconnectPacket
|
|
import ru.dbotthepony.kstarbound.server.world.ServerWorldTracker
|
|
import ru.dbotthepony.kstarbound.server.world.WorldStorage
|
|
import ru.dbotthepony.kstarbound.server.world.LegacyWorldStorage
|
|
import ru.dbotthepony.kstarbound.server.world.ServerSystemWorld
|
|
import ru.dbotthepony.kstarbound.server.world.ServerWorld
|
|
import ru.dbotthepony.kstarbound.world.SystemWorldLocation
|
|
import ru.dbotthepony.kstarbound.world.UniversePos
|
|
import java.util.HashMap
|
|
import java.util.UUID
|
|
import kotlin.properties.Delegates
|
|
|
|
// serverside part of connection
|
|
class ServerConnection(val server: StarboundServer, type: ConnectionType) : Connection(ConnectionSide.SERVER, type) {
|
|
var tracker: ServerWorldTracker? = null
|
|
var worldStartAcknowledged = false
|
|
var returnWarp: WarpAction? = null
|
|
private var systemWorld: ServerSystemWorld? = null
|
|
|
|
val world: ServerWorld?
|
|
get() = tracker?.world
|
|
|
|
// packets which interact with world must be
|
|
// executed on world's thread
|
|
fun enqueue(task: ServerWorld.() -> Unit): Boolean {
|
|
val isInWorld = tracker?.enqueue(task) != null
|
|
|
|
if (!isInWorld) {
|
|
LOGGER.warn("$this tried to interact with world, but they are not in one.")
|
|
}
|
|
|
|
return isInWorld
|
|
}
|
|
|
|
lateinit var shipWorld: ServerWorld
|
|
private set
|
|
|
|
var uuid: UUID? = null
|
|
|
|
init {
|
|
connectionID = server.channels.nextConnectionID()
|
|
|
|
rpc.add("team.fetchTeamStatus") {
|
|
JsonObject()
|
|
}
|
|
}
|
|
|
|
override fun toString(): String {
|
|
val channel = if (hasChannel) channel.remoteAddress().toString() else "<no channel>"
|
|
val world = tracker?.world?.toString() ?: "<not in world>"
|
|
return "ServerConnection[$nickname $uuid ID=$connectionID channel=$channel / $world]"
|
|
}
|
|
|
|
fun alias(): String {
|
|
return "$nickname <$connectionID/$uuid>"
|
|
}
|
|
|
|
private val shipChunks = HashMap<ByteKey, KOptional<ByteArray>>()
|
|
private val modifiedShipChunks = ObjectOpenHashSet<ByteKey>()
|
|
var shipChunkSource by Delegates.notNull<WorldStorage>()
|
|
private set
|
|
|
|
override fun setupLegacy() {
|
|
super.setupLegacy()
|
|
shipChunkSource = LegacyWorldStorage.memory(shipChunks)
|
|
}
|
|
|
|
override fun setupNative() {
|
|
super.setupNative()
|
|
shipChunkSource = WorldStorage.EMPTY
|
|
}
|
|
|
|
fun receiveShipChunks(chunks: Map<ByteKey, KOptional<ByteArray>>) {
|
|
check(shipChunks.isEmpty()) { "Already has ship chunks" }
|
|
shipChunks.putAll(chunks)
|
|
}
|
|
|
|
private var remoteVersion = 0L
|
|
|
|
override fun flush() {
|
|
if (isConnected) {
|
|
val entries = rpc.write()
|
|
|
|
if (entries != null || modifiedShipChunks.isNotEmpty() || server2clientGroup.upstream.hasChangedSince(remoteVersion)) {
|
|
val (data, version) = server2clientGroup.write(remoteVersion, isLegacy)
|
|
remoteVersion = version
|
|
|
|
channel.write(ClientContextUpdatePacket(
|
|
entries ?: listOf(),
|
|
KOptional(modifiedShipChunks.associateWith { shipChunks[it]!! }),
|
|
KOptional(data)))
|
|
|
|
modifiedShipChunks.clear()
|
|
}
|
|
}
|
|
|
|
super.flush()
|
|
}
|
|
|
|
override fun onChannelClosed() {
|
|
playerEntity = null
|
|
|
|
super.onChannelClosed()
|
|
warpQueue.close()
|
|
server.channels.freeConnectionID(connectionID)
|
|
server.channels.connections.remove(this)
|
|
server.freeNickname(nickname)
|
|
systemWorld?.removeClient(this)
|
|
systemWorld = null
|
|
|
|
announceDisconnect("Connection to remote host is lost.")
|
|
|
|
if (::shipWorld.isInitialized) {
|
|
shipWorld.eventLoop.shutdown()
|
|
}
|
|
|
|
if (countedTowardsPlayerCount) {
|
|
countedTowardsPlayerCount = false
|
|
server.channels.decrementPlayerCount()
|
|
}
|
|
}
|
|
|
|
private val warpQueue = Channel<Pair<WarpAction, Boolean>>(capacity = 10)
|
|
|
|
private suspend fun warpEventLoop() {
|
|
while (true) {
|
|
var (request, deploy) = warpQueue.receive()
|
|
|
|
if (request is WarpAlias)
|
|
request = request.remap(this)
|
|
|
|
LOGGER.info("Trying to warp ${alias()} to $request")
|
|
|
|
val resolve = request.resolve(this)
|
|
|
|
if (resolve.isLimbo) {
|
|
send(PlayerWarpResultPacket(false, request, true))
|
|
} else if (tracker?.world?.worldID == resolve) {
|
|
LOGGER.info("${alias()} tried to warp into world they are already in.")
|
|
send(PlayerWarpResultPacket(true, request, false))
|
|
} else {
|
|
val world = try {
|
|
server.loadWorld(resolve).await()
|
|
} catch (err: Throwable) {
|
|
send(PlayerWarpResultPacket(false, request, false))
|
|
LOGGER.error("Unable to wark ${alias()} to $request", err)
|
|
continue
|
|
}
|
|
|
|
try {
|
|
world.acceptClient(this, request).await()
|
|
|
|
if (resolve is WorldID.ShipWorld) {
|
|
val connection = server.channels.connectionByUUID(resolve.uuid)
|
|
|
|
if (connection != null) {
|
|
orbitalWarpAction = connection.currentOrbitalWarpAction
|
|
}
|
|
} else {
|
|
orbitalWarpAction = KOptional()
|
|
}
|
|
} catch (err: Throwable) {
|
|
send(PlayerWarpResultPacket(false, request, false))
|
|
|
|
if (world == shipWorld) {
|
|
disconnect("ShipWorld refused to accept its owner: $err")
|
|
} else {
|
|
enqueueWarp(returnWarp ?: WarpAlias.OwnShip)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private val flyShipQueue = Channel<Pair<Vector3i, SystemWorldLocation>>(capacity = 40)
|
|
|
|
fun flyShip(system: Vector3i, location: SystemWorldLocation) {
|
|
flyShipQueue.trySend(system to location)
|
|
}
|
|
|
|
private var currentOrbitalWarpAction = KOptional<Pair<WarpAction, WarpMode>>()
|
|
|
|
// coordinates ship flight
|
|
private suspend fun shipFlightEventLoop() {
|
|
shipWorld.sky.skyType = SkyType.ORBITAL
|
|
shipWorld.sky.startFlying(true, true)
|
|
var visited = 0
|
|
|
|
LOGGER.info("Finding starter world for ${alias()}...")
|
|
val params = Globals.universeServer.findStarterWorldParameters
|
|
|
|
// visit all since randomly trying to find specific world is not healthy performance wise
|
|
val found = server.universe.findRandomWorld(params.tries, params.range, visitAll = true, predicate = {
|
|
if (++visited % 600 == 0) {
|
|
LOGGER.info("Still finding starter world for ${alias()}...")
|
|
}
|
|
|
|
val parameters = server.universe.parameters(it) ?: return@findRandomWorld false
|
|
if (parameters.visitableParameters == null) return@findRandomWorld false
|
|
if (!params.starterWorld.test(parameters.visitableParameters!!)) return@findRandomWorld false
|
|
|
|
val children = ArrayList<VisitableWorldParameters>()
|
|
|
|
for (child in server.universe.children(it.system())) {
|
|
val p = server.universe.parameters(child)
|
|
|
|
if (p?.visitableParameters != null) {
|
|
children.add(p.visitableParameters!!)
|
|
}
|
|
|
|
for (child2 in server.universe.children(child)) {
|
|
val p2 = server.universe.parameters(child2)
|
|
|
|
if (p2?.visitableParameters != null) {
|
|
children.add(p2.visitableParameters!!)
|
|
}
|
|
}
|
|
}
|
|
|
|
params.requiredSystemWorlds.all { predicate -> children.any { predicate.test(it) } }
|
|
})
|
|
|
|
if (found == null) {
|
|
LOGGER.fatal("Unable to find starter world for $this!")
|
|
disconnect("Unable to find starter world")
|
|
return
|
|
}
|
|
|
|
LOGGER.info("Found appropriate starter world at $found for ${alias()}")
|
|
|
|
val worldPromise = server.loadSystemWorld(found.location)
|
|
|
|
worldPromise.thenApply {
|
|
systemWorld = it
|
|
|
|
if (!isConnected) {
|
|
it.removeClient(this)
|
|
}
|
|
}
|
|
|
|
var world = worldPromise.await()
|
|
var ship = world.addClient(this, location = SystemWorldLocation.Celestial(found)).await()
|
|
shipWorld.sky.stopFlyingAt(ship.location.skyParameters(world))
|
|
shipCoordinate = found
|
|
|
|
run {
|
|
val action = ship.location.orbitalAction(world)
|
|
currentOrbitalWarpAction = action
|
|
orbitalWarpAction = action
|
|
|
|
for (client in shipWorld.clients) {
|
|
client.client.orbitalWarpAction = action
|
|
}
|
|
}
|
|
|
|
var currentFlightJob: Job? = null
|
|
|
|
while (true) {
|
|
val (system, location) = flyShipQueue.receive()
|
|
|
|
if (system == world.location) {
|
|
// fly ship in current system
|
|
currentFlightJob?.cancel()
|
|
val flight = world.flyShip(this, location)
|
|
|
|
shipWorld.eventLoop.execute {
|
|
shipWorld.sky.startFlying(false)
|
|
}
|
|
|
|
currentFlightJob = scope.launch {
|
|
val coords = flight.await()
|
|
val action = coords.orbitalAction(world)
|
|
currentOrbitalWarpAction = action
|
|
|
|
for (client in shipWorld.clients) {
|
|
client.client.orbitalWarpAction = action
|
|
}
|
|
|
|
val sky = coords.skyParameters(world)
|
|
|
|
shipWorld.eventLoop.execute {
|
|
shipWorld.sky.stopFlyingAt(sky)
|
|
}
|
|
}
|
|
|
|
currentOrbitalWarpAction = KOptional()
|
|
|
|
for (client in shipWorld.clients) {
|
|
client.client.orbitalWarpAction = KOptional()
|
|
}
|
|
} else {
|
|
// we need to travel to other system
|
|
val exists = server.universe.parameters(UniversePos(system)) != null
|
|
|
|
if (!exists)
|
|
continue
|
|
|
|
currentFlightJob?.cancel()
|
|
world.removeClient(this)
|
|
|
|
shipWorld.eventLoop.execute {
|
|
shipWorld.sky.startFlying(true)
|
|
}
|
|
|
|
LOGGER.info("${alias()} is flying to new system: ${UniversePos(system)}")
|
|
val newSystem = server.loadSystemWorld(system)
|
|
|
|
shipCoordinate = UniversePos(system)
|
|
currentOrbitalWarpAction = KOptional()
|
|
|
|
for (client in shipWorld.clients) {
|
|
client.client.orbitalWarpAction = KOptional()
|
|
}
|
|
|
|
newSystem.thenApply {
|
|
systemWorld = it
|
|
|
|
if (!isConnected) {
|
|
it.removeClient(this)
|
|
}
|
|
}
|
|
|
|
world = newSystem.await()
|
|
ship = world.addClient(this).await()
|
|
|
|
val newParams = ship.location.skyParameters(world)
|
|
|
|
shipWorld.eventLoop.execute {
|
|
shipWorld.sky.stopFlyingAt(newParams)
|
|
}
|
|
|
|
// this seems to be way too big isn't it?
|
|
delay((Globals.universeServer.queuedFlightWaitTime * 1000.0).toLong())
|
|
}
|
|
}
|
|
}
|
|
|
|
val celestialRequestQueue = Channel<Either<Vector2i, Vector3i>>(capacity = 100)
|
|
|
|
fun pushCelestialRequests(requests: Iterable<Either<Vector2i, Vector3i>>) {
|
|
requests.forEach {
|
|
celestialRequestQueue.trySend(it)
|
|
}
|
|
}
|
|
|
|
private suspend fun handleCelestialRequests(requests: Collection<Either<Vector2i, Vector3i>>) {
|
|
val responses = ArrayList<Either<CelestialResponsePacket.ChunkData, CelestialResponsePacket.SystemData>>()
|
|
|
|
for (request in requests) {
|
|
if (request.isLeft) {
|
|
val chunkPos = request.left()
|
|
responses.add(Either.left(server.universe.getChunk(chunkPos)?.toNetwork() ?: continue))
|
|
} else {
|
|
val systemPos = UniversePos(request.right())
|
|
val map = HashMap<Int, CelestialResponsePacket.PlanetData>()
|
|
|
|
for (planet in server.universe.children(systemPos)) {
|
|
val planetData = server.universe.parameters(planet) ?: continue
|
|
val children = HashMap<Int, CelestialParameters>()
|
|
|
|
for (satellite in server.universe.children(planet)) {
|
|
children[satellite.satelliteOrbit] = server.universe.parameters(satellite) ?: continue
|
|
}
|
|
|
|
map[planet.planetOrbit] = CelestialResponsePacket.PlanetData(planetData, children)
|
|
}
|
|
|
|
responses.add(Either.right(CelestialResponsePacket.SystemData(systemPos.location, map)))
|
|
}
|
|
}
|
|
|
|
send(CelestialResponsePacket(responses))
|
|
}
|
|
|
|
// protects from malicious actors
|
|
private suspend fun celestialRequestsHandler() {
|
|
while (true) {
|
|
val next = celestialRequestQueue.receive()
|
|
val requests = ArrayList<Either<Vector2i, Vector3i>>()
|
|
requests.add(next)
|
|
|
|
while (true) {
|
|
val tryNext = celestialRequestQueue.tryReceive().getOrNull() ?: break
|
|
requests.add(tryNext)
|
|
}
|
|
|
|
handleCelestialRequests(requests)
|
|
}
|
|
}
|
|
|
|
fun enqueueWarp(destination: WarpAction, deploy: Boolean = false) {
|
|
warpQueue.trySend(destination to deploy)
|
|
}
|
|
|
|
fun tick(delta: Double) {
|
|
if (!isConnected || !channel.isOpen)
|
|
return
|
|
|
|
flush()
|
|
}
|
|
|
|
private var announcedDisconnect = true
|
|
|
|
private fun announceDisconnect(reason: String) {
|
|
if (!announcedDisconnect && nickname.isNotBlank()) {
|
|
if (reason.isBlank()) {
|
|
server.chat.systemMessage("Player '$nickname' disconnected")
|
|
} else {
|
|
server.chat.systemMessage("Player '$nickname' disconnected ($reason)")
|
|
}
|
|
|
|
announcedDisconnect = true
|
|
}
|
|
}
|
|
|
|
override fun disconnect(reason: String) {
|
|
announceDisconnect(reason)
|
|
|
|
if (channel.isOpen) {
|
|
// send pending updates
|
|
flush()
|
|
}
|
|
|
|
tracker?.remove()
|
|
tracker = null
|
|
|
|
if (::shipWorld.isInitialized) {
|
|
shipWorld.eventLoop.shutdown()
|
|
}
|
|
|
|
if (channel.isOpen) {
|
|
// say goodbye
|
|
channel.write(ServerDisconnectPacket(reason))
|
|
channel.flush()
|
|
channel.close()
|
|
}
|
|
}
|
|
|
|
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
|
|
if (!channel.isOpen)
|
|
return
|
|
|
|
if (msg is IServerPacket) {
|
|
try {
|
|
msg.play(this)
|
|
} catch (err: Throwable) {
|
|
LOGGER.error("Failed to handle serverbound packet $msg", err)
|
|
disconnect("Incoming packet caused an exception: $err")
|
|
}
|
|
} else {
|
|
LOGGER.error("Unknown serverbound packet type $msg")
|
|
disconnect("Unknown serverbound packet type $msg")
|
|
}
|
|
}
|
|
|
|
private var countedTowardsPlayerCount = false
|
|
|
|
override fun inGame() {
|
|
announcedDisconnect = false
|
|
server.chat.systemMessage("Player '$nickname' connected")
|
|
countedTowardsPlayerCount = true
|
|
server.channels.incrementPlayerCount()
|
|
|
|
if (isLegacy) {
|
|
scope.launch { celestialRequestsHandler() }
|
|
|
|
server.loadShipWorld(this, shipChunkSource).thenAccept {
|
|
if (!isConnected || !channel.isOpen) {
|
|
LOGGER.warn("$this disconnected before loaded their ShipWorld")
|
|
it.eventLoop.shutdown()
|
|
} else {
|
|
shipWorld = it
|
|
shipWorld.sky.referenceClock = server.universeClock
|
|
// shipWorld.sky.startFlying(true, true)
|
|
shipWorld.eventLoop.start()
|
|
enqueueWarp(WarpAlias.OwnShip)
|
|
shipUpgrades = shipUpgrades.addCapability("planetTravel")
|
|
shipUpgrades = shipUpgrades.addCapability("teleport")
|
|
shipUpgrades = shipUpgrades.copy(maxFuel = 10000, shipLevel = 3)
|
|
scope.launch { shipFlightEventLoop() }
|
|
scope.launch { warpEventLoop() }
|
|
|
|
//if (server.channels.connections.size > 1) {
|
|
// enqueueWarp(WarpAction.Player(server.channels.connections.first().uuid!!))
|
|
//}
|
|
}
|
|
}.exceptionally {
|
|
LOGGER.error("Error while initializing shipworld for $this", it)
|
|
disconnect("Error while initializing shipworld for player: $it")
|
|
null
|
|
}
|
|
}
|
|
}
|
|
|
|
companion object {
|
|
private val LOGGER = LogManager.getLogger()
|
|
}
|
|
}
|