266 lines
8.8 KiB
Kotlin
266 lines
8.8 KiB
Kotlin
package ru.dbotthepony.kstarbound.network
|
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
|
import io.netty.channel.Channel
|
|
import io.netty.channel.ChannelHandlerContext
|
|
import io.netty.channel.ChannelInboundHandlerAdapter
|
|
import io.netty.channel.ChannelOption
|
|
import io.netty.channel.nio.NioEventLoopGroup
|
|
import it.unimi.dsi.fastutil.ints.IntArrayList
|
|
import kotlinx.coroutines.CoroutineExceptionHandler
|
|
import kotlinx.coroutines.CoroutineScope
|
|
import kotlinx.coroutines.SupervisorJob
|
|
import kotlinx.coroutines.asCoroutineDispatcher
|
|
import kotlinx.coroutines.cancel
|
|
import org.apache.logging.log4j.LogManager
|
|
import ru.dbotthepony.kstarbound.io.IntValueCodec
|
|
import ru.dbotthepony.kstarbound.io.StreamCodec
|
|
import ru.dbotthepony.kstarbound.io.koptional
|
|
import ru.dbotthepony.kstarbound.math.AABBi
|
|
import ru.dbotthepony.kommons.util.KOptional
|
|
import ru.dbotthepony.kommons.util.getValue
|
|
import ru.dbotthepony.kommons.util.setValue
|
|
import ru.dbotthepony.kstarbound.math.vector.Vector2d
|
|
import ru.dbotthepony.kstarbound.math.vector.Vector2i
|
|
import ru.dbotthepony.kstarbound.Globals
|
|
import ru.dbotthepony.kstarbound.defs.EntityDamageTeam
|
|
import ru.dbotthepony.kstarbound.defs.WarpAction
|
|
import ru.dbotthepony.kstarbound.defs.WarpMode
|
|
import ru.dbotthepony.kstarbound.defs.WorldID
|
|
import ru.dbotthepony.kstarbound.defs.actor.player.ShipUpgrades
|
|
import ru.dbotthepony.kstarbound.network.syncher.MasterElement
|
|
import ru.dbotthepony.kstarbound.network.syncher.NetworkedGroup
|
|
import ru.dbotthepony.kstarbound.network.syncher.NetworkedList
|
|
import ru.dbotthepony.kstarbound.network.syncher.networkedBoolean
|
|
import ru.dbotthepony.kstarbound.network.syncher.networkedData
|
|
import ru.dbotthepony.kstarbound.network.syncher.networkedSignedInt
|
|
import ru.dbotthepony.kstarbound.server.ServerChannels
|
|
import ru.dbotthepony.kstarbound.world.UniversePos
|
|
import ru.dbotthepony.kstarbound.world.entities.player.PlayerEntity
|
|
import java.io.Closeable
|
|
import kotlin.math.roundToInt
|
|
import kotlin.properties.Delegates
|
|
|
|
/**
 * Base class for one end of a network connection; [bind] installs it as the last handler
 * of a Netty [Channel] pipeline.
 *
 * Holds the per-connection state: the negotiated protocol flavour ([isLegacy]), the coroutine
 * [scope] running on the channel's event loop, the entity ID range derived from [connectionID]
 * and the variable groups declared further down in this class.
 *
 * @param side passed to the packet validators/serializers created for this connection
 * @param type transport kind; [ConnectionType.MEMORY] connections install packet validators,
 * every other type installs packet serializers
 */
