diff --git a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt index abc25490..9ffc09d9 100644 --- a/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt +++ b/src/main/kotlin/ru/dbotthepony/kstarbound/util/ManualExecutorService.kt @@ -1,5 +1,6 @@ package ru.dbotthepony.kstarbound.util +import com.google.common.util.concurrent.Futures import java.util.concurrent.Callable import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService @@ -12,32 +13,6 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu private val executeQueue = ConcurrentLinkedQueue<Runnable>() private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>() - private data class InPlaceFuture<V>(private val value: V) : Future<V> { - override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return false - } - - override fun isCancelled(): Boolean { - return false - } - - override fun isDone(): Boolean { - return true - } - - override fun get(): V { - return value - } - - override fun get(timeout: Long, unit: TimeUnit): V { - return value - } - - companion object { - val EMPTY = InPlaceFuture(Unit) - } - } - fun isSameThread(): Boolean { return Thread.currentThread() === thread } @@ -91,24 +66,23 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu } override fun <T : Any?> submit(task: Callable<T>): Future<T> { - if (isSameThread()) return InPlaceFuture(task.call()) + if (isSameThread()) return Futures.immediateFuture(task.call()) return FutureTask(task).also { futureQueue.add(it); LockSupport.unpark(thread) } } override fun <T : Any?> submit(task: Runnable, result: T): Future<T> { - if (isSameThread()) { task.run(); return InPlaceFuture(result) } + if (isSameThread()) { task.run(); return Futures.immediateFuture(result) } return FutureTask { task.run(); result }.also { futureQueue.add(it); LockSupport.unpark(thread) } } override fun submit(task: Runnable): Future<*> { - if (isSameThread()) { task.run(); return InPlaceFuture.EMPTY - } + if (isSameThread()) { task.run(); return Futures.immediateVoidFuture() } return FutureTask { task.run() }.also { futureQueue.add(it); LockSupport.unpark(thread) } } override fun <T : Any?> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> { if (isSameThread()) { - return tasks.map { InPlaceFuture(it.call()) } + return tasks.map { Futures.immediateFuture(it.call()) } } else { return tasks.map { submit(it) }.onEach { it.get() } } @@ -120,7 +94,7 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu unit: TimeUnit ): List<Future<T>> { if (isSameThread()) { - return tasks.map { InPlaceFuture(it.call()) } + return tasks.map { Futures.immediateFuture(it.call()) } } else { return tasks.map { submit(it) }.onEach { it.get(timeout, unit) } } @@ -148,4 +122,15 @@ class ManualExecutorService(val thread: Thread = Thread.currentThread()) : Execu } } + fun <V> join(future: Future<V>): V { + if (!isSameThread()) + return future.get() + + while (!future.isDone) { + executeQueuedTasks() + LockSupport.parkNanos(1_000_000L) + } + + return future.get() + } }