Remove carried executor since it is, again, it is very usage specific and quite flawed in its current state
This commit is contained in:
parent
c46e2086b5
commit
b42219ac97
@ -4,7 +4,7 @@ kotlin.code.style=official
|
||||
specifyKotlinAsDependency=false
|
||||
|
||||
projectGroup=ru.dbotthepony.kommons
|
||||
projectVersion=2.15.1
|
||||
projectVersion=2.16.0
|
||||
|
||||
guavaDepVersion=33.0.0
|
||||
gsonDepVersion=2.8.9
|
||||
|
@ -1,175 +0,0 @@
|
||||
package ru.dbotthepony.kommons.util
|
||||
|
||||
import java.lang.ref.Reference
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.FutureTask
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.locks.LockSupport
|
||||
|
||||
/**
|
||||
* [ExecutorService] which executes supplied tasks sequentially, in one of carried threads provided by [parentExecutor]
|
||||
*
|
||||
* Difference between this executor and single-threaded executor is that this allows to share one or more threads between multiple
|
||||
* [CarriedExecutor], since when they are done, they unmount themselves from [parentExecutor], allowing thread to perform other work
|
||||
*/
|
||||
class CarriedExecutor(private val parentExecutor: Executor) : Runnable, ExecutorService {
|
||||
private val isCarried = AtomicBoolean()
|
||||
private val queue = ConcurrentLinkedQueue<FutureTask<*>>()
|
||||
private val isShutdown = AtomicBoolean()
|
||||
private val isTerminated = AtomicBoolean()
|
||||
|
||||
override fun execute(command: Runnable) {
|
||||
if (isShutdown.get())
|
||||
throw RejectedExecutionException("Shutdown initiated")
|
||||
|
||||
queue.add(FutureTask(command, Unit))
|
||||
|
||||
if (!isShutdown.get() && isCarried.compareAndSet(false, true))
|
||||
parentExecutor.execute(this)
|
||||
}
|
||||
|
||||
override fun shutdown() {
|
||||
if (isShutdown.compareAndSet(false, true)) {
|
||||
if (!isCarried.get()) {
|
||||
shutdownNow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun shutdownNow(): List<Runnable> {
|
||||
if (isTerminated.compareAndSet(false, true)) {
|
||||
isShutdown.set(true)
|
||||
val tasks = ArrayList<Runnable>()
|
||||
|
||||
var next = queue.poll()
|
||||
|
||||
while (next != null) {
|
||||
next.cancel(false)
|
||||
tasks.add(next)
|
||||
next = queue.poll()
|
||||
}
|
||||
|
||||
return tasks
|
||||
}
|
||||
|
||||
return emptyList()
|
||||
}
|
||||
|
||||
override fun isShutdown(): Boolean {
|
||||
return isShutdown.get()
|
||||
}
|
||||
|
||||
override fun isTerminated(): Boolean {
|
||||
return isTerminated.get()
|
||||
}
|
||||
|
||||
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
|
||||
val actual = TimeUnit.NANOSECONDS.convert(timeout, unit)
|
||||
val t = System.nanoTime()
|
||||
|
||||
while (!isTerminated.get() && System.nanoTime() - t < actual) {
|
||||
LockSupport.parkNanos(100_000L)
|
||||
}
|
||||
|
||||
while (isCarried.get() && System.nanoTime() - t < actual) {
|
||||
LockSupport.parkNanos(100_000L)
|
||||
}
|
||||
|
||||
return isTerminated.get()
|
||||
}
|
||||
|
||||
override fun <T : Any?> submit(task: Callable<T>): Future<T> {
|
||||
return FutureTask(task).also(::execute)
|
||||
}
|
||||
|
||||
override fun <T : Any?> submit(task: Runnable, result: T): Future<T> {
|
||||
return FutureTask(task, result).also(::execute)
|
||||
}
|
||||
|
||||
override fun submit(task: Runnable): Future<*> {
|
||||
return FutureTask(task, Unit).also(::execute)
|
||||
}
|
||||
|
||||
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
|
||||
val futures = tasks.map { submit(it) }
|
||||
|
||||
futures.forEach {
|
||||
try {
|
||||
it.get()
|
||||
} catch (err: ExecutionException) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return futures
|
||||
}
|
||||
|
||||
override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): List<Future<T>> {
|
||||
val t = System.nanoTime()
|
||||
val actual = TimeUnit.NANOSECONDS.convert(timeout, unit)
|
||||
val futures = tasks.map { submit(it) }
|
||||
|
||||
futures.forEach {
|
||||
val elapsed = System.nanoTime() - t
|
||||
|
||||
if (elapsed < actual) {
|
||||
try {
|
||||
it.get(actual - elapsed, TimeUnit.NANOSECONDS)
|
||||
} catch (err: ExecutionException) {
|
||||
} catch (err: InterruptedException) {
|
||||
}
|
||||
} else {
|
||||
it.cancel(false)
|
||||
}
|
||||
}
|
||||
|
||||
return futures
|
||||
}
|
||||
|
||||
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>): T {
|
||||
return submit(tasks.first()).get()
|
||||
}
|
||||
|
||||
override fun <T : Any?> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
|
||||
return submit(tasks.first()).get(timeout, unit)
|
||||
}
|
||||
|
||||
override fun run() {
|
||||
var next = queue.poll()
|
||||
|
||||
while (next != null) {
|
||||
next.run()
|
||||
next = queue.poll()
|
||||
}
|
||||
|
||||
isCarried.set(false)
|
||||
|
||||
// do nothing for very short time
|
||||
var i = 10
|
||||
while (i-- > 0) { Reference.reachabilityFence(queue) }
|
||||
|
||||
if (isShutdown.get()) {
|
||||
shutdownNow()
|
||||
return
|
||||
}
|
||||
|
||||
// ensure that queue is empty
|
||||
if (!queue.isEmpty() && isCarried.compareAndSet(false, true)) {
|
||||
// queue got updated when we left while loop, but didn't switch isCarried back to false
|
||||
run()
|
||||
}
|
||||
|
||||
if (isShutdown.get()) {
|
||||
shutdownNow()
|
||||
isCarried.set(false)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user