CarriedExecutor

This commit is contained in:
DBotThePony 2024-02-14 11:46:15 +07:00
parent 02d368fcbf
commit 4972bea1ce
Signed by: DBot
GPG Key ID: DCC23B5715498507
2 changed files with 202 additions and 1 deletions

View File

@ -4,7 +4,7 @@ kotlin.code.style=official
specifyKotlinAsDependency=false
projectGroup=ru.dbotthepony.kommons
projectVersion=2.1.8
projectVersion=2.2.0
guavaDepVersion=33.0.0
gsonDepVersion=2.8.9

View File

@ -0,0 +1,201 @@
package ru.dbotthepony.kommons.util
import java.lang.ref.Reference
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
/**
* [ExecutorService] which executes supplied tasks sequentially, in one of carried threads provided by [parentExecutor]
*
* Difference between this executor and single-threaded executor is that this allows to share one or more threads between multiple
* [CarriedExecutor], since when they are done, they unmount themselves from [parentExecutor], allowing thread to perform other work
*/
class CarriedExecutor(private val parentExecutor: Executor) : Runnable, ExecutorService {
constructor() : this(ASYNC_POOL)
private val isCarried = AtomicBoolean()
private val queue = ConcurrentLinkedQueue<Runnable>()
private val isShutdown = AtomicBoolean()
private val isTerminated = AtomicBoolean()
override fun execute(command: Runnable) {
if (isShutdown.get())
throw RejectedExecutionException("Shutdown initiated")
queue.add(command)
if (!isShutdown.get() && isCarried.compareAndSet(false, true))
parentExecutor.execute(this)
}
override fun shutdown() {
if (isShutdown.compareAndSet(false, true)) {
if (!isCarried.get()) {
shutdownNow()
}
}
}
override fun shutdownNow(): List<Runnable> {
if (isTerminated.compareAndSet(false, true)) {
isShutdown.set(true)
val tasks = ArrayList<Runnable>()
var next = queue.poll()
while (next != null) {
tasks.add(next)
next = queue.poll()
}
return tasks
}
return emptyList()
}
override fun isShutdown(): Boolean {
return isShutdown.get()
}
override fun isTerminated(): Boolean {
return isTerminated.get()
}
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
val actual = TimeUnit.NANOSECONDS.convert(timeout, unit)
val t = System.nanoTime()
while (!isTerminated.get() && System.nanoTime() - t < actual) {
LockSupport.parkNanos(100_000L)
}
while (isCarried.get() && System.nanoTime() - t < actual) {
LockSupport.parkNanos(100_000L)
}
return isTerminated.get()
}
override fun <T : Any?> submit(task: Callable<T>): Future<T> {
return FutureTask(task).also(::execute)
}
override fun <T : Any?> submit(task: Runnable, result: T): Future<T> {
return FutureTask(task, result).also(::execute)
}
override fun submit(task: Runnable): Future<*> {
return FutureTask(task, Unit).also(::execute)
}
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
val futures = tasks.map { submit(it) }
futures.forEach {
try {
it.get()
} catch (err: ExecutionException) {
}
}
return futures
}
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): List<Future<T>> {
val t = System.nanoTime()
val actual = TimeUnit.NANOSECONDS.convert(timeout, unit)
val futures = tasks.map { submit(it) }
futures.forEach {
val elapsed = System.nanoTime() - t
if (elapsed < actual) {
try {
it.get(actual - elapsed, TimeUnit.NANOSECONDS)
} catch (err: ExecutionException) {
} catch (err: InterruptedException) {
}
} else {
it.cancel(false)
}
}
return futures
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>): T {
return submit(tasks.first()).get()
}
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
return submit(tasks.first()).get(timeout, unit)
}
override fun run() {
if (isShutdown.get()) {
shutdownNow()
isCarried.set(false)
return
}
var next = queue.poll()
while (next != null) {
next.run()
if (isShutdown.get()) {
shutdownNow()
isCarried.set(false)
return
}
next = queue.poll()
}
isCarried.set(false)
// do nothing for very short time
var i = 10
while (i-- > 0) { Reference.reachabilityFence(queue) }
if (isShutdown.get()) {
shutdownNow()
return
}
// ensure that queue is empty
if (!queue.isEmpty() && isCarried.compareAndSet(false, true)) {
// queue got updated when we left while loop, but didn't switch isCarried back to false
run()
}
if (isShutdown.get()) {
shutdownNow()
isCarried.set(false)
return
}
}
companion object {
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private val ASYNC_POOL: Executor =
if (ForkJoinPool.getCommonPoolParallelism() > 1) ForkJoinPool.commonPool() else ThreadPoolExecutor(0, Int.MAX_VALUE, 10L, TimeUnit.SECONDS, SynchronousQueue())
}
}