From 4972bea1ce8507a4adecd6a3fe31344512d14f7a Mon Sep 17 00:00:00 2001 From: DBotThePony Date: Wed, 14 Feb 2024 11:46:15 +0700 Subject: [PATCH] CarriedExecutor --- gradle.properties | 2 +- .../kommons/util/CarriedExecutor.kt | 201 ++++++++++++++++++ 2 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt diff --git a/gradle.properties b/gradle.properties index 2c5bdab..4cd379a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=2.1.8 +projectVersion=2.2.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt b/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt new file mode 100644 index 0000000..84747e3 --- /dev/null +++ b/src/main/kotlin/ru/dbotthepony/kommons/util/CarriedExecutor.kt @@ -0,0 +1,201 @@ +package ru.dbotthepony.kommons.util + +import java.lang.ref.Reference +import java.util.concurrent.Callable +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.Future +import java.util.concurrent.FutureTask +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.LockSupport + +/** + * [ExecutorService] which executes supplied tasks sequentially, in one of carried threads provided by [parentExecutor] + * + * Difference between this executor and single-threaded executor is that this allows to share one or more threads between multiple + * [CarriedExecutor], since when they are done, they unmount themselves from [parentExecutor], allowing thread to perform other work + */ +class CarriedExecutor(private val parentExecutor: Executor) : Runnable, ExecutorService { + constructor() : this(ASYNC_POOL) + + private val isCarried = AtomicBoolean() + private val queue = ConcurrentLinkedQueue() + private val isShutdown = AtomicBoolean() + private val isTerminated = AtomicBoolean() + + override fun execute(command: Runnable) { + if (isShutdown.get()) + throw RejectedExecutionException("Shutdown initiated") + + queue.add(command) + + if (!isShutdown.get() && isCarried.compareAndSet(false, true)) + parentExecutor.execute(this) + } + + override fun shutdown() { + if (isShutdown.compareAndSet(false, true)) { + if (!isCarried.get()) { + shutdownNow() + } + } + } + + override fun shutdownNow(): List { + if (isTerminated.compareAndSet(false, true)) { + isShutdown.set(true) + val tasks = ArrayList() + + var next = queue.poll() + + while (next != null) { + tasks.add(next) + next = queue.poll() + } + + return tasks + } + + return emptyList() + } + + override fun isShutdown(): Boolean { + return isShutdown.get() + } + + override fun isTerminated(): Boolean { + return isTerminated.get() + } + + override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { + val actual = TimeUnit.NANOSECONDS.convert(timeout, unit) + val t = System.nanoTime() + + while (!isTerminated.get() && System.nanoTime() - t < actual) { + LockSupport.parkNanos(100_000L) + } + + while (isCarried.get() && System.nanoTime() - t < actual) { + LockSupport.parkNanos(100_000L) + } + + return isTerminated.get() + } + + override fun submit(task: Callable): Future { + return FutureTask(task).also(::execute) + } + + override fun submit(task: Runnable, result: T): Future { + return FutureTask(task, result).also(::execute) + } + + override fun submit(task: Runnable): Future<*> { + return FutureTask(task, Unit).also(::execute) + } + + override fun invokeAll(tasks: Collection>): List> { + val futures = tasks.map { submit(it) } + + futures.forEach { + try { + it.get() + } catch (err: ExecutionException) { + + } + } + + return futures + } + + override fun invokeAll(tasks: Collection>, timeout: Long, unit: TimeUnit): List> { + val t = System.nanoTime() + val actual = TimeUnit.NANOSECONDS.convert(timeout, unit) + val futures = tasks.map { submit(it) } + + futures.forEach { + val elapsed = System.nanoTime() - t + + if (elapsed < actual) { + try { + it.get(actual - elapsed, TimeUnit.NANOSECONDS) + } catch (err: ExecutionException) { + } catch (err: InterruptedException) { + } + } else { + it.cancel(false) + } + } + + return futures + } + + override fun invokeAny(tasks: Collection>): T { + return submit(tasks.first()).get() + } + + override fun invokeAny(tasks: Collection>, timeout: Long, unit: TimeUnit): T { + return submit(tasks.first()).get(timeout, unit) + } + + override fun run() { + if (isShutdown.get()) { + shutdownNow() + isCarried.set(false) + return + } + + var next = queue.poll() + + while (next != null) { + next.run() + + if (isShutdown.get()) { + shutdownNow() + isCarried.set(false) + return + } + + next = queue.poll() + } + + isCarried.set(false) + + // do nothing for very short time + var i = 10 + while (i-- > 0) { Reference.reachabilityFence(queue) } + + if (isShutdown.get()) { + shutdownNow() + return + } + + // ensure that queue is empty + if (!queue.isEmpty() && isCarried.compareAndSet(false, true)) { + // queue got updated when we left while loop, but didn't switch isCarried back to false + run() + } + + if (isShutdown.get()) { + shutdownNow() + isCarried.set(false) + return + } + } + + companion object { + /** + * Default executor -- ForkJoinPool.commonPool() unless it cannot + * support parallelism. + */ + private val ASYNC_POOL: Executor = + if (ForkJoinPool.getCommonPoolParallelism() > 1) ForkJoinPool.commonPool() else ThreadPoolExecutor(0, Int.MAX_VALUE, 10L, TimeUnit.SECONDS, SynchronousQueue()) + } +}