Coroutine scope for connection

This commit is contained in:
DBotThePony 2024-04-02 20:38:03 +07:00
parent 209c1a5776
commit 24fdf7ab70
Signed by: DBot
GPG Key ID: DCC23B5715498507
3 changed files with 52 additions and 43 deletions

View File

@ -7,6 +7,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.io.StreamCodec
import ru.dbotthepony.kommons.io.VarIntValueCodec
@ -18,6 +20,7 @@ import ru.dbotthepony.kommons.util.setValue
import ru.dbotthepony.kommons.vector.Vector2d
import ru.dbotthepony.kommons.vector.Vector2i
import ru.dbotthepony.kstarbound.GlobalDefaults
import ru.dbotthepony.kstarbound.Starbound
import ru.dbotthepony.kstarbound.defs.WarpAction
import ru.dbotthepony.kstarbound.defs.EntityDamageTeam
import ru.dbotthepony.kstarbound.defs.WarpMode
@ -47,6 +50,8 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType) :
var entityIDRange: IntRange by Delegates.notNull()
private set
protected val coroutineScope = CoroutineScope(Starbound.COROUTINE_EXECUTOR)
var connectionID: Int = -1
set(value) {
require(value in 1 .. ServerChannels.MAX_PLAYERS) { "Connection ID is out of range: $value" }
@ -109,7 +114,9 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType) :
}
protected open fun onChannelClosed() {
isConnected = false
LOGGER.info("$this is terminated")
coroutineScope.cancel("$this is terminated")
}
fun bind(channel: Channel) {

View File

@ -3,6 +3,9 @@ package ru.dbotthepony.kstarbound.server
import com.google.gson.JsonObject
import io.netty.channel.ChannelHandlerContext
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.io.ByteKey
import ru.dbotthepony.kommons.util.KOptional
@ -105,6 +108,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
playerEntity = null
super.onChannelClosed()
warpQueue.close()
server.channels.freeConnectionID(connectionID)
server.channels.connections.remove(this)
server.freeNickname(nickname)
@ -121,12 +125,45 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
}
}
private var warpingAllowed = false
private var pendingWarp: Pair<WarpAction, Boolean>? = null
private var currentWarpStatus: CompletableFuture<*>? = null
private val warpQueue = Channel<Pair<WarpAction, Boolean>>(capacity = 10)
private suspend fun handleWarps() {
while (true) {
val (request, deploy) = warpQueue.receive()
LOGGER.info("Trying to warp $this to $request")
val resolve = request.resolve(this)
if (resolve.isLimbo) {
send(PlayerWarpResultPacket(false, request, true))
} else if (tracker?.world?.worldID == resolve) {
LOGGER.info("$this tried to warp into world they are already in.")
send(PlayerWarpResultPacket(true, request, false))
} else {
val world = server.worlds[resolve]
if (world == null) {
send(PlayerWarpResultPacket(false, request, false))
} else {
try {
world.acceptClient(this, request).await()
} catch (err: Throwable) {
send(PlayerWarpResultPacket(false, request, false))
if (world == shipWorld) {
disconnect("ShipWorld refused to accept its owner: $err")
} else {
enqueueWarp(returnWarp ?: WarpAlias.OwnShip)
}
}
}
}
}
}
fun enqueueWarp(destination: WarpAction, deploy: Boolean = false) {
pendingWarp = destination to deploy
warpQueue.trySend(destination to deploy)
}
fun tick() {
@ -134,44 +171,6 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
return
flush()
if (currentWarpStatus?.isDone == true)
currentWarpStatus = null
if (currentWarpStatus == null && warpingAllowed) {
val pendingWarp = pendingWarp
this.pendingWarp = null
if (pendingWarp != null) {
val (request, deploy) = pendingWarp
LOGGER.info("Trying to warp $this to $request")
val resolve = request.resolve(this)
if (resolve.isLimbo) {
send(PlayerWarpResultPacket(false, request, true))
} else if (tracker?.world?.worldID == resolve) {
LOGGER.info("$this tried to warp into world they are already in.")
send(PlayerWarpResultPacket(true, request, false))
} else {
val world = server.worlds[resolve]
if (world == null) {
send(PlayerWarpResultPacket(false, request, false))
} else {
currentWarpStatus = world.acceptClient(this, request).exceptionally {
send(PlayerWarpResultPacket(false, request, false))
if (world == shipWorld) {
disconnect("ShipWorld refused to accept its owner: $it")
} else {
enqueueWarp(returnWarp ?: WarpAlias.OwnShip)
}
}
}
}
}
}
}
private var announcedDisconnect = true
@ -242,7 +241,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn
shipWorld = it
shipWorld.thread.start()
enqueueWarp(WarpAlias.OwnShip)
warpingAllowed = true
coroutineScope.launch { handleWarps() }
if (server.channels.connections.size > 1) {
enqueueWarp(WarpAction.Player(server.channels.connections.first().uuid!!))

View File

@ -75,6 +75,9 @@ class ServerWorld private constructor(
if (clients.any { it.client == client })
throw IllegalStateException("$client is already in $this")
if (!client.isConnected)
throw IllegalStateException("$client disconnected while joining $this")
val start = if (action is WarpAction.Player)
clients.firstOrNull { it.client.uuid == action.uuid }?.client?.playerEntity?.position
else if (action is WarpAction.World)