From fa478e4b152ce001799d1122bfb85ebbcf3d6070 Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Sat, 29 Jul 2023 19:17:40 +0700 Subject: [PATCH] Use separate thread to dispatch OTM network packets to players --- .../dbotthepony/mc/otm/GlobalEventHandler.kt | 7 +- .../mc/otm/network/MatteryNetworkChannel.kt | 77 +++++++++++++++---- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/GlobalEventHandler.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/GlobalEventHandler.kt index c290f27f9..f1c7c6aeb 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/GlobalEventHandler.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/GlobalEventHandler.kt @@ -22,6 +22,7 @@ import ru.dbotthepony.mc.otm.core.util.IConditionalTickable import ru.dbotthepony.mc.otm.core.util.ITickable import ru.dbotthepony.mc.otm.core.util.TickList import ru.dbotthepony.mc.otm.graph.Abstract6Graph +import ru.dbotthepony.mc.otm.network.MatteryNetworkChannel import java.util.* private val preServerTick = TickList() @@ -151,6 +152,7 @@ fun onServerTick(event: ServerTickEvent) { // чтоб не плодить кучу подписчиков, вызовем напрямую отсюда Abstract6Graph.tick() AbstractProfiledStorage.onServerPostTick() + MatteryNetworkChannel.onServerPostTick() } } @@ -270,16 +272,18 @@ fun onServerStarting(event: ServerAboutToStartEvent) { SERVER_IS_LIVE = true _server = event.server _serverThread = Thread.currentThread() + MatteryNetworkChannel.onServerStarting() } fun onServerStopping(event: ServerStoppingEvent) { clear() SERVER_IS_LIVE = false + MatteryNetworkChannel.onServerStopping() } fun onServerStopped(event: ServerStoppedEvent) { if (SERVER_IS_LIVE) { - LOGGER.fatal("ServerStoppingEvent did not fire. If server has crashed this is normal. However, if server finished it's work 'gracefully' this is a bug.") + LOGGER.fatal("ServerStoppingEvent did not fire. If server has crashed this is normal. However, if server finished it's work 'gracefully' this is a bug!") clear() SERVER_IS_LIVE = false @@ -287,4 +291,5 @@ fun onServerStopped(event: ServerStoppedEvent) { _server = null _serverThread = null + MatteryNetworkChannel.onServerStopped() } diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/network/MatteryNetworkChannel.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/network/MatteryNetworkChannel.kt index 00f8ae47a..4a0fbfc06 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/network/MatteryNetworkChannel.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/network/MatteryNetworkChannel.kt @@ -6,17 +6,24 @@ import net.minecraft.resources.ResourceLocation import net.minecraft.server.level.ServerPlayer import net.minecraft.world.entity.Entity import net.minecraft.world.entity.player.Player +import net.minecraftforge.event.TickEvent +import net.minecraftforge.event.TickEvent.ServerTickEvent +import net.minecraftforge.event.server.ServerStoppedEvent +import net.minecraftforge.event.server.ServerStoppingEvent import net.minecraftforge.network.NetworkDirection import net.minecraftforge.network.NetworkEvent import net.minecraftforge.network.NetworkRegistry import net.minecraftforge.network.PacketDistributor import net.minecraftforge.network.simple.SimpleChannel +import org.apache.logging.log4j.LogManager import ru.dbotthepony.mc.otm.NULLABLE_MINECRAFT_SERVER import ru.dbotthepony.mc.otm.OverdriveThatMatters import java.math.BigDecimal import java.math.BigInteger import java.util.Optional import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.locks.LockSupport import java.util.function.BiConsumer import java.util.function.Supplier import kotlin.reflect.KClass @@ -30,18 +37,6 @@ var Supplier.packetHandled: Boolean val Supplier.sender: ServerPlayer? get() = get().sender -fun FriendlyByteBuf.writeDecimal(value: BigDecimal) { - writeInt(value.scale()) - writeByteArray(value.unscaledValue().toByteArray()) -} - -fun FriendlyByteBuf.readDecimal(): BigDecimal { - val scale = readInt() - val bytes = readByteArray() - - return BigDecimal(BigInteger(bytes), scale) -} - interface MatteryPacket { fun write(buff: FriendlyByteBuf) fun play(context: Supplier) @@ -59,7 +54,7 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) { fun send(ply: Player, packet: Any) { if (ply is ServerPlayer) { - channel.send(PacketDistributor.PLAYER.with { ply }, packet) + queue.add(Task(channel, PacketDistributor.PLAYER.with { ply }, packet)) } } @@ -68,7 +63,7 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) { return } - channel.send(PacketDistributor.TRACKING_ENTITY.with { entity }, packet) + queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY.with { entity }, packet)) } fun sendTrackingAndSelf(entity: Entity, packet: Any) { @@ -76,10 +71,12 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) { return } - channel.send(PacketDistributor.TRACKING_ENTITY_AND_SELF.with { entity }, packet) + queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY_AND_SELF.with { entity }, packet)) } - fun send(distributor: PacketDistributor.PacketTarget, packet: Any) = channel.send(distributor, packet) + fun send(distributor: PacketDistributor.PacketTarget, packet: Any) { + queue.add(Task(channel, distributor, packet)) + } private var nextNetworkPacketID = 0 @@ -113,4 +110,52 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) { ) { add(packetClass.java, MatteryPacket::write, reader, MatteryPacket::play, direction) } + + private data class Task(val channel: SimpleChannel, val target: PacketDistributor.PacketTarget, val packet: Any) + + companion object { + private val logger = LogManager.getLogger() + private var thread: Thread? = null + + private val queue = ConcurrentLinkedDeque() + + @Volatile + private var interrupt = false + + private fun run() { + while (!interrupt) { + val task = queue.pollFirst() + + if (task == null) { + LockSupport.park() + } else { + try { + task.channel.send(task.target, task.packet) + } catch(err: Throwable) { + logger.error("Error executing network dispatcher task", err) + } + } + } + + logger.debug("Overdrive That Matters Packet Dispatcher thread exited gracefully") + } + + internal fun onServerPostTick() { + LockSupport.unpark(thread) + } + + internal fun onServerStarting() { + interrupt = false + check(thread == null) { "Already having network dispatcher thread, ServerStartingEvent was fired twice!" } + thread = Thread(this::run, "Overdrive That Matters Network Dispatcher").also { it.start() } + } + + internal fun onServerStopping() { + interrupt = true + } + + internal fun onServerStopped() { + interrupt = true + } + } }