package ru.dbotthepony.kstarbound.network import com.google.common.util.concurrent.ThreadFactoryBuilder import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelOption import io.netty.channel.nio.NioEventLoopGroup import it.unimi.dsi.fastutil.ints.IntArrayList import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.cancel import org.apache.logging.log4j.LogManager import ru.dbotthepony.kstarbound.io.IntValueCodec import ru.dbotthepony.kstarbound.io.StreamCodec import ru.dbotthepony.kstarbound.io.koptional import ru.dbotthepony.kstarbound.math.AABBi import ru.dbotthepony.kommons.util.KOptional import ru.dbotthepony.kommons.util.getValue import ru.dbotthepony.kommons.util.setValue import ru.dbotthepony.kstarbound.math.vector.Vector2d import ru.dbotthepony.kstarbound.math.vector.Vector2i import ru.dbotthepony.kstarbound.Globals import ru.dbotthepony.kstarbound.defs.EntityDamageTeam import ru.dbotthepony.kstarbound.defs.WarpAction import ru.dbotthepony.kstarbound.defs.WarpMode import ru.dbotthepony.kstarbound.defs.WorldID import ru.dbotthepony.kstarbound.defs.actor.player.ShipUpgrades import ru.dbotthepony.kstarbound.network.syncher.MasterElement import ru.dbotthepony.kstarbound.network.syncher.NetworkedGroup import ru.dbotthepony.kstarbound.network.syncher.NetworkedList import ru.dbotthepony.kstarbound.network.syncher.networkedBoolean import ru.dbotthepony.kstarbound.network.syncher.networkedData import ru.dbotthepony.kstarbound.network.syncher.networkedSignedInt import ru.dbotthepony.kstarbound.server.ServerChannels import ru.dbotthepony.kstarbound.world.UniversePos import ru.dbotthepony.kstarbound.world.entities.player.PlayerEntity import java.io.Closeable import kotlin.math.roundToInt import kotlin.properties.Delegates abstract class Connection(val side: ConnectionSide, val type: ConnectionType) : ChannelInboundHandlerAdapter(), Closeable { abstract override fun channelRead(ctx: ChannelHandlerContext, msg: Any) var character: PlayerEntity? = null val rpc = JsonRPC() var entityIDRange: IntRange by Delegates.notNull() private set var scope: CoroutineScope by Delegates.notNull() private set var connectionID: Int = -1 set(value) { require(value in 1 .. ServerChannels.MAX_PLAYERS) { "Connection ID is out of range: $value" } field = value val begin = value * -65536 entityIDRange = begin .. begin + 65535 } var nickname: String = "" val hasChannel get() = ::channel.isInitialized lateinit var channel: Channel private set var isLegacy: Boolean = true protected set private val handshakeValidator = PacketRegistry.HANDSHAKE.Validator(side) private val handshakeSerializer = PacketRegistry.HANDSHAKE.Serializer(side) private val nativeValidator = PacketRegistry.NATIVE.Validator(side) private val nativeSerializer = PacketRegistry.NATIVE.Serializer(side) private val legacyValidator = PacketRegistry.LEGACY.Validator(side) private val legacySerializer = PacketRegistry.LEGACY.Serializer(side) // whenever protocol was negotiated var isConnected = false private set // whenever successfully joined the game var isReady = false protected set open fun setupLegacy() { isConnected = true LOGGER.info("Handshake successful on ${channel.remoteAddress()}, channel is using legacy protocol") if (type == ConnectionType.MEMORY) { channel.pipeline().remove(handshakeValidator) channel.pipeline().addFirst(legacyValidator) } else { channel.pipeline().remove(handshakeSerializer) channel.pipeline().addFirst(legacySerializer) } isLegacy = true } open fun setupNative() { isConnected = true LOGGER.info("Handshake successful on ${channel.remoteAddress()}, channel is using native protocol") if (type == ConnectionType.MEMORY) { channel.pipeline().remove(handshakeValidator) channel.pipeline().addFirst(nativeValidator) } else { channel.pipeline().remove(handshakeSerializer) channel.pipeline().addFirst(nativeSerializer) } isLegacy = false inGame() } fun bind(channel: Channel) { scope = CoroutineScope(channel.eventLoop().asCoroutineDispatcher() + SupervisorJob() + CoroutineExceptionHandler { coroutineContext, throwable -> disconnect("Uncaught exception in one of connection' coroutines: $throwable") LOGGER.fatal("Uncaught exception in one of $this coroutines", throwable) }) channel.config().setOption(ChannelOption.TCP_NODELAY, true) this.channel = channel if (type == ConnectionType.MEMORY) channel.pipeline().addFirst(handshakeValidator) else channel.pipeline().addFirst(handshakeSerializer) channel.pipeline().addLast(this) channel.closeFuture().addListener { isConnected = false LOGGER.info("$channel is closed") scope.cancel("$channel is closed") disconnect("Connection closed") } } // one connection can be reused for second channel (while first is being closed) override fun isSharable(): Boolean { return true } override fun ensureNotSharable() { } open fun inGame() { isReady = true } fun send(packet: IPacket, flush: Boolean = true) { if (channel.isOpen && isConnected) { channel.write(packet) if (flush) channel.flush() } } abstract fun disconnect(reason: String = "") override fun close() { LOGGER.info("Terminating $this") channel.close() } // global variables (per connection) // clientside variables val client2serverGroup = MasterElement(NetworkedGroup()) var windowXMin by client2serverGroup.upstream.add(networkedSignedInt()) var windowYMin by client2serverGroup.upstream.add(networkedSignedInt()) var windowWidth by client2serverGroup.upstream.add(networkedSignedInt()) var windowHeight by client2serverGroup.upstream.add(networkedSignedInt()) var playerID by client2serverGroup.upstream.add(networkedSignedInt()) val clientSpectatingEntities = NetworkedList(IntValueCodec, elementsFactory = ::IntArrayList).also { client2serverGroup.upstream.add(it) } // serverside variables val server2clientGroup = MasterElement(NetworkedGroup()) var orbitalWarpAction by server2clientGroup.upstream.add(networkedData(KOptional(), warpActionCodec, legacyWarpActionCodec)) var worldID by server2clientGroup.upstream.add(networkedData(WorldID.Limbo, WorldID.CODEC, WorldID.LEGACY_CODEC)) var isAdmin by server2clientGroup.upstream.add(networkedBoolean()) var team by server2clientGroup.upstream.add(networkedData(EntityDamageTeam.FRIENDLY, EntityDamageTeam.CODEC, EntityDamageTeam.LEGACY_CODEC)) var shipUpgrades by server2clientGroup.upstream.add(networkedData(ShipUpgrades(), ShipUpgrades.CODEC, ShipUpgrades.LEGACY_CODEC)) var shipCoordinate by server2clientGroup.upstream.add(networkedData(UniversePos(), UniversePos.CODEC, UniversePos.LEGACY_CODEC)) var playerEntity: PlayerEntity? = null // in tiles fun trackingTileRegions(): List { val result = ArrayList() var mins = Vector2i(windowXMin - Globals.client.windowMonitoringBorder, windowYMin - Globals.client.windowMonitoringBorder) var maxs = Vector2i(windowWidth + Globals.client.windowMonitoringBorder * 2, windowHeight + Globals.client.windowMonitoringBorder * 2) if (maxs.x - mins.x > 1000) { // holy shit val middle = (maxs.x - mins.x) / 2 mins = mins.copy(x = middle - 500) maxs = maxs.copy(x = middle + 500) } if (maxs.y - mins.y > 1000) { // holy shit val middle = (maxs.y - mins.y) / 2 mins = mins.copy(y = middle - 500) maxs = maxs.copy(y = middle + 500) } val window = AABBi(mins, maxs + mins) if (window.mins != Vector2i.ZERO && window.maxs != Vector2i.ZERO) { result.add(window) } val playerEntity = playerEntity if (playerEntity != null) { // add an extra region the size of the window centered on the player's position to prevent nearby sectors // being unloaded due to camera panning or centering on other entities val diff = Vector2d(window.width / 2.0, window.height / 2.0) val pmins = playerEntity.position - diff val pmaxs = playerEntity.position + diff result.add(AABBi( Vector2i(pmins.x.roundToInt(), pmins.y.roundToInt()), Vector2i(pmaxs.x.roundToInt(), pmaxs.y.roundToInt()), )) } for (entity in clientSpectatingEntities) { // TODO } return result } companion object { const val SERVER_CONNECTION_ID = 0 private val LOGGER = LogManager.getLogger() private val warpActionCodec = StreamCodec.Pair(WarpAction.CODEC, WarpMode.CODEC).koptional() private val legacyWarpActionCodec = StreamCodec.Pair(WarpAction.LEGACY_CODEC, WarpMode.CODEC).koptional() fun connectionForEntityID(id: Int): Int { if (id >= 0) { return 0 } else { return (-id - 1) / 65536 + 1 } } val NIO_POOL by lazy { NioEventLoopGroup(1, ThreadFactoryBuilder().setDaemon(true).setNameFormat("Network IO %d").build()) } } }