abstract class Connection(val side: ConnectionSide, val type: ConnectionType) : ChannelInboundHandlerAdapter(), Closeable {
|
|
/**
 * Handles a message read from the channel.
 *
 * Re-declared as abstract, so every concrete connection has to provide its own implementation.
 */
abstract override fun channelRead(ctx: ChannelHandlerContext, msg: Any)
|
|
|
|
/** Player character attached to this connection; `null` by default. */
var character: PlayerEntity? = null

/** JSON RPC instance owned by this connection. */
val rpc = JsonRPC()

/**
 * Entity IDs reserved for this connection.
 *
 * Assigned as a side effect of setting [connectionID]; reading it before that throws
 * [IllegalStateException] (see [Delegates.notNull]).
 */
var entityIDRange: IntRange by Delegates.notNull()
    private set

/**
 * Coroutine scope of this connection, created by [bind].
 *
 * Reading it before [bind] was called throws [IllegalStateException].
 */
var scope: CoroutineScope by Delegates.notNull()
    private set

/**
 * ID of this connection; `-1` until assigned.
 *
 * Only values in `1 .. ServerChannels.MAX_PLAYERS` are accepted. Assigning an ID also
 * recomputes [entityIDRange]: connection `N` owns the 65536 negative entity IDs
 * `-65536 * N .. -65536 * N + 65535`.
 *
 * @throws IllegalArgumentException if the assigned value is out of range
 */
var connectionID: Int = -1
    set(value) {
        require(value in 1 .. ServerChannels.MAX_PLAYERS) { "Connection ID is out of range: $value" }
        field = value

        val firstEntityID = -65536 * value
        entityIDRange = IntRange(firstEntityID, firstEntityID + 65535)
    }
|
|
|
|
/** Nickname associated with this connection; empty by default. */
var nickname: String = ""

/** `true` once [channel] has been assigned (which happens in [bind]). */
val hasChannel: Boolean
    get() = ::channel.isInitialized

/** Netty channel this connection is bound to; check [hasChannel] before touching it. */
lateinit var channel: Channel
    private set

/** Whether the legacy protocol is in use; updated by [setupLegacy] and [setupNative]. */
var isLegacy: Boolean = true
    protected set

// packet handlers used while the handshake is in progress
private val handshakeValidator = PacketRegistry.HANDSHAKE.Validator(side)
private val handshakeSerializer = PacketRegistry.HANDSHAKE.Serializer(side)

// packet handlers of the native protocol
private val nativeValidator = PacketRegistry.NATIVE.Validator(side)
private val nativeSerializer = PacketRegistry.NATIVE.Serializer(side)

// packet handlers of the legacy protocol
private val legacyValidator = PacketRegistry.LEGACY.Validator(side)
private val legacySerializer = PacketRegistry.LEGACY.Serializer(side)

/** Whether the protocol has been negotiated; reset to `false` when the channel closes. */
var isConnected: Boolean = false
    private set

/** Whether the game was joined successfully; set by [inGame]. */
var isReady: Boolean = false
    protected set
|
|
|
|
/**
 * Switches this connection to the legacy protocol after a successful handshake.
 *
 * Marks the connection as connected, logs the outcome and replaces the handshake packet
 * handler with its legacy counterpart at the head of the pipeline: the validator for
 * [ConnectionType.MEMORY] connections, the serializer for everything else.
 */
open fun setupLegacy() {
    isConnected = true
    LOGGER.info("Handshake successful on ${channel.remoteAddress()}, channel is using legacy protocol")

    val pipeline = channel.pipeline()

    when (type) {
        ConnectionType.MEMORY -> {
            pipeline.remove(handshakeValidator)
            pipeline.addFirst(legacyValidator)
        }

        else -> {
            pipeline.remove(handshakeSerializer)
            pipeline.addFirst(legacySerializer)
        }
    }

    isLegacy = true
}
|
|
|
|
/**
 * Switches this connection to the native protocol after a successful handshake.
 *
 * Marks the connection as connected, logs the outcome and replaces the handshake packet
 * handler with its native counterpart at the head of the pipeline: the validator for
 * [ConnectionType.MEMORY] connections, the serializer for everything else.
 * Finishes by calling [inGame].
 */
open fun setupNative() {
    isConnected = true
    LOGGER.info("Handshake successful on ${channel.remoteAddress()}, channel is using native protocol")

    val pipeline = channel.pipeline()

    when (type) {
        ConnectionType.MEMORY -> {
            pipeline.remove(handshakeValidator)
            pipeline.addFirst(nativeValidator)
        }

        else -> {
            pipeline.remove(handshakeSerializer)
            pipeline.addFirst(nativeSerializer)
        }
    }

    isLegacy = false

    inGame()
}
|
|
|
|
/**
 * Attaches this connection to [channel].
 *
 * Creates a fresh coroutine [scope] running on the channel's event loop, enables `TCP_NODELAY`,
 * installs the handshake packet handler (validator for [ConnectionType.MEMORY], serializer
 * otherwise) at the head of the pipeline and this connection at its tail.
 *
 * When the channel closes, the scope created by this call is cancelled and
 * [disconnect] is invoked with "Connection closed".
 *
 * @param channel channel to bind to
 */
open fun bind(channel: Channel) {
    // Keep a local reference: the close listener below has to cancel the scope created by *this*
    // call. Reading the `scope` property at close time would cancel the scope of a newer
    // channel if the connection was re-bound in the meantime (and leak this one).
    val boundScope = CoroutineScope(channel.eventLoop().asCoroutineDispatcher() + SupervisorJob() + CoroutineExceptionHandler { _, throwable ->
        // log first, so the failure is recorded even if disconnect() itself throws
        LOGGER.fatal("Uncaught exception in one of $this coroutines", throwable)
        disconnect("Uncaught exception in one of connection' coroutines: $throwable")
    })

    scope = boundScope

    channel.config().setOption(ChannelOption.TCP_NODELAY, true)
    this.channel = channel

    if (type == ConnectionType.MEMORY)
        channel.pipeline().addFirst(handshakeValidator)
    else
        channel.pipeline().addFirst(handshakeSerializer)

    channel.pipeline().addLast(this)

    // NOTE(review): isConnected/disconnect below still act on the connection as a whole, even if
    // it has been re-bound to another channel by the time this one closes — confirm this is intended.
    channel.closeFuture().addListener {
        isConnected = false
        LOGGER.info("$channel is closed")
        boundScope.cancel("$channel is closed")

        disconnect("Connection closed")
    }
}
|
|
|
|
/**
 * Always `true`: a connection can be reused for a second channel while the first one
 * is still being closed.
 */
override fun isSharable(): Boolean = true

/**
 * Deliberately empty and does not call the superclass implementation.
 */
override fun ensureNotSharable() {
    // no-op
}
|
|
|
|
/**
 * Marks this connection as ready ([isReady] becomes `true`).
 *
 * Called by [setupNative]; open so subclasses can extend it.
 */
open fun inGame() {
    isReady = true
}
|
|
|
|
/**
 * Writes [packet] to the channel.
 *
 * The packet is silently dropped when the connection is not bound to a channel yet, when the
 * protocol has not been negotiated ([isConnected] is `false`) or when the channel is closed.
 *
 * @param packet packet to write
 * @param flush whether to flush the channel right after writing
 */
fun send(packet: IPacket, flush: Boolean = true) {
    // `channel` is lateinit: touching it before bind() would throw instead of dropping the packet
    if (!::channel.isInitialized || !isConnected || !channel.isOpen) return

    if (flush) {
        channel.writeAndFlush(packet)
    } else {
        channel.write(packet)
    }
}
|
|
|
|
/**
 * Disconnects this connection.
 *
 * Invoked by this class when the bound channel closes and when a coroutine of [scope]
 * fails with an uncaught exception.
 *
 * @param reason human-readable reason for the disconnect; empty by default
 */
abstract fun disconnect(reason: String = "")
|
|
|
|
/**
 * Logs the termination and closes the underlying channel.
 *
 * Safe to call on a connection which was never bound: in that case there is no channel to close.
 */
override fun close() {
    LOGGER.info("Terminating $this")

    // `channel` is lateinit; reading it before bind() would throw
    if (::channel.isInitialized) {
        channel.close()
    }
}
|
|
|
|
// Per-connection "global" variables.
// NOTE: elements are registered in their group in declaration order — do not reorder them.

// ---- client-side variables ----

/** Group holding the variables declared as client-side. */
val client2serverGroup = MasterElement(NetworkedGroup())

// client window: position of its minimal corner and its size
var windowXMin by client2serverGroup.upstream.add(networkedSignedInt())
var windowYMin by client2serverGroup.upstream.add(networkedSignedInt())
var windowWidth by client2serverGroup.upstream.add(networkedSignedInt())
var windowHeight by client2serverGroup.upstream.add(networkedSignedInt())

var playerID by client2serverGroup.upstream.add(networkedSignedInt())

/** Networked list of integers (presumably entity IDs the client is spectating — confirm against callers). */
val clientSpectatingEntities = NetworkedList(IntValueCodec, elementsFactory = ::IntArrayList).also { list ->
    client2serverGroup.upstream.add(list)
}

// ---- server-side variables ----

/** Group holding the variables declared as server-side. */
val server2clientGroup = MasterElement(NetworkedGroup())

var orbitalWarpAction by server2clientGroup.upstream.add(networkedData(KOptional(), warpActionCodec, legacyWarpActionCodec))
var worldID by server2clientGroup.upstream.add(networkedData(WorldID.Limbo, WorldID.CODEC, WorldID.LEGACY_CODEC))
var isAdmin by server2clientGroup.upstream.add(networkedBoolean())
var team by server2clientGroup.upstream.add(networkedData(EntityDamageTeam.FRIENDLY, EntityDamageTeam.CODEC, EntityDamageTeam.LEGACY_CODEC))
var shipUpgrades by server2clientGroup.upstream.add(networkedData(ShipUpgrades(), ShipUpgrades.CODEC, ShipUpgrades.LEGACY_CODEC))
var shipCoordinate by server2clientGroup.upstream.add(networkedData(UniversePos(), UniversePos.CODEC, UniversePos.LEGACY_CODEC))

/** Player entity whose position is used by [trackingTileRegions]; `null` by default. */
var playerEntity: PlayerEntity? = null
|
|
|
|
/**
 * Regions (in tiles) which should be tracked for this connection.
 *
 * The result contains:
 *  - the client window, padded by `Globals.client.windowMonitoringBorder` on every side and
 *    capped at 1000 tiles per axis (an oversized window is shrunk around its own centre);
 *  - if [playerEntity] is set, a region of the same size centred on the player's position.
 *
 * @return freshly built list of regions; may be empty
 */
fun trackingTileRegions(): List<AABBi> {
    val result = ArrayList<AABBi>()

    val border = Globals.client.windowMonitoringBorder
    val maxSpan = 1000

    var mins = Vector2i(windowXMin - border, windowYMin - border)
    // size of the padded window, NOT its maximal corner
    var size = Vector2i(windowWidth + border * 2, windowHeight + border * 2)

    if (size.x > maxSpan) {
        // unreasonably wide window: keep its centre, cap its width
        val middle = mins.x + size.x / 2
        mins = mins.copy(x = middle - maxSpan / 2)
        size = size.copy(x = maxSpan)
    }

    if (size.y > maxSpan) {
        // unreasonably tall window: keep its centre, cap its height
        val middle = mins.y + size.y / 2
        mins = mins.copy(y = middle - maxSpan / 2)
        size = size.copy(y = maxSpan)
    }

    val window = AABBi(mins, size + mins)

    // NOTE(review): presumably meant to skip a window which was never reported by the client — confirm
    if (window.mins != Vector2i.ZERO && window.maxs != Vector2i.ZERO) {
        result.add(window)
    }

    val playerEntity = playerEntity

    if (playerEntity != null) {
        // add an extra region the size of the window centered on the player's position to prevent nearby sectors
        // being unloaded due to camera panning or centering on other entities
        val diff = Vector2d(window.width / 2.0, window.height / 2.0)

        val pmins = playerEntity.position - diff
        val pmaxs = playerEntity.position + diff

        result.add(AABBi(
            Vector2i(pmins.x.roundToInt(), pmins.y.roundToInt()),
            Vector2i(pmaxs.x.roundToInt(), pmaxs.y.roundToInt()),
        ))
    }

    // TODO: also add regions for the entities listed in clientSpectatingEntities

    return result
}
|
|
|
|
companion object {
|
|
/** Connection ID `0`; this is also what [connectionForEntityID] returns for non-negative entity IDs. */
const val SERVER_CONNECTION_ID: Int = 0

private val LOGGER = LogManager.getLogger()

// codecs of the optional (warp action, warp mode) pair, native and legacy flavour
private val warpActionCodec = StreamCodec.Pair(WarpAction.CODEC, WarpMode.CODEC).koptional()
private val legacyWarpActionCodec = StreamCodec.Pair(WarpAction.LEGACY_CODEC, WarpMode.CODEC).koptional()
|
|
|
|
/**
 * Maps an entity ID to the ID of the connection owning it.
 *
 * Non-negative IDs map to `0`. Negative IDs are split into consecutive blocks of 65536:
 * `-1 .. -65536` map to `1`, `-65537 .. -131072` map to `2`, and so on.
 *
 * @param id entity ID
 * @return connection ID owning [id]
 */
fun connectionForEntityID(id: Int): Int =
    if (id >= 0) 0 else (-id - 1) / 65536 + 1
|
|
|
|
/**
 * Netty NIO event loop group, created on first access.
 *
 * Runs a single daemon thread whose name follows the "Network IO %d" format.
 */
val NIO_POOL: NioEventLoopGroup by lazy {
    val threadFactory = ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("Network IO %d")
        .build()

    NioEventLoopGroup(1, threadFactory)
}
|
|
}
|
|
}
|