diff --git a/gradle.properties b/gradle.properties index 4dbb90c..e04523a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=2.15.1 +projectVersion=2.16.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt deleted file mode 100644 index 9a8fcbd..0000000 --- a/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt +++ /dev/null @@ -1,175 +0,0 @@ -package ru.dbotthepony.kommons.util - -import java.lang.ref.Reference -import java.util.concurrent.Callable -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.ExecutionException -import java.util.concurrent.Executor -import java.util.concurrent.ExecutorService -import java.util.concurrent.Future -import java.util.concurrent.FutureTask -import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.LockSupport - -/** - * [ExecutorService] which executes supplied tasks sequentially, in one of carried threads provided by [parentExecutor] - * - * Difference between this executor and single-threaded executor is that this allows to share one or more threads between multiple - * [CarriedExecutor], since when they are done, they unmount themselves from [parentExecutor], allowing thread to perform other work - */ -class CarriedExecutor(private val parentExecutor: Executor) : Runnable, ExecutorService { - private val isCarried = AtomicBoolean() - private val queue = ConcurrentLinkedQueue>() - private val isShutdown = AtomicBoolean() - private val isTerminated = AtomicBoolean() - - override fun execute(command: Runnable) { - if (isShutdown.get()) - throw RejectedExecutionException("Shutdown initiated") - - queue.add(FutureTask(command, Unit)) - - if (!isShutdown.get() && isCarried.compareAndSet(false, true)) - parentExecutor.execute(this) - } - - override fun shutdown() { - if (isShutdown.compareAndSet(false, true)) { - if (!isCarried.get()) { - shutdownNow() - } - } - } - - override fun shutdownNow(): List { - if (isTerminated.compareAndSet(false, true)) { - isShutdown.set(true) - val tasks = ArrayList() - - var next = queue.poll() - - while (next != null) { - next.cancel(false) - tasks.add(next) - next = queue.poll() - } - - return tasks - } - - return emptyList() - } - - override fun isShutdown(): Boolean { - return isShutdown.get() - } - - override fun isTerminated(): Boolean { - return isTerminated.get() - } - - override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { - val actual = TimeUnit.NANOSECONDS.convert(timeout, unit) - val t = System.nanoTime() - - while (!isTerminated.get() && System.nanoTime() - t < actual) { - LockSupport.parkNanos(100_000L) - } - - while (isCarried.get() && System.nanoTime() - t < actual) { - LockSupport.parkNanos(100_000L) - } - - return isTerminated.get() - } - - override fun submit(task: Callable): Future { - return FutureTask(task).also(::execute) - } - - override fun submit(task: Runnable, result: T): Future { - return FutureTask(task, result).also(::execute) - } - - override fun submit(task: Runnable): Future<*> { - return FutureTask(task, Unit).also(::execute) - } - - override fun invokeAll(tasks: Collection>): List> { - val futures = tasks.map { submit(it) } - - futures.forEach { - try { - it.get() - } catch (err: ExecutionException) { - - } - } - - return futures - } - - override fun invokeAll(tasks: Collection>, timeout: Long, unit: TimeUnit): List> { - val t = System.nanoTime() - val actual = TimeUnit.NANOSECONDS.convert(timeout, unit) - val futures = tasks.map { submit(it) } - - futures.forEach { - val elapsed = System.nanoTime() - t - - if (elapsed < actual) { - try { - it.get(actual - elapsed, TimeUnit.NANOSECONDS) - } catch (err: ExecutionException) { - } catch (err: InterruptedException) { - } - } else { - it.cancel(false) - } - } - - return futures - } - - override fun invokeAny(tasks: Collection>): T { - return submit(tasks.first()).get() - } - - override fun invokeAny(tasks: Collection>, timeout: Long, unit: TimeUnit): T { - return submit(tasks.first()).get(timeout, unit) - } - - override fun run() { - var next = queue.poll() - - while (next != null) { - next.run() - next = queue.poll() - } - - isCarried.set(false) - - // do nothing for very short time - var i = 10 - while (i-- > 0) { Reference.reachabilityFence(queue) } - - if (isShutdown.get()) { - shutdownNow() - return - } - - // ensure that queue is empty - if (!queue.isEmpty() && isCarried.compareAndSet(false, true)) { - // queue got updated when we left while loop, but didn't switch isCarried back to false - run() - } - - if (isShutdown.get()) { - shutdownNow() - isCarried.set(false) - return - } - } -}