Un-promisify BTreeDB, since this is the job of higher abstractions
parent 486e8834d6
commit 6dc434fa04

.gitignore (vendored)
@@ -39,4 +39,5 @@ bin/
 .vscode/

 ### Mac OS ###
 .DS_Store
+/dbtest.bdb
BTreeDB.kt

@@ -3,36 +3,29 @@ package ru.dbotthepony.kommons.io
 import ru.dbotthepony.kommons.util.KOptional
 import java.io.Closeable
 import java.io.File
-import java.util.concurrent.CompletableFuture

 /**
  * For actual implementation, see [BTreeDB6].
  *
  *
  */
 abstract class BTreeDB<K, V> : Closeable {
 	abstract val file: File
 	abstract val blockSize: Int

-	open operator fun contains(key: K): Boolean {
-		return hasKey(key).get()
-	}
-
-	abstract fun hasKey(key: K): CompletableFuture<Boolean>
+	abstract operator fun contains(key: K): Boolean

 	/**
 	 * Reads data at specified [key]
 	 */
-	abstract fun read(key: K): CompletableFuture<KOptional<V>>
-	abstract fun findAllKeys(): CompletableFuture<List<K>>
+	abstract fun read(key: K): KOptional<V>
+	abstract fun findAllKeys(): List<K>

-	abstract fun write(key: K, value: V): CompletableFuture<*>
+	abstract fun write(key: K, value: V)
 }

 abstract class ByteDataBTreeDB<K> : BTreeDB<K, ByteArray>() {
-	abstract fun write(key: K, value: ByteArray, offset: Int, length: Int): CompletableFuture<*>
+	abstract fun write(key: K, value: ByteArray, offset: Int, length: Int)

-	final override fun write(key: K, value: ByteArray): CompletableFuture<*> {
+	final override fun write(key: K, value: ByteArray) {
 		return write(key, value, 0, value.size)
 	}
 }
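The synchronous API above is all that BTreeDB now promises; promisification is left to callers. As a hedged illustration (not part of this commit; the wrapper class name is hypothetical), a higher abstraction could rebuild the old CompletableFuture surface on top of BTreeDB with a caller-supplied Executor:

import ru.dbotthepony.kommons.io.BTreeDB
import ru.dbotthepony.kommons.util.KOptional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// Hypothetical higher-level wrapper: every call is dispatched to the supplied
// Executor and exposed as a CompletableFuture again, mirroring the removed API.
class AsyncBTreeDB<K, V>(private val backend: BTreeDB<K, V>, private val pool: Executor) {
	fun hasKey(key: K): CompletableFuture<Boolean> =
		CompletableFuture.supplyAsync({ key in backend }, pool)

	fun read(key: K): CompletableFuture<KOptional<V>> =
		CompletableFuture.supplyAsync({ backend.read(key) }, pool)

	fun findAllKeys(): CompletableFuture<List<K>> =
		CompletableFuture.supplyAsync({ backend.findAllKeys() }, pool)

	fun write(key: K, value: V): CompletableFuture<Void> =
		CompletableFuture.runAsync({ backend.write(key, value) }, pool)
}

A single-threaded Executor keeps operations ordered, similar in spirit to the CarriedExecutor that this commit removes from BTreeDB6.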
BTreeDB6.kt

@@ -46,10 +46,8 @@ private fun readHeader(reader: RandomAccessFile, required: Char) {
  * This storage does not employ any form of defragmentation, but writes try to allocate
  * (reasonably sized) continuous blocks, to avoid severe internal fragmentation.
  *
- * Specified [Executor] is utilized to execute actual I/O operations.
- *
  * [sync] determines whenever writes should be performed atomically. If [sync] is false, in event of
- * JVM crash, host system crash or power loss, there is a risk of file becoming unreadable, unless underlying filesystem
+ * host system crash or power loss, there is a risk of file becoming unreadable, unless underlying filesystem
  * and/or device ensures writes to file are performed in the same order as they were issued by code.
  * If performance is priority, specifying sync as false will yield vastly better performance for writes.
  *
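The remaining documentation keeps the [sync] trade-off. A brief hedged usage sketch against the create signature shown further down in this diff (file names are placeholders, and the files are assumed not to exist yet):

import java.io.File

// sync = true (default): writes are performed atomically; safest across JVM or host crashes and power loss.
val durable = BTreeDB6.create(File("durable.bdb"), blockSize = 4096, sync = true)

// sync = false: vastly better write performance, at the risk of an unreadable file after a crash,
// unless the underlying filesystem/device preserves the order in which writes were issued.
val fast = BTreeDB6.create(File("fast.bdb"), blockSize = 4096, sync = false)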
@@ -65,15 +63,13 @@ private fun readHeader(reader: RandomAccessFile, required: Char) {
 // TODO: Changeset counter (to determine write-in-progress blocks/trees, to ignore them when reconstructing tree)
 // TODO: Tree rotations (rebalancing)
 // TODO: Removal of keys
-class BTreeDB6 private constructor(override val file: File, private var reader: RandomAccessFile, pool: Executor, private val sync: Boolean) : ByteDataBTreeDB<ByteKey>() {
-	constructor(file: File, pool: Executor, sync: Boolean = true) : this(file, RandomAccessFile(file, "rw"), pool, sync)
+class BTreeDB6 private constructor(override val file: File, private var reader: RandomAccessFile, private val sync: Boolean) : ByteDataBTreeDB<ByteKey>() {
+	constructor(file: File, sync: Boolean = true) : this(file, RandomAccessFile(file, "rw"), sync)

 	init {
 		reader.seek(0L)
 	}

-	private val carrier = CarriedExecutor(pool)
-
 	init {
 		readHeader(reader, 'B')
 		readHeader(reader, 'T')
@@ -86,9 +82,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 	}

 	override fun close() {
-		carrier.execute { reader.close() }
-		carrier.shutdown()
-		carrier.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
+		reader.close()
 	}

 	override val blockSize = reader.readInt()
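With the executor gone, close() above is an ordinary blocking call, so BTreeDB6 composes directly with Kotlin's Closeable.use. A hedged sketch (the file name is a placeholder and the file is assumed not to exist yet; imports of BTreeDB6 and ByteKey from this library are omitted):

import java.io.File

fun main() {
	// create(), write() and read() now run on the calling thread
	BTreeDB6.create(File("example.bdb"), blockSize = 4096, sync = false).use { db ->
		db.write(ByteKey("hello"), "world".toByteArray())
		println(String(db.read(ByteKey("hello")).get()))
	} // close() runs here, synchronously
}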
@@ -229,7 +223,11 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		}
 	}

-	private fun doRead(key: ByteKey): KOptional<ByteArray> {
+	override fun contains(key: ByteKey): Boolean {
+		return searchForBlock(key) != null
+	}
+
+	override fun read(key: ByteKey): KOptional<ByteArray> {
 		val data = searchForBlock(key)

 		if (data != null) {
@@ -242,18 +240,6 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		return KOptional.empty()
 	}

-	override fun hasKey(key: ByteKey): CompletableFuture<Boolean> {
-		return CompletableFuture.supplyAsync(Supplier {
-			searchForBlock(key) != null
-		}, carrier)
-	}
-
-	override fun read(key: ByteKey): CompletableFuture<KOptional<ByteArray>> {
-		return CompletableFuture.supplyAsync(Supplier {
-			doRead(key)
-		}, carrier)
-	}
-
 	private fun readKeysInto(block: Block, result: MutableList<ByteKey>) {
 		when (block.type) {
 			BlockType.TREE -> {
@@ -276,7 +262,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		}
 	}

-	private fun doFindAllKeys(): List<ByteKey> {
+	override fun findAllKeys(): List<ByteKey> {
 		if (rootBlockIndex == INVALID_BLOCK_INDEX) {
 			return emptyList()
 		}
@@ -286,13 +272,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		return result
 	}

-	override fun findAllKeys(): CompletableFuture<List<ByteKey>> {
-		return CompletableFuture.supplyAsync(Supplier {
-			doFindAllKeys()
-		}, carrier)
-	}
-
-	private fun doWriteData(key: ByteKey, data: ByteArray, offset: Int, length: Int) {
+	override fun write(key: ByteKey, value: ByteArray, offset: Int, length: Int) {
 		if (rootBlockIndex == INVALID_BLOCK_INDEX) {
 			// create LEAF node for root
 			val block = allocBlock(BlockType.LEAF)
@@ -308,7 +288,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:

 			val stream = DataOutputStream(BlockOutputStream(newData))
 			stream.writeInt(length)
-			stream.write(data, offset, length)
+			stream.write(value, offset, length)
 			stream.close()

 			rootBlockIndex = leaf.write()
@@ -346,7 +326,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		val leaf = LeafData(currentBlock)
 		val stream = DataOutputStream(BlockOutputStream(leaf.compute(key, length + 4)))
 		stream.writeInt(length)
-		stream.write(data, offset, length)
+		stream.write(value, offset, length)
 		stream.close()

 		if (leaf.shouldSplit) {
@@ -415,12 +395,6 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		}
 	}

-	override fun write(key: ByteKey, value: ByteArray, offset: Int, length: Int): CompletableFuture<Void> {
-		return CompletableFuture.runAsync(Runnable {
-			doWriteData(key, value, offset, length)
-		}, carrier)
-	}
-
 	private fun allocBlock(type: BlockType): Block {
 		return allocBlocks(type, 1)[0]
 	}
@@ -1023,7 +997,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 		 */
 		@JvmStatic
 		@JvmOverloads
-		fun create(file: File, pool: Executor, blockSize: Int = 4096, sync: Boolean = true): BTreeDB6 {
+		fun create(file: File, blockSize: Int = 4096, sync: Boolean = true): BTreeDB6 {
 			require(blockSize >= 128) { "Degenerate block size: $blockSize" }

 			if (file.exists())
@@ -1040,7 +1014,7 @@ class BTreeDB6 private constructor(override val file: File, private var reader:
 			writer.writeInt(INVALID_BLOCK_INDEX) // free bitmap block index
 			writer.writeInt(INVALID_BLOCK_INDEX) // tree root block index

-			return BTreeDB6(file, writer, pool, sync)
+			return BTreeDB6(file, writer, sync)
 		}
 	}
 }
BTreeDB6Tests.kt

@@ -14,14 +14,19 @@ object BTreeDB6Tests {
 	fun test() {
 		val file = File("dbtest.bdb")
 		if (file.exists()) file.delete()
-		val create = BTreeDB6.create(file, Executor { it.run() }, 4096, sync = false)
+		val create = BTreeDB6.create(file, 4096, sync = false)

 		for (i in 0 .. 8000) {
 			val s = "This is key $i"
 			val k = ByteKey("This is key $i")
-			println("This is key $i")
 			create.write(k, s.toByteArray())
-			assertEquals(s, String(create.read(k).get().get()))
+			assertEquals(s, String(create.read(k).get()))
 		}

+		for (i in 0 .. 8000) {
+			val s = "This is key $i"
+			val k = ByteKey("This is key $i")
+			assertEquals(s, String(create.read(k).get()))
+		}
+
 		create.close()