KStarbound/src/main/kotlin/ru/dbotthepony/kstarbound/util/CarriedExecutor.kt

99 lines
2.2 KiB
Kotlin

package ru.dbotthepony.kstarbound.util
import kotlinx.coroutines.delay
import org.apache.logging.log4j.LogManager
import java.lang.ref.Reference
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.LockSupport
class CarriedExecutor(private val parent: Executor, private val allowExecutionInPlace: Boolean = true) : Executor, Runnable {
private val queue = ConcurrentLinkedDeque<Runnable>()
private val isCarried = AtomicBoolean()
@Volatile
private var carrierThread: Thread? = null
override fun execute(command: Runnable) {
if (allowExecutionInPlace && carrierThread === Thread.currentThread()) {
// execute blocks in place if we are inside execution loop
try {
command.run()
} catch (err: Throwable) {
LOGGER.error("Exception running task", err)
}
} else {
queue.add(command)
if (isCarried.compareAndSet(false, true)) {
parent.execute(this)
}
}
}
fun executePriority(command: Runnable) {
queue.addFirst(command)
if (isCarried.compareAndSet(false, true)) {
parent.execute(this)
}
}
override fun run() {
while (true) {
carrierThread = Thread.currentThread()
var next = queue.poll()
while (next != null) {
try {
next.run()
} catch (err: Throwable) {
LOGGER.error("Exception running task", err)
}
next = queue.poll()
}
carrierThread = null
isCarried.set(false)
// spinwait for little
var i = 10
while (i-- > 0) {
Reference.reachabilityFence(queue)
}
if (queue.isNotEmpty() && isCarried.compareAndSet(false, true)) {
// bugger! queue updated while we have finished
// and no other thread has picked it up
} else {
break
}
}
}
fun wait(howLong: Long, unit: TimeUnit) {
val deadline = System.nanoTime() + unit.toNanos(howLong)
while (System.nanoTime() < deadline && isCarried.get()) {
LockSupport.parkNanos(500_000L)
}
}
suspend fun waitAsync() {
while (isCarried.get()) {
delay(10L)
}
}
companion object {
private val LOGGER = LogManager.getLogger()
}
}