Use separate thread to dispatch OTM network packets to players

This commit is contained in:
DBotThePony 2023-07-29 19:17:40 +07:00
parent e57844bc1a
commit fa478e4b15
Signed by: DBot
GPG Key ID: DCC23B5715498507
2 changed files with 67 additions and 17 deletions

View File

@ -22,6 +22,7 @@ import ru.dbotthepony.mc.otm.core.util.IConditionalTickable
import ru.dbotthepony.mc.otm.core.util.ITickable
import ru.dbotthepony.mc.otm.core.util.TickList
import ru.dbotthepony.mc.otm.graph.Abstract6Graph
import ru.dbotthepony.mc.otm.network.MatteryNetworkChannel
import java.util.*
private val preServerTick = TickList()
@ -151,6 +152,7 @@ fun onServerTick(event: ServerTickEvent) {
// чтоб не плодить кучу подписчиков, вызовем напрямую отсюда
Abstract6Graph.tick()
AbstractProfiledStorage.onServerPostTick()
MatteryNetworkChannel.onServerPostTick()
}
}
@ -270,16 +272,18 @@ fun onServerStarting(event: ServerAboutToStartEvent) {
SERVER_IS_LIVE = true
_server = event.server
_serverThread = Thread.currentThread()
MatteryNetworkChannel.onServerStarting()
}
fun onServerStopping(event: ServerStoppingEvent) {
clear()
SERVER_IS_LIVE = false
MatteryNetworkChannel.onServerStopping()
}
fun onServerStopped(event: ServerStoppedEvent) {
if (SERVER_IS_LIVE) {
LOGGER.fatal("ServerStoppingEvent did not fire. If server has crashed this is normal. However, if server finished it's work 'gracefully' this is a bug.")
LOGGER.fatal("ServerStoppingEvent did not fire. If server has crashed this is normal. However, if server finished it's work 'gracefully' this is a bug!")
clear()
SERVER_IS_LIVE = false
@ -287,4 +291,5 @@ fun onServerStopped(event: ServerStoppedEvent) {
_server = null
_serverThread = null
MatteryNetworkChannel.onServerStopped()
}

View File

@ -6,17 +6,24 @@ import net.minecraft.resources.ResourceLocation
import net.minecraft.server.level.ServerPlayer
import net.minecraft.world.entity.Entity
import net.minecraft.world.entity.player.Player
import net.minecraftforge.event.TickEvent
import net.minecraftforge.event.TickEvent.ServerTickEvent
import net.minecraftforge.event.server.ServerStoppedEvent
import net.minecraftforge.event.server.ServerStoppingEvent
import net.minecraftforge.network.NetworkDirection
import net.minecraftforge.network.NetworkEvent
import net.minecraftforge.network.NetworkRegistry
import net.minecraftforge.network.PacketDistributor
import net.minecraftforge.network.simple.SimpleChannel
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.mc.otm.NULLABLE_MINECRAFT_SERVER
import ru.dbotthepony.mc.otm.OverdriveThatMatters
import java.math.BigDecimal
import java.math.BigInteger
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.locks.LockSupport
import java.util.function.BiConsumer
import java.util.function.Supplier
import kotlin.reflect.KClass
@ -30,18 +37,6 @@ var Supplier<NetworkEvent.Context>.packetHandled: Boolean
val Supplier<NetworkEvent.Context>.sender: ServerPlayer?
get() = get().sender
fun FriendlyByteBuf.writeDecimal(value: BigDecimal) {
writeInt(value.scale())
writeByteArray(value.unscaledValue().toByteArray())
}
fun FriendlyByteBuf.readDecimal(): BigDecimal {
val scale = readInt()
val bytes = readByteArray()
return BigDecimal(BigInteger(bytes), scale)
}
interface MatteryPacket {
fun write(buff: FriendlyByteBuf)
fun play(context: Supplier<NetworkEvent.Context>)
@ -59,7 +54,7 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) {
fun send(ply: Player, packet: Any) {
if (ply is ServerPlayer) {
channel.send(PacketDistributor.PLAYER.with { ply }, packet)
queue.add(Task(channel, PacketDistributor.PLAYER.with { ply }, packet))
}
}
@ -68,7 +63,7 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) {
return
}
channel.send(PacketDistributor.TRACKING_ENTITY.with { entity }, packet)
queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY.with { entity }, packet))
}
fun sendTrackingAndSelf(entity: Entity, packet: Any) {
@ -76,10 +71,12 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) {
return
}
channel.send(PacketDistributor.TRACKING_ENTITY_AND_SELF.with { entity }, packet)
queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY_AND_SELF.with { entity }, packet))
}
fun send(distributor: PacketDistributor.PacketTarget, packet: Any) = channel.send(distributor, packet)
fun send(distributor: PacketDistributor.PacketTarget, packet: Any) {
queue.add(Task(channel, distributor, packet))
}
private var nextNetworkPacketID = 0
@ -113,4 +110,52 @@ abstract class MatteryNetworkChannel(val version: String, val name: String) {
) {
add(packetClass.java, MatteryPacket::write, reader, MatteryPacket::play, direction)
}
private data class Task(val channel: SimpleChannel, val target: PacketDistributor.PacketTarget, val packet: Any)
companion object {
private val logger = LogManager.getLogger()
private var thread: Thread? = null
private val queue = ConcurrentLinkedDeque<Task>()
@Volatile
private var interrupt = false
private fun run() {
while (!interrupt) {
val task = queue.pollFirst()
if (task == null) {
LockSupport.park()
} else {
try {
task.channel.send(task.target, task.packet)
} catch(err: Throwable) {
logger.error("Error executing network dispatcher task", err)
}
}
}
logger.debug("Overdrive That Matters Packet Dispatcher thread exited gracefully")
}
internal fun onServerPostTick() {
LockSupport.unpark(thread)
}
internal fun onServerStarting() {
interrupt = false
check(thread == null) { "Already having network dispatcher thread, ServerStartingEvent was fired twice!" }
thread = Thread(this::run, "Overdrive That Matters Network Dispatcher").also { it.start() }
}
internal fun onServerStopping() {
interrupt = true
}
internal fun onServerStopped() {
interrupt = true
}
}
}