event loop now accepts tasks during shutdown if they happen inside event loop

This commit is contained in:
DBotThePony 2024-04-22 22:01:16 +07:00
parent 7270b1b47b
commit 4212390bf3
Signed by: DBot
GPG Key ID: DCC23B5715498507

View File

@ -170,12 +170,12 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
final override fun execute(command: Runnable) { final override fun execute(command: Runnable) {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) { if (currentThread() === this) {
command.run() command.run()
} else { } else {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<Unit>() val future = CompletableFuture<Unit>()
val pair = TaskPair(future) { command.run() } val pair = TaskPair(future) { command.run() }
@ -189,9 +189,6 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
final override fun <T> submit(task: Callable<T>): CompletableFuture<T> { final override fun <T> submit(task: Callable<T>): CompletableFuture<T> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) { if (currentThread() === this) {
try { try {
return CompletableFuture.completedFuture(task.call()) return CompletableFuture.completedFuture(task.call())
@ -199,6 +196,9 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture(err) return CompletableFuture.failedFuture(err)
} }
} else { } else {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<T>() val future = CompletableFuture<T>()
val pair = TaskPair(future, task) val pair = TaskPair(future, task)
@ -228,9 +228,6 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
fun isSameThread() = this === currentThread() fun isSameThread() = this === currentThread()
final override fun submit(task: Runnable): CompletableFuture<*> { final override fun submit(task: Runnable): CompletableFuture<*> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) { if (currentThread() === this) {
try { try {
return CompletableFuture.completedFuture(task.run()) return CompletableFuture.completedFuture(task.run())
@ -238,6 +235,9 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture<Unit>(err) return CompletableFuture.failedFuture<Unit>(err)
} }
} else { } else {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<Unit>() val future = CompletableFuture<Unit>()
val pair = TaskPair(future) { task.run() } val pair = TaskPair(future) { task.run() }
@ -252,9 +252,6 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
final override fun <T> submit(task: Runnable, result: T): CompletableFuture<T> { final override fun <T> submit(task: Runnable, result: T): CompletableFuture<T> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
if (currentThread() === this) { if (currentThread() === this) {
try { try {
task.run() task.run()
@ -263,6 +260,9 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
return CompletableFuture.failedFuture(err) return CompletableFuture.failedFuture(err)
} }
} else { } else {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val future = CompletableFuture<T>() val future = CompletableFuture<T>()
val pair = TaskPair(future) { task.run(); result } val pair = TaskPair(future) { task.run(); result }
@ -278,32 +278,20 @@ open class BlockableEventLoop(name: String) : Thread(name), ScheduledExecutorSer
} }
final override fun <T> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> { final override fun <T> invokeAll(tasks: Collection<Callable<T>>): List<Future<T>> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
return tasks.map { submit(it) } return tasks.map { submit(it) }
} }
final override fun <T> invokeAll(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): List<Future<T>> { final override fun <T> invokeAll(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): List<Future<T>> {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
val futures = tasks.map { submit(it) } val futures = tasks.map { submit(it) }
CompletableFuture.allOf(*futures.toTypedArray()).get(timeout, unit) CompletableFuture.allOf(*futures.toTypedArray()).get(timeout, unit)
return futures return futures
} }
final override fun <T> invokeAny(tasks: Collection<Callable<T>>): T { final override fun <T> invokeAny(tasks: Collection<Callable<T>>): T {
if (!isRunning)
throw RejectedExecutionException("EventLoop shut down")
return submit(tasks.first()).get() return submit(tasks.first()).get()
} }
final override fun <T> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T { final override fun <T> invokeAny(tasks: Collection<Callable<T>>, timeout: Long, unit: TimeUnit): T {
if (!isRunning)
throw RejectedExecutionException("EventLoop is shutting down")
return submit(tasks.first()).get(timeout, unit) return submit(tasks.first()).get(timeout, unit)
} }