Update chunk map subscribers implementation

This commit is contained in:
DBotThePony 2023-08-17 13:58:01 +07:00
parent 90348d5789
commit 66c356bd41
Signed by: DBot
GPG Key ID: DCC23B5715498507
5 changed files with 106 additions and 206 deletions

View File

@ -15,7 +15,6 @@ import net.minecraftforge.fml.event.lifecycle.FMLCommonSetupEvent;
import net.minecraftforge.fml.javafmlmod.FMLJavaModLoadingContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import ru.dbotthepony.mc.otm.android.AndroidResearchManager;
import ru.dbotthepony.mc.otm.android.feature.EnderTeleporterFeature;
import ru.dbotthepony.mc.otm.block.entity.MatteryBlockEntity;
@ -47,7 +46,6 @@ import ru.dbotthepony.mc.otm.config.MachinesConfig;
import ru.dbotthepony.mc.otm.config.ServerCompatConfig;
import ru.dbotthepony.mc.otm.config.ServerConfig;
import ru.dbotthepony.mc.otm.config.ToolsConfig;
import ru.dbotthepony.mc.otm.core.math.Decimal;
import ru.dbotthepony.mc.otm.item.tool.ExplosiveHammerItem;
import ru.dbotthepony.mc.otm.item.armor.TritaniumArmorItem;
import ru.dbotthepony.mc.otm.item.QuantumBatteryItem;
@ -56,14 +54,12 @@ import ru.dbotthepony.mc.otm.item.PortableCondensationDriveItem;
import ru.dbotthepony.mc.otm.matter.MatterManager;
import ru.dbotthepony.mc.otm.network.*;
import ru.dbotthepony.mc.otm.registry.*;
import ru.dbotthepony.mc.otm.storage.*;
import ru.dbotthepony.mc.otm.triggers.KillAsAndroidTrigger;
import top.theillusivec4.curios.api.CuriosApi;
import static net.minecraftforge.common.MinecraftForge.EVENT_BUS;
import javax.annotation.ParametersAreNonnullByDefault;
import java.util.Objects;
// The value here should match an entry in the META-INF/mods.toml file
@Mod(OverdriveThatMatters.MOD_ID)
@ -147,7 +143,7 @@ public final class OverdriveThatMatters {
EVENT_BUS.addListener(EventPriority.HIGHEST, GlobalEventHandlerKt::onServerStopped);
EVENT_BUS.addListener(EventPriority.HIGHEST, GlobalEventHandlerKt::onServerStopping);
EVENT_BUS.addListener(EventPriority.HIGHEST, GlobalEventHandlerKt::onServerStarting);
EVENT_BUS.addListener(EventPriority.LOWEST, GlobalEventHandlerKt::onWorldTick);
EVENT_BUS.addListener(EventPriority.LOWEST, GlobalEventHandlerKt::onLevelTick);
EVENT_BUS.addListener(EventPriority.LOWEST, GlobalEventHandlerKt::onServerTick);
EVENT_BUS.addListener(EventPriority.NORMAL, MatteryPlayerCapability.Companion::onPlayerTick);
@ -180,7 +176,6 @@ public final class OverdriveThatMatters {
EVENT_BUS.addListener(EventPriority.NORMAL, MatteryBlockEntity.Companion::onWatch);
EVENT_BUS.addListener(EventPriority.NORMAL, MatteryBlockEntity.Companion::onForget);
EVENT_BUS.addListener(EventPriority.NORMAL, MatteryBlockEntity.Companion::playerDisconnected);
EVENT_BUS.addListener(EventPriority.LOWEST, MatteryBlockEntity.Companion::postLevelTick);
EVENT_BUS.addListener(EventPriority.LOWEST, KillAsAndroidTrigger.INSTANCE::onKill);

View File

@ -15,6 +15,7 @@ import net.minecraftforge.event.server.ServerStoppedEvent
import net.minecraftforge.event.server.ServerStoppingEvent
import net.minecraftforge.fml.loading.FMLLoader
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.mc.otm.block.entity.MatteryBlockEntity
import ru.dbotthepony.mc.otm.capability.AbstractProfiledStorage
import ru.dbotthepony.mc.otm.client.minecraft
import ru.dbotthepony.mc.otm.core.collect.WeakHashSet
@ -165,7 +166,7 @@ fun onServerTick(event: ServerTickEvent) {
}
}
fun onWorldTick(event: LevelTickEvent) {
fun onLevelTick(event: LevelTickEvent) {
if (event.phase === TickEvent.Phase.START) {
preWorldTick[event.level]?.tick()
@ -176,6 +177,7 @@ fun onWorldTick(event: LevelTickEvent) {
}
} else {
postWorldTick[event.level]?.tick()
MatteryBlockEntity.postLevelTick(event)
}
}

View File

@ -5,6 +5,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
import it.unimi.dsi.fastutil.longs.Long2ObjectFunction
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap
import it.unimi.dsi.fastutil.objects.ObjectArraySet
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet
import it.unimi.dsi.fastutil.objects.Reference2ObjectOpenHashMap
import mekanism.api.energy.IStrictEnergyHandler
import net.minecraft.core.BlockPos
@ -31,7 +32,6 @@ import net.minecraftforge.common.capabilities.ForgeCapabilities
import net.minecraftforge.common.util.INBTSerializable
import net.minecraftforge.common.util.LazyOptional
import net.minecraftforge.energy.IEnergyStorage
import net.minecraftforge.event.TickEvent
import net.minecraftforge.event.TickEvent.LevelTickEvent
import net.minecraftforge.event.entity.player.PlayerEvent
import net.minecraftforge.event.level.ChunkWatchEvent
@ -46,7 +46,6 @@ import ru.dbotthepony.mc.otm.compat.mekanism.Mattery2MekanismEnergyWrapper
import ru.dbotthepony.mc.otm.compat.mekanism.Mekanism2MatteryEnergyWrapper
import ru.dbotthepony.mc.otm.core.ISubscriptable
import ru.dbotthepony.mc.otm.core.collect.WeakHashSet
import ru.dbotthepony.mc.otm.core.forValidRefs
import ru.dbotthepony.mc.otm.core.get
import ru.dbotthepony.mc.otm.core.ifPresentK
import ru.dbotthepony.mc.otm.core.immutableList
@ -69,7 +68,6 @@ import java.util.function.Predicate
import java.util.function.Supplier
import java.util.stream.Stream
import kotlin.NoSuchElementException
import kotlin.collections.ArrayList
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
import kotlin.reflect.KProperty0
@ -120,12 +118,11 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
*/
protected val savetablesLevel = Savetables()
private var tickedOnce = false
open fun tick() {
tickList.tick()
tickedOnce = true
synchronizeToPlayers(false)
if (synchronizer.isNotEmpty)
synchronizeToPlayers(false)
}
/**
@ -537,10 +534,12 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
}
val synchronizer = FieldSynchronizer {
if (isSynchronizing || tickedOnce)
if (isSynchronizing || tickList.ticks != 0)
return@FieldSynchronizer
if (!isRemoved && level?.isClientSide == false && (_subCache == null || (_subCache ?: throw ConcurrentModificationException()).players.isNotEmpty())) {
if (isRemoved) markSynchronizerClean()
if (level?.isClientSide == false && (_subCache == null || (_subCache ?: throw ConcurrentModificationException()).players.isNotEmpty())) {
ru.dbotthepony.mc.otm.onceServer {
synchronizeToPlayers(true)
}
@ -599,6 +598,8 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
}
private var _subCache: ChunkSubscribers? = null
private val subscription get() = _subCache ?: subscribe()
private var playerListUpdated = false
private fun unsubscribe() {
val subCache = _subCache ?: return
@ -611,7 +612,7 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
}
}
lastSubscriptionChangeset = -1
playerListUpdated = false
}
private fun subscribe(): ChunkSubscribers {
@ -621,171 +622,106 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
val subs = playerMap
.computeIfAbsent(level) { Long2ObjectOpenHashMap() }
.computeIfAbsent(ChunkPos(blockPos).toLong(), Long2ObjectFunction { ChunkSubscribers(level = WeakReference(level), chunkPos = ChunkPos(blockPos).toLong()) })
.computeIfAbsent(ChunkPos(blockPos).toLong(), Long2ObjectFunction { ChunkSubscribers(level, ChunkPos(blockPos).toLong()) })
subs.subscribe(this)
_subCache = subs
playerListUpdated = true
return subs
}
private val subscription get() = _subCache ?: subscribe()
private var lastSubscriptionChangeset = -1
fun synchronizeToPlayers() {
synchronizeToPlayers(false)
}
protected fun synchronizeToPlayers(calledBySynchronizer: Boolean) {
private fun synchronizeToPlayers(calledBySynchronizer: Boolean) {
isSynchronizing = true
try {
if (synchronizer.isEmpty || isRemoved) {
if (calledBySynchronizer) synchronizer.markClean()
return
}
check(level is ServerLevel) { "Invalid realm or Level is null" }
synchronizer.observe()
val subscription = subscription
if (subscription.players.isEmpty() || lastSubscriptionChangeset == subscription.changeset && !synchronizer.hasChanges) {
if (calledBySynchronizer) synchronizer.markClean()
return
}
if (subscription.players.isNotEmpty() && (playerListUpdated || synchronizer.isDirty)) {
playerListUpdated = false
lastSubscriptionChangeset = subscription.changeset
for (player in subscription.players) {
val payload = synchronizer.computeEndpointFor(player).collectNetworkPayload()
for (player in subscription.players) {
val payload = synchronizer.computeEndpointFor(player).collectNetworkPayload()
if (payload != null) {
GenericNetworkChannel.send(player, BlockEntitySyncPacket(blockPos, payload.array, payload.length))
if (payload != null) {
GenericNetworkChannel.send(player, BlockEntitySyncPacket(blockPos, payload.array, payload.length))
}
}
}
synchronizer.markClean()
synchronizer.markClean()
} else if (calledBySynchronizer) {
synchronizer.markClean()
}
} finally {
isSynchronizing = false
}
}
private data class ChunkSubscribers(
val blockEntities: ArrayList<WeakReference<MatteryBlockEntity>> = ArrayList(0),
val players: ArrayList<ServerPlayer> = ArrayList(1),
val observingBlockEntities: ArrayList<WeakReference<MatteryBlockEntity>> = ArrayList(0),
val level: WeakReference<ServerLevel>,
val chunkPos: Long,
var changeset: Int = 0
) {
fun cleanUpBlocks() {
val listIterator = blockEntities.listIterator()
private class ChunkSubscribers(level: ServerLevel, val chunkPos: Long) {
val level = WeakReference(level)
val blockEntities = WeakHashSet<MatteryBlockEntity>(linked = true, initialCapacity = 0)
val players = ObjectLinkedOpenHashSet<ServerPlayer>(0)
val blockEntitiesWithObservers = WeakHashSet<MatteryBlockEntity>(linked = true, initialCapacity = 0)
for (block in listIterator) {
if (block.get() == null) {
listIterator.remove()
operator fun component1() = blockEntities
operator fun component2() = players
val hasObservers: Boolean get() {
return blockEntities.any { it.synchronizer.hasObservers }
}
fun subscribe(player: ServerPlayer) {
if (players.add(player)) {
blockEntities.forEach { it.playerListUpdated = true }
onceServer(10) {
if (!player.hasDisconnected() && player in players) {
blockEntities.forEach {
it.synchronizeToPlayers(false)
}
}
}
}
}
val hasObservers: Boolean get() {
val listIterator = blockEntities.listIterator()
for (value in listIterator) {
val ref = value.get()
if (ref == null) {
listIterator.remove()
} else if (ref.synchronizer.hasObservers) {
return true
fun unsubscribe(player: ServerPlayer): Boolean {
if (players.remove(player)) {
blockEntities.forEach {
it.synchronizer.removeEndpointFor(player)
}
return true
}
return false
}
fun subscribe(blockEntity: MatteryBlockEntity) {
val iterator = blockEntities.listIterator()
for (value in iterator) {
val ref = value.get()
if (ref === blockEntity) {
return
} else if (ref == null) {
iterator.remove()
}
}
val bref = WeakReference(blockEntity)
blockEntities.add(bref)
changeset++
if (!blockEntities.add(blockEntity)) return
onceServer {
blockEntity.synchronizeToPlayers()
blockEntity.synchronizeToPlayers(false)
}
if (blockEntity.synchronizer.hasObservers) {
observingBlockEntities.add(bref)
val listing = tickingMap.computeIfAbsent(level.get() ?: throw NullPointerException("Level got GCd!")) { ArrayList(1) }
val tickerIterator = listing.listIterator()
if (blockEntity.synchronizer.hasObservers && blockEntity.tickList.ticks == 0) {
blockEntitiesWithObservers.add(blockEntity)
for (value in tickerIterator) {
val ref = value.get()
if (ref === this) {
return
} else if (ref == null) {
tickerIterator.remove()
}
}
listing.add(WeakReference(this))
tickingMap
.computeIfAbsent(level.get() ?: throw NullPointerException("Level got GCd!")) { WeakHashSet(linked = true, initialCapacity = 2) }
.add(this)
}
}
fun unsubscribe(blockEntity: MatteryBlockEntity): Boolean {
val listIterator = blockEntities.listIterator()
blockEntities.remove(blockEntity)
blockEntitiesWithObservers.remove(blockEntity)
for (value in listIterator) {
if (value.get() === blockEntity) {
listIterator.remove()
changeset++
break
}
}
if (blockEntity.synchronizer.hasObservers) {
val tickerIterator = observingBlockEntities.listIterator()
for (value in tickerIterator) {
val ref = value.get()
if (ref === blockEntity) {
tickerIterator.remove()
break
} else if (ref == null) {
tickerIterator.remove()
}
}
}
if (players.isNotEmpty()) {
return false
}
cleanUpBlocks()
return blockEntities.isEmpty()
return players.isEmpty() && blockEntities.isEmpty()
}
val isEmpty: Boolean get() {
if (players.isNotEmpty()) {
return false
}
cleanUpBlocks()
return blockEntities.isEmpty()
return players.isEmpty() && blockEntities.isEmpty()
}
}
@ -813,7 +749,7 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
const val LOOT_TABLE_SEED_KEY = RandomizableContainerBlockEntity.LOOT_TABLE_SEED_TAG
private val playerMap = WeakHashMap<ServerLevel, Long2ObjectOpenHashMap<ChunkSubscribers>>()
private val tickingMap = WeakHashMap<ServerLevel, ArrayList<WeakReference<ChunkSubscribers>>>()
private val tickingMap = WeakHashMap<ServerLevel, WeakHashSet<ChunkSubscribers>>()
private val vec2Dir = Int2ObjectOpenHashMap<Direction>()
@ -860,7 +796,7 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
}
init {
for (dir in Direction.values()) {
for (dir in Direction.entries) {
vec2Dir[vecKey(dir.normal)] = dir
}
}
@ -877,87 +813,47 @@ abstract class MatteryBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: Bloc
}
fun postLevelTick(event: LevelTickEvent) {
if (event.phase == TickEvent.Phase.END) {
val level = event.level as? ServerLevel ?: return
val listing = tickingMap[level] ?: return
val listIterator = listing.listIterator()
var hitAnyObservers = false
val level = event.level as? ServerLevel ?: return
val ticking = tickingMap[level] ?: return
for (value in listIterator) {
val ref = value.get()
ticking.removeIf {
val shouldRemove = it.blockEntitiesWithObservers.isEmpty()
if (ref == null) {
listIterator.remove()
} else if (ref.players.isNotEmpty()) {
var hitObservers = false
ref.observingBlockEntities.forValidRefs {
hitObservers = true
hitAnyObservers = true
it.synchronizeToPlayers()
}
if (!hitObservers) {
listIterator.remove()
}
if (!shouldRemove && it.players.isNotEmpty()) {
it.blockEntitiesWithObservers.forEach {
it.synchronizeToPlayers(false)
}
}
if (!hitAnyObservers) {
tickingMap.remove(level)
}
shouldRemove
}
if (ticking.isEmpty()) {
tickingMap.remove(level)
}
}
fun onWatch(event: ChunkWatchEvent.Watch) {
playerMap
.computeIfAbsent(event.level) { Long2ObjectOpenHashMap() }
.computeIfAbsent(event.pos.toLong(), Long2ObjectFunction { ChunkSubscribers(level = WeakReference(event.level), chunkPos = event.pos.toLong()) })
.let {
val (blocks, players) = it
if (event.player !in players) {
players.add(event.player)
it.changeset++
onceServer(10) {
if (!event.player.hasDisconnected() && event.player in players) {
blocks.forValidRefs {
it.synchronizeToPlayers(false)
}
}
}
}
}
.computeIfAbsent(event.pos.toLong(), Long2ObjectFunction { ChunkSubscribers(event.level, it) })
.subscribe(event.player)
}
fun onForget(event: ChunkWatchEvent.UnWatch) {
val subs = playerMap.get(event.level)?.get(event.pos.toLong()) ?: return
val levelMap = playerMap[event.level] ?: return
val subs = levelMap.get(event.pos.toLong()) ?: return
if (subs.players.remove(event.player)) {
if (subs.isEmpty) {
playerMap.get(event.level)?.remove(event.pos.toLong())
} else {
subs.changeset++
subs.blockEntities.forValidRefs {
it.synchronizer.removeEndpointFor(event.player)
}
}
if (subs.unsubscribe(event.player) && subs.isEmpty) {
levelMap.remove(event.pos.toLong())
}
}
fun playerDisconnected(event: PlayerEvent.PlayerLoggedOutEvent) {
for (tree in playerMap.values) {
val iterator = tree.iterator()
for (entry in iterator) {
if (entry.value.players.remove(event.entity)) {
entry.value.changeset++
if (entry.value.isEmpty) {
iterator.remove()
}
}
tree.values.removeIf {
it.unsubscribe(event.entity as ServerPlayer)
it.isEmpty
}
}
}

View File

@ -1,12 +1,14 @@
package ru.dbotthepony.mc.otm.core.collect
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenCustomHashSet
import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet
import it.unimi.dsi.fastutil.objects.ObjectSet
import ru.dbotthepony.mc.otm.core.util.HashedWeakReference
import java.lang.ref.ReferenceQueue
class WeakHashSet<E : Any> : MutableSet<E> {
class WeakHashSet<E : Any>(initialCapacity: Int = 16, loadFactor: Float = 0.75f, linked: Boolean = false) : MutableSet<E> {
private val queue = ReferenceQueue<E>()
private val backing = ObjectOpenCustomHashSet<Any>(ReferenceHashStrategy)
private val backing: ObjectSet<Any> = if (linked) ObjectLinkedOpenCustomHashSet(initialCapacity, loadFactor, ReferenceHashStrategy) else ObjectOpenCustomHashSet(initialCapacity, loadFactor, ReferenceHashStrategy)
private fun purge() {
var next = queue.poll()

View File

@ -66,10 +66,13 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
private var nextFieldID = 0
val hasObservers: Boolean get() = observers.isNotEmpty()
val isEmpty: Boolean get() = fields.isEmpty()
val isNotEmpty: Boolean get() = fields.isNotEmpty()
var hasChanges: Boolean = false
var isEmpty: Boolean = true
private set
val isNotEmpty: Boolean get() = !isEmpty
var isDirty: Boolean = false
private set(value) {
if (value != field) {
field = value
@ -85,7 +88,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
}
fun markClean() {
hasChanges = false
isDirty = false
}
fun computedByte(getter: () -> Byte) = ComputedField(getter, ByteValueCodec)
@ -301,7 +304,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
private var nextEndpointsCleanup = secondTime
private fun notifyEndpoints(dirtyField: AbstractField<*>) {
hasChanges = true
isDirty = true
forEachEndpoint {
it.addDirtyField(dirtyField)
@ -480,6 +483,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
} else {
fields.add(this)
id = fields.size
isEmpty = false
}
}
@ -496,6 +500,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
freeSlots++
fields[id - 1] = null
observers.remove(this)
isEmpty = fields.all { it == null }
while (fields[fields.size - 1] == null) {
fields.removeAt(fields.size - 1)
@ -1994,7 +1999,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa
}
lmarkDirty()
hasChanges = true
this@FieldSynchronizer.isDirty = true
}
override fun onValueAdded(key: K, value: V) {