Remove separate packet dispatcher channel, since "low" performance of netty dispatcher is caused by underlying socket wakeup() call (which interrupts wait() call on event loop thread), and it shouldn't be a performance issue
This commit is contained in:
parent
e63cce63ff
commit
2006e22e40
@ -387,11 +387,7 @@ abstract class MatteryMenu(
|
||||
val payload = synchronizerRemote.write()
|
||||
|
||||
if (payload != null) {
|
||||
if (broadcastOnce) {
|
||||
MenuNetworkChannel.send(player, MenuFieldPacket(containerId, payload))
|
||||
} else {
|
||||
MenuNetworkChannel.sendNow(player, MenuFieldPacket(containerId, payload))
|
||||
}
|
||||
MenuNetworkChannel.send(player, MenuFieldPacket(containerId, payload))
|
||||
}
|
||||
|
||||
broadcastOnce = true
|
||||
|
@ -48,12 +48,6 @@ abstract class MatteryNetworkChannel(val version: Int, val name: String) {
|
||||
fun sendToServer(packet: Any) = channel.send(packet, PacketDistributor.SERVER.noArg())
|
||||
|
||||
fun send(ply: Player, packet: Any) {
|
||||
if (ply is ServerPlayer) {
|
||||
queue.add(Task(channel, PacketDistributor.PLAYER.with(ply), packet))
|
||||
}
|
||||
}
|
||||
|
||||
fun sendNow(ply: Player, packet: Any) {
|
||||
if (ply is ServerPlayer) {
|
||||
channel.send(packet, PacketDistributor.PLAYER.with(ply))
|
||||
}
|
||||
@ -64,7 +58,7 @@ abstract class MatteryNetworkChannel(val version: Int, val name: String) {
|
||||
return
|
||||
}
|
||||
|
||||
queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY.with(entity), packet))
|
||||
channel.send(packet, PacketDistributor.TRACKING_ENTITY.with(entity))
|
||||
}
|
||||
|
||||
fun sendTrackingAndSelf(entity: Entity, packet: Any) {
|
||||
@ -72,14 +66,10 @@ abstract class MatteryNetworkChannel(val version: Int, val name: String) {
|
||||
return
|
||||
}
|
||||
|
||||
queue.add(Task(channel, PacketDistributor.TRACKING_ENTITY_AND_SELF.with(entity), packet))
|
||||
channel.send(packet, PacketDistributor.TRACKING_ENTITY_AND_SELF.with(entity))
|
||||
}
|
||||
|
||||
fun send(distributor: PacketDistributor.PacketTarget, packet: Any) {
|
||||
queue.add(Task(channel, distributor, packet))
|
||||
}
|
||||
|
||||
fun sendNow(distributor: PacketDistributor.PacketTarget, packet: Any) {
|
||||
channel.send(packet, distributor)
|
||||
}
|
||||
|
||||
@ -128,65 +118,4 @@ abstract class MatteryNetworkChannel(val version: Int, val name: String) {
|
||||
) {
|
||||
add(packetClass.java, MatteryPacket::write, reader, MatteryPacket::play, direction, handleOnMainThread)
|
||||
}
|
||||
|
||||
private data class Task(val channel: SimpleChannel, val target: PacketDistributor.PacketTarget, val packet: Any)
|
||||
|
||||
companion object {
|
||||
private val logger = LogManager.getLogger()
|
||||
private var thread: Thread? = null
|
||||
|
||||
private val queue = ConcurrentLinkedQueue<Task>()
|
||||
|
||||
@Volatile
|
||||
private var interrupt = false
|
||||
|
||||
/**
|
||||
* Reason behind separate thread is that Minecraft connection class does not allow to enqueue messages,
|
||||
* nor does Forge. This leads to extensive calls to Netty Channel eventLoop().execute(), which is somehow resource
|
||||
* heavy, as it calls wakeup on Windows, eventFdWrite on Linux, etc., when all we want to do is to enqueue packet.
|
||||
*
|
||||
* Concurrent safety - Minecraft Channel (send method) does not do any writes (unless player is unconnected, in which case it uses thread safe queue), hence it is thread safe.
|
||||
* OTM packets are always dispatched in correct order to client.
|
||||
* Netty eventLoop executor is thread safe by definition.
|
||||
*/
|
||||
private fun run() {
|
||||
while (!interrupt) {
|
||||
val task = queue.poll()
|
||||
|
||||
if (task == null) {
|
||||
LockSupport.park()
|
||||
} else {
|
||||
try {
|
||||
task.channel.send(task.packet, task.target)
|
||||
} catch(err: Throwable) {
|
||||
logger.error("Error executing network dispatcher task", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Overdrive That Matters Packet Dispatcher thread exited gracefully")
|
||||
}
|
||||
|
||||
internal fun onServerPostTick() {
|
||||
if (queue.isNotEmpty()) {
|
||||
LockSupport.unpark(thread)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun onServerStarting() {
|
||||
interrupt = false
|
||||
check(thread?.isAlive != true) { "Already having network dispatcher thread, ServerStartingEvent was fired twice!" }
|
||||
thread = Thread(this::run, "Overdrive That Matters Network Dispatcher").also { it.isDaemon = true; it.start() }
|
||||
}
|
||||
|
||||
internal fun onServerStopping() {
|
||||
interrupt = true
|
||||
LockSupport.unpark(thread)
|
||||
}
|
||||
|
||||
internal fun onServerStopped() {
|
||||
interrupt = true
|
||||
LockSupport.unpark(thread)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user