From 24fdf7ab700d49de66d57b42dac1466e169fca5d Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Tue, 2 Apr 2024 20:38:03 +0700 Subject: [PATCH] Coroutine scope for connection --- .../kstarbound/network/Connection.kt | 7 ++ .../kstarbound/server/ServerConnection.kt | 85 +++++++++---------- .../kstarbound/server/world/ServerWorld.kt | 3 + 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt index 56c701a9..78c6ad9d 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/network/Connection.kt @@ -7,6 +7,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelOption import io.netty.channel.nio.NioEventLoopGroup import it.unimi.dsi.fastutil.ints.IntAVLTreeSet +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.io.StreamCodec import ru.dbotthepony.kommons.io.VarIntValueCodec @@ -18,6 +20,7 @@ import ru.dbotthepony.kommons.util.setValue import ru.dbotthepony.kommons.vector.Vector2d import ru.dbotthepony.kommons.vector.Vector2i import ru.dbotthepony.kstarbound.GlobalDefaults +import ru.dbotthepony.kstarbound.Starbound import ru.dbotthepony.kstarbound.defs.WarpAction import ru.dbotthepony.kstarbound.defs.EntityDamageTeam import ru.dbotthepony.kstarbound.defs.WarpMode @@ -47,6 +50,8 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType) : var entityIDRange: IntRange by Delegates.notNull() private set + protected val coroutineScope = CoroutineScope(Starbound.COROUTINE_EXECUTOR) + var connectionID: Int = -1 set(value) { require(value in 1 .. ServerChannels.MAX_PLAYERS) { "Connection ID is out of range: $value" } @@ -109,7 +114,9 @@ abstract class Connection(val side: ConnectionSide, val type: ConnectionType) : } protected open fun onChannelClosed() { + isConnected = false LOGGER.info("$this is terminated") + coroutineScope.cancel("$this is terminated") } fun bind(channel: Channel) { diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt index 2b50d25d..f83db693 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/ServerConnection.kt @@ -3,6 +3,9 @@ package ru.dbotthepony.kstarbound.server import com.google.gson.JsonObject import io.netty.channel.ChannelHandlerContext import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.future.await +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import ru.dbotthepony.kommons.io.ByteKey import ru.dbotthepony.kommons.util.KOptional @@ -105,6 +108,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn playerEntity = null super.onChannelClosed() + warpQueue.close() server.channels.freeConnectionID(connectionID) server.channels.connections.remove(this) server.freeNickname(nickname) @@ -121,12 +125,45 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn } } - private var warpingAllowed = false - private var pendingWarp: Pair? = null - private var currentWarpStatus: CompletableFuture<*>? = null + private val warpQueue = Channel>(capacity = 10) + + private suspend fun handleWarps() { + while (true) { + val (request, deploy) = warpQueue.receive() + + LOGGER.info("Trying to warp $this to $request") + + val resolve = request.resolve(this) + + if (resolve.isLimbo) { + send(PlayerWarpResultPacket(false, request, true)) + } else if (tracker?.world?.worldID == resolve) { + LOGGER.info("$this tried to warp into world they are already in.") + send(PlayerWarpResultPacket(true, request, false)) + } else { + val world = server.worlds[resolve] + + if (world == null) { + send(PlayerWarpResultPacket(false, request, false)) + } else { + try { + world.acceptClient(this, request).await() + } catch (err: Throwable) { + send(PlayerWarpResultPacket(false, request, false)) + + if (world == shipWorld) { + disconnect("ShipWorld refused to accept its owner: $err") + } else { + enqueueWarp(returnWarp ?: WarpAlias.OwnShip) + } + } + } + } + } + } fun enqueueWarp(destination: WarpAction, deploy: Boolean = false) { - pendingWarp = destination to deploy + warpQueue.trySend(destination to deploy) } fun tick() { @@ -134,44 +171,6 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn return flush() - - if (currentWarpStatus?.isDone == true) - currentWarpStatus = null - - if (currentWarpStatus == null && warpingAllowed) { - val pendingWarp = pendingWarp - this.pendingWarp = null - - if (pendingWarp != null) { - val (request, deploy) = pendingWarp - LOGGER.info("Trying to warp $this to $request") - - val resolve = request.resolve(this) - - if (resolve.isLimbo) { - send(PlayerWarpResultPacket(false, request, true)) - } else if (tracker?.world?.worldID == resolve) { - LOGGER.info("$this tried to warp into world they are already in.") - send(PlayerWarpResultPacket(true, request, false)) - } else { - val world = server.worlds[resolve] - - if (world == null) { - send(PlayerWarpResultPacket(false, request, false)) - } else { - currentWarpStatus = world.acceptClient(this, request).exceptionally { - send(PlayerWarpResultPacket(false, request, false)) - - if (world == shipWorld) { - disconnect("ShipWorld refused to accept its owner: $it") - } else { - enqueueWarp(returnWarp ?: WarpAlias.OwnShip) - } - } - } - } - } - } } private var announcedDisconnect = true @@ -242,7 +241,7 @@ class ServerConnection(val server: StarboundServer, type: ConnectionType) : Conn shipWorld = it shipWorld.thread.start() enqueueWarp(WarpAlias.OwnShip) - warpingAllowed = true + coroutineScope.launch { handleWarps() } if (server.channels.connections.size > 1) { enqueueWarp(WarpAction.Player(server.channels.connections.first().uuid!!)) diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt index 209e17a8..75a4da9e 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/server/world/ServerWorld.kt @@ -75,6 +75,9 @@ class ServerWorld private constructor( if (clients.any { it.client == client }) throw IllegalStateException("$client is already in $this") + if (!client.isConnected) + throw IllegalStateException("$client disconnected while joining $this") + val start = if (action is WarpAction.Player) clients.firstOrNull { it.client.uuid == action.uuid }?.client?.playerEntity?.position else if (action is WarpAction.World)