Make mailbox be lightweight as possible
This commit is contained in:
parent
792e0c6ff6
commit
f0791a47b7
@ -38,10 +38,7 @@ private fun <E : Comparable<E>> LinkedList<E>.enqueue(value: E) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : ScheduledExecutorService {
|
class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : ScheduledExecutorService {
|
||||||
private val executeQueue = ConcurrentLinkedQueue<Runnable>()
|
|
||||||
private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>()
|
private val futureQueue = ConcurrentLinkedQueue<FutureTask<*>>()
|
||||||
private val timerBacklog = ConcurrentLinkedQueue<Timer<*>>()
|
|
||||||
private val repeatableTimersBacklog = ConcurrentLinkedQueue<RepeatableTimer>()
|
|
||||||
|
|
||||||
private val timers = LinkedList<Timer<*>>()
|
private val timers = LinkedList<Timer<*>>()
|
||||||
private val repeatableTimers = LinkedList<RepeatableTimer>()
|
private val repeatableTimers = LinkedList<RepeatableTimer>()
|
||||||
@ -116,51 +113,13 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
}
|
}
|
||||||
|
|
||||||
executionLock.withLock {
|
executionLock.withLock {
|
||||||
var next = executeQueue.poll()
|
var next = futureQueue.poll()
|
||||||
|
|
||||||
while (next != null) {
|
while (next != null) {
|
||||||
if (isTerminated) return
|
if (isTerminated) return
|
||||||
next.run()
|
next.run()
|
||||||
next = executeQueue.poll()
|
|
||||||
}
|
|
||||||
|
|
||||||
var next2 = futureQueue.poll()
|
|
||||||
|
|
||||||
while (next2 != null) {
|
|
||||||
if (isTerminated) return
|
|
||||||
next2.run()
|
|
||||||
Thread.interrupted()
|
Thread.interrupted()
|
||||||
next2 = futureQueue.poll()
|
next = futureQueue.poll()
|
||||||
}
|
|
||||||
|
|
||||||
var next3 = timerBacklog.poll()
|
|
||||||
|
|
||||||
while (next3 != null) {
|
|
||||||
if (isTerminated) return
|
|
||||||
if (next3.isCancelled) {
|
|
||||||
// do nothing
|
|
||||||
} else if (next3.executeAt <= timeOrigin.nanos) {
|
|
||||||
next3.run()
|
|
||||||
Thread.interrupted()
|
|
||||||
} else {
|
|
||||||
timers.enqueue(next3)
|
|
||||||
}
|
|
||||||
|
|
||||||
next3 = timerBacklog.poll()
|
|
||||||
}
|
|
||||||
|
|
||||||
var next4 = repeatableTimersBacklog.poll()
|
|
||||||
|
|
||||||
while (next4 != null) {
|
|
||||||
if (isTerminated) return
|
|
||||||
|
|
||||||
if (next4.isCancelled) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
|
||||||
repeatableTimers.enqueue(next4)
|
|
||||||
}
|
|
||||||
|
|
||||||
next4 = repeatableTimersBacklog.poll()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!timers.isEmpty()) {
|
while (!timers.isEmpty()) {
|
||||||
@ -207,7 +166,7 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
if (isSameThread()) {
|
if (isSameThread()) {
|
||||||
command.run()
|
command.run()
|
||||||
} else {
|
} else {
|
||||||
executeQueue.add(command)
|
futureQueue.add(FutureTask(command, Unit))
|
||||||
LockSupport.unpark(thread)
|
LockSupport.unpark(thread)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,9 +182,6 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
val result = ArrayList<Runnable>()
|
val result = ArrayList<Runnable>()
|
||||||
|
|
||||||
executionLock.withLock {
|
executionLock.withLock {
|
||||||
executeQueue.forEach { result.add(it) }
|
|
||||||
executeQueue.clear()
|
|
||||||
|
|
||||||
futureQueue.forEach {
|
futureQueue.forEach {
|
||||||
it.cancel(false)
|
it.cancel(false)
|
||||||
result.add(it)
|
result.add(it)
|
||||||
@ -347,7 +303,16 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
} else if (isSameThread()) {
|
} else if (isSameThread()) {
|
||||||
timers.enqueue(timer)
|
timers.enqueue(timer)
|
||||||
} else {
|
} else {
|
||||||
timerBacklog.add(timer)
|
execute {
|
||||||
|
if (timer.isCancelled) {
|
||||||
|
// do nothing
|
||||||
|
} else if (timer.executeAt <= timeOrigin.nanos) {
|
||||||
|
timer.run()
|
||||||
|
Thread.interrupted()
|
||||||
|
} else {
|
||||||
|
timers.enqueue(timer)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return timer
|
return timer
|
||||||
@ -364,7 +329,16 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
} else if (isSameThread()) {
|
} else if (isSameThread()) {
|
||||||
timers.enqueue(timer)
|
timers.enqueue(timer)
|
||||||
} else {
|
} else {
|
||||||
timerBacklog.add(timer)
|
execute {
|
||||||
|
if (timer.isCancelled) {
|
||||||
|
// do nothing
|
||||||
|
} else if (timer.executeAt <= timeOrigin.nanos) {
|
||||||
|
timer.run()
|
||||||
|
Thread.interrupted()
|
||||||
|
} else {
|
||||||
|
timers.enqueue(timer)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return timer
|
return timer
|
||||||
@ -381,7 +355,16 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
return RepeatableTimer(
|
return RepeatableTimer(
|
||||||
command,
|
command,
|
||||||
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
|
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
|
||||||
TimeUnit.NANOSECONDS.convert(period, unit), true).also { repeatableTimersBacklog.add(it) }
|
TimeUnit.NANOSECONDS.convert(period, unit), true)
|
||||||
|
.also {
|
||||||
|
execute {
|
||||||
|
if (it.isCancelled) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
repeatableTimers.enqueue(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun scheduleWithFixedDelay(
|
override fun scheduleWithFixedDelay(
|
||||||
@ -395,6 +378,15 @@ class MailboxExecutorService(val thread: Thread = Thread.currentThread()) : Sche
|
|||||||
return RepeatableTimer(
|
return RepeatableTimer(
|
||||||
command,
|
command,
|
||||||
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
|
timeOrigin.nanos + TimeUnit.NANOSECONDS.convert(initialDelay, unit),
|
||||||
TimeUnit.NANOSECONDS.convert(delay, unit), false).also { repeatableTimersBacklog.add(it) }
|
TimeUnit.NANOSECONDS.convert(delay, unit), false)
|
||||||
|
.also {
|
||||||
|
execute {
|
||||||
|
if (it.isCancelled) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
repeatableTimers.enqueue(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user