From 93711aacc108b5ac5a2e10cbf8dfd234d697401b Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Tue, 4 Oct 2022 00:22:14 +0700 Subject: [PATCH] Observing tick list, computed fields in Field Synchronizer --- .../mc/otm/OverdriveThatMatters.java | 1 + .../block/entity/SynchronizedBlockEntity.kt | 253 ++++++++++++------ .../mc/otm/network/FieldSynchronizer.kt | 145 ++++++++-- 3 files changed, 299 insertions(+), 100 deletions(-) diff --git a/src/main/java/ru/dbotthepony/mc/otm/OverdriveThatMatters.java b/src/main/java/ru/dbotthepony/mc/otm/OverdriveThatMatters.java index 3cab6a3d4..6b1bee0ef 100644 --- a/src/main/java/ru/dbotthepony/mc/otm/OverdriveThatMatters.java +++ b/src/main/java/ru/dbotthepony/mc/otm/OverdriveThatMatters.java @@ -144,6 +144,7 @@ public final class OverdriveThatMatters { EVENT_BUS.addListener(EventPriority.NORMAL, SynchronizedBlockEntity.Companion::onWatch); EVENT_BUS.addListener(EventPriority.NORMAL, SynchronizedBlockEntity.Companion::onForget); EVENT_BUS.addListener(EventPriority.NORMAL, SynchronizedBlockEntity.Companion::playerDisconnected); + EVENT_BUS.addListener(EventPriority.LOWEST, SynchronizedBlockEntity.Companion::postLevelTick); EVENT_BUS.addListener(EventPriority.NORMAL, EnderTeleporterFeature.Companion::onEntityDeath); diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/block/entity/SynchronizedBlockEntity.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/block/entity/SynchronizedBlockEntity.kt index 58c3372be..5f9d2ab07 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/block/entity/SynchronizedBlockEntity.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/block/entity/SynchronizedBlockEntity.kt @@ -2,8 +2,6 @@ package ru.dbotthepony.mc.otm.block.entity import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap import it.unimi.dsi.fastutil.longs.Long2ObjectFunction -import it.unimi.dsi.fastutil.objects.Object2ObjectAVLTreeMap -import it.unimi.dsi.fastutil.objects.Reference2ObjectOpenHashMap import net.minecraft.core.BlockPos import net.minecraft.server.level.ServerLevel import net.minecraft.server.level.ServerPlayer @@ -13,93 +11,17 @@ import net.minecraft.world.level.block.entity.BlockEntity import net.minecraft.world.level.block.entity.BlockEntityType import net.minecraft.world.level.block.state.BlockState import net.minecraftforge.event.TickEvent +import net.minecraftforge.event.TickEvent.LevelTickEvent import net.minecraftforge.event.entity.player.PlayerEvent -import net.minecraftforge.event.entity.player.PlayerEvent.PlayerChangedDimensionEvent import net.minecraftforge.event.level.ChunkWatchEvent -import net.minecraftforge.event.server.ServerStartingEvent import net.minecraftforge.event.server.ServerStoppingEvent -import org.apache.logging.log4j.LogManager -import ru.dbotthepony.mc.otm.NULLABLE_MINECRAFT_SERVER -import ru.dbotthepony.mc.otm.SERVER_IS_LIVE -import ru.dbotthepony.mc.otm.core.component1 -import ru.dbotthepony.mc.otm.core.component2 -import ru.dbotthepony.mc.otm.core.component3 -import ru.dbotthepony.mc.otm.core.position import ru.dbotthepony.mc.otm.network.BlockEntitySyncPacket import ru.dbotthepony.mc.otm.network.FieldSynchronizer import ru.dbotthepony.mc.otm.network.WorldNetworkChannel import ru.dbotthepony.mc.otm.onceServer import java.lang.ref.WeakReference -import java.util.LinkedList import java.util.WeakHashMap -private data class Subscribers( - val blockEntities: ArrayList> = ArrayList(0), - val players: ArrayList = ArrayList(1), - val level: WeakReference, - val chunkPos: Long, - var changeset: Int = 0 -) { - fun cleanUpBlocks() { - val listIterator = blockEntities.listIterator() - - for (block in listIterator) { - if (block.get() == null) { - listIterator.remove() - } - } - } - - fun subscribe(blockEntity: SynchronizedBlockEntity) { - val iterator = blockEntities.listIterator() - - for (value in iterator) { - val ref = value.get() - - if (ref === blockEntity) { - return - } else if (ref == null) { - iterator.remove() - } - } - - blockEntities.add(WeakReference(blockEntity)) - changeset++ - - onceServer { - blockEntity.synchronizeToPlayers() - } - } - - fun unsubscribe(blockEntity: SynchronizedBlockEntity): Boolean { - val listIterator = blockEntities.listIterator() - - for (value in listIterator) { - if (value.get() === blockEntity) { - listIterator.remove() - changeset++ - break - } - } - - if (players.isNotEmpty()) { - return false - } - - cleanUpBlocks() - return blockEntities.isEmpty() - } - - val isEmpty: Boolean get() { - if (!players.isEmpty()) { - return false - } - - cleanUpBlocks() - return blockEntities.isEmpty() - } -} - abstract class SynchronizedBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: BlockPos, p_155230_: BlockState) : BlockEntity(p_155228_, p_155229_, p_155230_) { val synchronizer = FieldSynchronizer { if (!isRemoved && level?.isClientSide == false) { @@ -136,7 +58,7 @@ abstract class SynchronizedBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: } } - private var _subCache: Subscribers? = null + private var _subCache: ChunkSubscribers? = null private fun unsubscribe() { val subCache = _subCache ?: return @@ -152,11 +74,15 @@ abstract class SynchronizedBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: lastSubscriptionChangeset = -1 } - private fun subscribe(): Subscribers { + private fun subscribe(): ChunkSubscribers { val level = level check(level is ServerLevel) { "Invalid realm" } unsubscribe() - val subs = playerMap.computeIfAbsent(level) { Long2ObjectAVLTreeMap() }.computeIfAbsent(ChunkPos(blockPos).toLong(), Long2ObjectFunction { Subscribers(level = WeakReference(level), chunkPos = ChunkPos(blockPos).toLong()) }) + + val subs = playerMap + .computeIfAbsent(level) { Long2ObjectAVLTreeMap() } + .computeIfAbsent(ChunkPos(blockPos).toLong(), Long2ObjectFunction { ChunkSubscribers(level = WeakReference(level), chunkPos = ChunkPos(blockPos).toLong()) }) + subs.subscribe(this) _subCache = subs return subs @@ -199,6 +125,124 @@ abstract class SynchronizedBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: synchronizer.markClean() } + private data class ChunkSubscribers( + val blockEntities: ArrayList> = ArrayList(0), + val players: ArrayList = ArrayList(1), + val observingBlockEntities: ArrayList> = ArrayList(0), + val level: WeakReference, + val chunkPos: Long, + var changeset: Int = 0 + ) { + fun cleanUpBlocks() { + val listIterator = blockEntities.listIterator() + + for (block in listIterator) { + if (block.get() == null) { + listIterator.remove() + } + } + } + + val hasObservers: Boolean get() { + val listIterator = blockEntities.listIterator() + + for (value in listIterator) { + val ref = value.get() + + if (ref == null) { + listIterator.remove() + } else if (ref.synchronizer.hasObservers) { + return true + } + } + + return false + } + + fun subscribe(blockEntity: SynchronizedBlockEntity) { + val iterator = blockEntities.listIterator() + + for (value in iterator) { + val ref = value.get() + + if (ref === blockEntity) { + return + } else if (ref == null) { + iterator.remove() + } + } + + val bref = WeakReference(blockEntity) + blockEntities.add(bref) + changeset++ + + onceServer { + blockEntity.synchronizeToPlayers() + } + + if (blockEntity.synchronizer.hasObservers) { + observingBlockEntities.add(bref) + val listing = tickingMap.computeIfAbsent(level.get() ?: throw NullPointerException("Level got GCd!")) { ArrayList(1) } + val tickerIterator = listing.listIterator() + + for (value in tickerIterator) { + val ref = value.get() + + if (ref === this) { + return + } else if (ref == null) { + tickerIterator.remove() + } + } + + listing.add(WeakReference(this)) + } + } + + fun unsubscribe(blockEntity: SynchronizedBlockEntity): Boolean { + val listIterator = blockEntities.listIterator() + + for (value in listIterator) { + if (value.get() === blockEntity) { + listIterator.remove() + changeset++ + break + } + } + + if (blockEntity.synchronizer.hasObservers) { + val tickerIterator = observingBlockEntities.listIterator() + + for (value in tickerIterator) { + val ref = value.get() + + if (ref === blockEntity) { + tickerIterator.remove() + break + } else if (ref == null) { + tickerIterator.remove() + } + } + } + + if (players.isNotEmpty()) { + return false + } + + cleanUpBlocks() + return blockEntities.isEmpty() + } + + val isEmpty: Boolean get() { + if (players.isNotEmpty()) { + return false + } + + cleanUpBlocks() + return blockEntities.isEmpty() + } + } + /** * Why track player-tracked chunks? * @@ -214,14 +258,55 @@ abstract class SynchronizedBlockEntity(p_155228_: BlockEntityType<*>, p_155229_: * [net.minecraft.server.level.ChunkMap.getPlayers], which is not something we want */ companion object { - private val playerMap = WeakHashMap>() + private val playerMap = WeakHashMap>() + private val tickingMap = WeakHashMap>>() fun onServerStopping(event: ServerStoppingEvent) { playerMap.clear() } + fun postLevelTick(event: LevelTickEvent) { + if (event.phase == TickEvent.Phase.END) { + val level = event.level as? ServerLevel ?: return + val listing = tickingMap[level] ?: return + val listIterator = listing.listIterator() + var hitAnyObservers = false + + for (value in listIterator) { + val ref = value.get() + + if (ref == null) { + listIterator.remove() + } else { + var hitObservers = false + val blockIterator = ref.observingBlockEntities.listIterator() + + for (bvalue in blockIterator) { + val bref = bvalue.get() + + if (bref == null) { + blockIterator.remove() + } else { + hitObservers = true + hitAnyObservers = true + bref.synchronizeToPlayers() + } + } + + if (!hitObservers) { + listIterator.remove() + } + } + } + + if (!hitAnyObservers) { + tickingMap.remove(level) + } + } + } + fun onWatch(event: ChunkWatchEvent.Watch) { - playerMap.computeIfAbsent(event.level) { Long2ObjectAVLTreeMap() }.computeIfAbsent(event.pos.toLong(), Long2ObjectFunction { Subscribers(level = WeakReference(event.level), chunkPos = event.pos.toLong()) }).let { + playerMap.computeIfAbsent(event.level) { Long2ObjectAVLTreeMap() }.computeIfAbsent(event.pos.toLong(), Long2ObjectFunction { ChunkSubscribers(level = WeakReference(event.level), chunkPos = event.pos.toLong()) }).let { val (blocks, players) = it if (event.player !in players) { diff --git a/src/main/kotlin/ru/dbotthepony/mc/otm/network/FieldSynchronizer.kt b/src/main/kotlin/ru/dbotthepony/mc/otm/network/FieldSynchronizer.kt index 5841fc42a..b9be1c94c 100644 --- a/src/main/kotlin/ru/dbotthepony/mc/otm/network/FieldSynchronizer.kt +++ b/src/main/kotlin/ru/dbotthepony/mc/otm/network/FieldSynchronizer.kt @@ -12,7 +12,7 @@ import java.io.InputStream import java.lang.ref.WeakReference import java.math.BigDecimal import java.util.* -import java.util.function.Consumer +import java.util.function.Supplier import kotlin.ConcurrentModificationException import kotlin.collections.ArrayList import kotlin.collections.HashMap @@ -20,6 +20,7 @@ import kotlin.properties.ReadOnlyProperty import kotlin.properties.ReadWriteProperty import kotlin.reflect.KMutableProperty0 import kotlin.reflect.KProperty +import kotlin.reflect.KProperty0 interface FieldAccess { fun read(): V @@ -35,7 +36,7 @@ fun interface FieldSetter { } sealed interface IField : ReadOnlyProperty { - fun observe() + fun observe(): Boolean fun markDirty() fun markDirty(endpoint: FieldSynchronizer.Endpoint) val value: V @@ -70,6 +71,7 @@ enum class MapAction { CLEAR, ADD, REMOVE } +@Suppress("unused") class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCallback: Boolean) { constructor() : this(Runnable {}, false) constructor(callback: Runnable) : this(callback, false) @@ -77,6 +79,7 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa private val fields = ArrayList>(0) private val observers = ArrayList>(0) + val hasObservers: Boolean get() = observers.isNotEmpty() val isEmpty: Boolean get() = fields.isEmpty() val isNotEmpty: Boolean get() = fields.isNotEmpty() @@ -99,6 +102,54 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa hasChanges = false } + fun byte(getter: () -> Byte) = ComputedField(getter, ByteValueCodec) + fun bool(getter: () -> Boolean) = ComputedField(getter, BooleanValueCodec) + fun short(getter: () -> Short) = ComputedField(getter, ShortValueCodec) + fun long(getter: () -> Long) = ComputedField(getter, VarLongValueCodec) + fun fixedLong(getter: () -> Long) = ComputedField(getter, LongValueCodec) + fun float(getter: () -> Float) = ComputedField(getter, FloatValueCodec) + fun double(getter: () -> Double) = ComputedField(getter, DoubleValueCodec) + fun uuid(getter: () -> UUID) = ComputedField(getter, UUIDValueCodec) + fun int(getter: () -> Int) = ComputedField(getter, VarIntValueCodec) + fun fixedInt(getter: () -> Int) = ComputedField(getter, IntValueCodec) + fun fraction(getter: () -> ImpreciseFraction) = ComputedField(getter, ImpreciseFractionValueCodec) + fun bigDecimal(getter: () -> BigDecimal) = ComputedField(getter, BigDecimalValueCodec) + fun item(getter: () -> ItemStack) = ComputedField(getter, ItemStackValueCodec) + + fun byte(getter: KProperty0) = ComputedField(getter, ByteValueCodec) + fun bool(getter: KProperty0) = ComputedField(getter, BooleanValueCodec) + fun short(getter: KProperty0) = ComputedField(getter, ShortValueCodec) + fun long(getter: KProperty0) = ComputedField(getter, VarLongValueCodec) + fun fixedLong(getter: KProperty0) = ComputedField(getter, LongValueCodec) + fun float(getter: KProperty0) = ComputedField(getter, FloatValueCodec) + fun double(getter: KProperty0) = ComputedField(getter, DoubleValueCodec) + fun uuid(getter: KProperty0) = ComputedField(getter, UUIDValueCodec) + fun int(getter: KProperty0) = ComputedField(getter, VarIntValueCodec) + fun fixedInt(getter: KProperty0) = ComputedField(getter, IntValueCodec) + fun fraction(getter: KProperty0) = ComputedField(getter, ImpreciseFractionValueCodec) + fun bigDecimal(getter: KProperty0) = ComputedField(getter, BigDecimalValueCodec) + fun item(getter: KProperty0) = ComputedField(getter, ItemStackValueCodec) + + fun byte(getter: Supplier) = ComputedField(getter::get, ByteValueCodec) + fun bool(getter: Supplier) = ComputedField(getter::get, BooleanValueCodec) + fun short(getter: Supplier) = ComputedField(getter::get, ShortValueCodec) + fun long(getter: Supplier) = ComputedField(getter::get, VarLongValueCodec) + fun fixedLong(getter: Supplier) = ComputedField(getter::get, LongValueCodec) + fun float(getter: Supplier) = ComputedField(getter::get, FloatValueCodec) + fun double(getter: Supplier) = ComputedField(getter::get, DoubleValueCodec) + fun uuid(getter: Supplier) = ComputedField(getter::get, UUIDValueCodec) + fun int(getter: Supplier) = ComputedField(getter::get, VarIntValueCodec) + fun fixedInt(getter: Supplier) = ComputedField(getter::get, IntValueCodec) + fun fraction(getter: Supplier) = ComputedField(getter::get, ImpreciseFractionValueCodec) + fun bigDecimal(getter: Supplier) = ComputedField(getter::get, BigDecimalValueCodec) + fun item(getter: Supplier) = ComputedField(getter::get, ItemStackValueCodec) + + fun > enum(type: Class, getter: () -> T) = ComputedField(getter, EnumValueCodec(type)) + inline fun > enum(noinline getter: () -> T) = ComputedField(getter, EnumValueCodec(T::class.java)) + + fun > enum(type: Class, getter: KProperty0) = ComputedField(getter, EnumValueCodec(type)) + inline fun > enum(getter: KProperty0) = ComputedField(getter, EnumValueCodec(T::class.java)) + fun byte( value: Byte = 0, getter: FieldGetter? = null, @@ -216,8 +267,9 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa value: ItemStack = ItemStack.EMPTY, getter: FieldGetter? = null, setter: FieldSetter? = null, + observe: Boolean = true ): Field { - return Field(value, ItemStackValueCodec, getter, setter, isObserver = true) + return Field(value, ItemStackValueCodec, getter, setter, isObserver = observe) } fun item( @@ -421,11 +473,13 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa } } - override fun observe() { + override fun observe(): Boolean { if (!isDirty && !codec.compare(remote, field)) { notifyEndpoints(this@Field) isDirty = true } + + return isDirty } override var value: V @@ -489,8 +543,59 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa } } + inner class ComputedField( + private val getter: () -> V, + private val codec: IStreamCodec, + ) : IField { + private var remote: V? = null + private var clientValue: V? = null + private var isDirty = false + + val id = fields.size + 1 + + init { + fields.add(this) + observers.add(this) + } + + override fun observe(): Boolean { + if (!isDirty && (remote == null || !codec.compare(remote ?: throw ConcurrentModificationException(), value))) { + notifyEndpoints(this) + isDirty = true + remote = codec.copy(value) + } + + return isDirty + } + + override fun markDirty() { + if (!isDirty && clientValue == null) { + notifyEndpoints(this) + isDirty = true + } + } + + override fun markDirty(endpoint: Endpoint) { + endpoint.addDirtyField(this) + } + + override val value: V + get() = clientValue ?: getter.invoke() + + override fun write(stream: DataOutputStream, endpoint: Endpoint) { + stream.write(id) + val value = value + codec.write(stream, value) + isDirty = false + } + + override fun read(stream: DataInputStream) { + clientValue = codec.read(stream) + } + } + inner class ObservedField : IMutableField { - private val codec: StreamCodec + private val codec: IStreamCodec private val getter: () -> V private val setter: (V) -> Unit @@ -500,18 +605,18 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa get() = getter.invoke() set(value) { setter.invoke(value) } - constructor(field: KMutableProperty0, dispatcher: StreamCodec) { - this.codec = dispatcher + constructor(field: KMutableProperty0, codec: IStreamCodec) { + this.codec = codec getter = field::get setter = field::set - remote = dispatcher.copy(value) + remote = codec.copy(value) } - constructor(getter: () -> V, setter: (V) -> Unit, dispatcher: StreamCodec) { - this.codec = dispatcher + constructor(getter: () -> V, setter: (V) -> Unit, codec: IStreamCodec) { + this.codec = codec this.getter = getter this.setter = setter - remote = dispatcher.copy(value) + remote = codec.copy(value) } private var isDirty = false @@ -523,12 +628,14 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa observers.add(this) } - override fun observe() { + override fun observe(): Boolean { if (!isDirty && !codec.compare(remote, value)) { notifyEndpoints(this) isDirty = true remote = codec.copy(value) } + + return isDirty } override fun markDirty() { @@ -601,9 +708,9 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa } } - override fun observe() { + override fun observe(): Boolean { if (isRemote) { - return + return false } val observingBackingMap = observingBackingMap @@ -630,6 +737,8 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa } } } + + return isDirty } override fun markDirty() { @@ -844,12 +953,16 @@ class FieldSynchronizer(private val callback: Runnable, private val alwaysCallCa } } - fun observe() { + fun observe(): Boolean { + var changes = false + if (observers.isNotEmpty()) { for (field in observers) { - field.observe() + changes = field.observe() || changes } } + + return changes } /**