MailboxExecutor#exceptionHandler

This commit is contained in:
DBotThePony 2024-03-28 18:46:31 +07:00
parent a772a08a2f
commit 1f65764310
Signed by: DBot
GPG Key ID: DCC23B5715498507
2 changed files with 24 additions and 3 deletions

View File

@ -4,7 +4,7 @@ kotlin.code.style=official
specifyKotlinAsDependency=false specifyKotlinAsDependency=false
projectGroup=ru.dbotthepony.kommons projectGroup=ru.dbotthepony.kommons
projectVersion=2.10.3 projectVersion=2.11.0
guavaDepVersion=33.0.0 guavaDepVersion=33.0.0
gsonDepVersion=2.8.9 gsonDepVersion=2.8.9

View File

@ -1,9 +1,11 @@
package ru.dbotthepony.kommons.util package ru.dbotthepony.kommons.util
import java.lang.Thread.UncaughtExceptionHandler
import java.util.LinkedList import java.util.LinkedList
import java.util.concurrent.Callable import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Delayed import java.util.concurrent.Delayed
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.FutureTask import java.util.concurrent.FutureTask
import java.util.concurrent.RejectedExecutionException import java.util.concurrent.RejectedExecutionException
@ -12,6 +14,7 @@ import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport import java.util.concurrent.locks.LockSupport
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.function.Consumer
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) { private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) {
@ -59,6 +62,8 @@ class MailboxExecutorService(thread: Thread = Thread.currentThread()) : Schedule
private val timeOrigin = JVMTimeSource() private val timeOrigin = JVMTimeSource()
var exceptionHandler: Consumer<Throwable>? = null
private inner class Timer<T>(task: Callable<T>, val executeAt: Long) : FutureTask<T>(task), ScheduledFuture<T> { private inner class Timer<T>(task: Callable<T>, val executeAt: Long) : FutureTask<T>(task), ScheduledFuture<T> {
override fun compareTo(other: Delayed): Int { override fun compareTo(other: Delayed): Int {
return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) return getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS))
@ -155,6 +160,13 @@ class MailboxExecutorService(thread: Thread = Thread.currentThread()) : Schedule
if (isTerminated) return if (isTerminated) return
next.run() next.run()
Thread.interrupted() Thread.interrupted()
try {
next.get()
} catch (err: ExecutionException) {
exceptionHandler?.accept(err)
}
next = futureQueue.poll() next = futureQueue.poll()
} }
@ -167,6 +179,13 @@ class MailboxExecutorService(thread: Thread = Thread.currentThread()) : Schedule
} else if (first.executeAt <= timeOrigin.nanos) { } else if (first.executeAt <= timeOrigin.nanos) {
first.run() first.run()
Thread.interrupted() Thread.interrupted()
try {
first.get()
} catch (err: ExecutionException) {
exceptionHandler?.accept(err)
}
timers.removeFirst() timers.removeFirst()
} else { } else {
break break
@ -183,8 +202,10 @@ class MailboxExecutorService(thread: Thread = Thread.currentThread()) : Schedule
if (first.isDone) { if (first.isDone) {
repeatableTimers.removeFirst() repeatableTimers.removeFirst()
} else if (first.next <= timeOrigin.nanos) { } else if (first.next <= timeOrigin.nanos) {
first.runAndReset() if (first.runAndReset()) {
executed.add(first) executed.add(first)
}
repeatableTimers.removeFirst() repeatableTimers.removeFirst()
} else { } else {
break break