BTreeDB

parent b76c4e6aa9
commit 748577d9f5
gradle.properties

@@ -4,7 +4,7 @@ kotlin.code.style=official
 specifyKotlinAsDependency=false
 
 projectGroup=ru.dbotthepony.kommons
-projectVersion=2.2.1
+projectVersion=2.3.0
 
 guavaDepVersion=33.0.0
 gsonDepVersion=2.8.9
src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB.kt (new file, 32 lines)

@@ -0,0 +1,32 @@
package ru.dbotthepony.kommons.io

import ru.dbotthepony.kommons.util.KOptional
import java.io.Closeable
import java.io.File
import java.util.concurrent.CompletableFuture

/**
 * Abstract, asynchronous key-value storage. Operations are performed out of band,
 * and their results are exposed through [CompletableFuture]s.
 *
 * For an actual implementation, see [BTreeDB6].
 */
abstract class BTreeDB<K, V> : Closeable {
    abstract val file: File
    abstract val blockSize: Int

    /**
     * Reads data at the specified [key].
     */
    abstract fun read(key: K): CompletableFuture<KOptional<V>>

    /**
     * Lists every key currently present in the storage.
     */
    abstract fun findAllKeys(): CompletableFuture<List<K>>

    /**
     * Writes [value] at the specified [key], replacing any previous value.
     */
    abstract fun write(key: K, value: V): CompletableFuture<*>
}

abstract class ByteDataBTreeDB<K> : BTreeDB<K, ByteArray>() {
    abstract fun write(key: K, value: ByteArray, offset: Int, length: Int): CompletableFuture<*>

    final override fun write(key: K, value: ByteArray): CompletableFuture<*> {
        return write(key, value, 0, value.size)
    }
}
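
// A minimal in-memory sketch of the contract above (editorial illustration, not part
// of this commit): futures complete synchronously and durability is ignored entirely.
private class InMemoryByteDB(override val file: File) : ByteDataBTreeDB<String>() {
    override val blockSize: Int get() = 4096
    private val map = sortedMapOf<String, ByteArray>()

    override fun read(key: String): CompletableFuture<KOptional<ByteArray>> =
        CompletableFuture.completedFuture(map[key]?.let { KOptional(it) } ?: KOptional.empty())

    override fun findAllKeys(): CompletableFuture<List<String>> =
        CompletableFuture.completedFuture(map.keys.toList())

    override fun write(key: String, value: ByteArray, offset: Int, length: Int): CompletableFuture<*> {
        map[key] = value.copyOfRange(offset, offset + length)
        return CompletableFuture.completedFuture(Unit)
    }

    override fun close() {}
}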

src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt (new file, 916 lines)

@@ -0,0 +1,916 @@
package ru.dbotthepony.kommons.io

import it.unimi.dsi.fastutil.longs.LongArrayList
import it.unimi.dsi.fastutil.longs.LongArraySet
import it.unimi.dsi.fastutil.objects.Object2LongAVLTreeMap
import it.unimi.dsi.fastutil.objects.ObjectArrayList
import ru.dbotthepony.kommons.util.CarriedExecutor
import ru.dbotthepony.kommons.util.KOptional
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.File
import java.io.InputStream
import java.io.OutputStream
import java.io.RandomAccessFile
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import java.util.function.LongConsumer
import java.util.function.Supplier
import java.util.stream.LongStream
import java.util.stream.Stream

private fun readHeader(reader: RandomAccessFile, required: Char) {
    val read = reader.read()
    require(read.toChar() == required) { "Bad header, expected '$required' (${required.code}), got $read" }
}

/**
 * General purpose binary object storage, employing copy-on-write updates.
 *
 * This database is designed to read/write *small* files, since any read/write operation
 * must be able to fully store the file in question in memory.
 *
 * The smallest unit of work is a block. The default block size is 4096 bytes (which includes
 * the block header and the pointer to the next block). See [create] for more details.
 *
 * While an attempt is made to minimize tree reads/writes, there is no tree rebalancing,
 * so it is possible to force the tree to degenerate into a linked list. With a relatively
 * large block size, however, rebalancing isn't required, since even with hundreds of
 * thousands of keys the tree height remains relatively small.
 *
 * This storage does not employ any form of defragmentation, so read/write operations can
 * experience degraded performance after prolonged reads/writes of blobs larger than the
 * block size.
 *
 * The specified [Executor] is used to execute the actual I/O operations.
 *
 * While it might be tempting to use [java.util.concurrent.ForkJoinPool.commonPool] here,
 * keep in mind that the common ForkJoinPool is meant to be used by computation-intensive
 * operations. [BTreeDB6], on the other hand, is I/O intensive and can stall executor
 * threads. A good Executor here is one that can spawn many threads, for example
 * [java.util.concurrent.Executors.newCachedThreadPool].
 */
class BTreeDB6 private constructor(override val file: File, private val reader: RandomAccessFile, pool: Executor) : ByteDataBTreeDB<ByteKey>() {
    constructor(file: File, pool: Executor) : this(file, RandomAccessFile(file, "rws"), pool)

    // serializes all I/O onto a single task chain running on top of the supplied pool
    private val carrier = CarriedExecutor(pool)

    init {
        // "BTreeDB6" magic
        readHeader(reader, 'B')
        readHeader(reader, 'T')
        readHeader(reader, 'r')
        readHeader(reader, 'e')
        readHeader(reader, 'e')
        readHeader(reader, 'D')
        readHeader(reader, 'B')
        readHeader(reader, '6')
    }

    override fun close() {
        carrier.execute { reader.close() }
        carrier.shutdown()
        carrier.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
    }

    override val blockSize = reader.readInt()

    init {
        require(blockSize >= 64) { "Degenerate block size: $blockSize" }
    }

    private val headerOffset = reader.filePointer
    private var freeBlockList = reader.readLong()
    private var rootBlockIndex = reader.readLong()

    private enum class BlockType {
        // block that is unused
        FREE,
        // block that is part of the search tree; contains relative keys and block pointers to other trees or leaves
        TREE,
        // block that contains exact keys and block pointers
        LEAF,
        // block that contains a data length field and the data itself
        DATA;

        companion object {
            val e = values()
        }
    }

    private val headerBuf = ByteArray(16)

    private fun writeHeader() {
        reader.seek(headerOffset)

        // big-endian encoding of freeBlockList, followed by rootBlockIndex
        headerBuf[0] = ((freeBlockList ushr 56) and 0xFFL).toByte()
        headerBuf[1] = ((freeBlockList ushr 48) and 0xFFL).toByte()
        headerBuf[2] = ((freeBlockList ushr 40) and 0xFFL).toByte()
        headerBuf[3] = ((freeBlockList ushr 32) and 0xFFL).toByte()
        headerBuf[4] = ((freeBlockList ushr 24) and 0xFFL).toByte()
        headerBuf[5] = ((freeBlockList ushr 16) and 0xFFL).toByte()
        headerBuf[6] = ((freeBlockList ushr 8) and 0xFFL).toByte()
        headerBuf[7] = ((freeBlockList ushr 0) and 0xFFL).toByte()

        headerBuf[8 + 0] = ((rootBlockIndex ushr 56) and 0xFFL).toByte()
        headerBuf[8 + 1] = ((rootBlockIndex ushr 48) and 0xFFL).toByte()
        headerBuf[8 + 2] = ((rootBlockIndex ushr 40) and 0xFFL).toByte()
        headerBuf[8 + 3] = ((rootBlockIndex ushr 32) and 0xFFL).toByte()
        headerBuf[8 + 4] = ((rootBlockIndex ushr 24) and 0xFFL).toByte()
        headerBuf[8 + 5] = ((rootBlockIndex ushr 16) and 0xFFL).toByte()
        headerBuf[8 + 6] = ((rootBlockIndex ushr 8) and 0xFFL).toByte()
        headerBuf[8 + 7] = ((rootBlockIndex ushr 0) and 0xFFL).toByte()

        reader.write(headerBuf)
    }
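
    // Editorial note, not part of this commit: the manual shifts above write both longs
    // in big-endian byte order, so a shorter equivalent (shown as a sketch only) would be:
    // java.nio.ByteBuffer.wrap(headerBuf).putLong(0, freeBlockList).putLong(8, rootBlockIndex)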

    private fun doRead(key: ByteKey): KOptional<ByteArray> {
        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
            return KOptional.empty()
        }

        var block = readBlock(rootBlockIndex)

        while (block.type == BlockType.TREE) {
            val tree = TreeData(block)
            block = tree.childrenBlock(key)
        }

        if (block.type == BlockType.LEAF) {
            val leaf = LeafData(block)
            val data = leaf.get(key)

            if (data != null) {
                val stream = DataInputStream(BlockInputStream(data))
                val output = ByteArray(stream.readInt())
                stream.readFully(output)
                return KOptional(output)
            }
        } else if (block.type == BlockType.DATA) {
            throw IllegalStateException("Hit data block when scanning index")
        } else if (block.type == BlockType.FREE) {
            throw IllegalStateException("Hit free block when scanning index")
        }

        return KOptional.empty()
    }

    override fun read(key: ByteKey): CompletableFuture<KOptional<ByteArray>> {
        return CompletableFuture.supplyAsync(Supplier {
            doRead(key)
        }, carrier)
    }
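
    // Lookup path illustration (editorial note, not in the original): a read descends
    // root TREE -> ... -> LEAF -> DATA chain, and the DATA payload is framed as
    // [Int length][length bytes...], possibly spanning several chained blocks.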

    private fun readKeysInto(block: Block, result: MutableList<ByteKey>) {
        when (block.type) {
            BlockType.TREE -> {
                val tree = TreeData(block)

                for (sub in tree.children()) {
                    readKeysInto(readBlock(sub), result)
                }
            }

            BlockType.LEAF -> {
                val leaf = LeafData(block)

                for ((key) in leaf.keys()) {
                    result.add(key)
                }
            }

            else -> throw IllegalStateException("Hit $block while scanning tree index")
        }
    }

    private fun doFindAllKeys(): List<ByteKey> {
        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
            return emptyList()
        }

        val result = ArrayList<ByteKey>()
        readKeysInto(readBlock(rootBlockIndex), result)
        return result
    }

    override fun findAllKeys(): CompletableFuture<List<ByteKey>> {
        return CompletableFuture.supplyAsync(Supplier {
            doFindAllKeys()
        }, carrier)
    }

    private fun doWriteData(key: ByteKey, data: ByteArray, offset: Int, length: Int) {
        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
            // create LEAF node for root
            val block = allocBlock(BlockType.LEAF)
            block.write()
            rootBlockIndex = block.id
            writeHeader()
        }

        val block = readBlock(rootBlockIndex)

        if (block.type == BlockType.LEAF) {
            val leaf = LeafData(block)
            val (newData, oldData) = leaf.compute(key)

            val stream = DataOutputStream(BlockOutputStream(newData))
            stream.writeInt(length)
            stream.write(data, offset, length)
            stream.close()

            val (newLeaf, oldLeaf) = leaf.flush()

            check(newLeaf.type == BlockType.LEAF)
            rootBlockIndex = newLeaf.id
            writeHeader()

            oldLeaf?.freeChain()
            oldData?.freeChain()

            if (leaf.shouldSplit) {
                // replace root leaf with root tree
                val split = leaf.split()
                val tree = TreeData(allocBlock(BlockType.TREE))

                tree.add(split)
                tree.flush(true)
                rootBlockIndex = tree.head.id
                writeHeader()

                split.original.freeChain()
            }
        } else if (block.type == BlockType.FREE) {
            throw IllegalStateException("Root block is free block")
        } else if (block.type == BlockType.TREE) {
            val trees = ArrayList<TreeData>()
            val blocksToFree = ArrayList<Block>()
            var currentBlock = block
            var tree = TreeData(currentBlock)
            trees.add(tree)

            while (true) {
                currentBlock = tree.childrenBlock(key)

                if (currentBlock.type == BlockType.TREE) {
                    tree = TreeData(currentBlock)
                    trees.add(tree)
                } else if (currentBlock.type == BlockType.FREE) {
                    throw IllegalStateException("Hit free block when scanning tree")
                } else if (currentBlock.type == BlockType.DATA) {
                    throw IllegalStateException("Hit data block when scanning tree")
                } else if (currentBlock.type == BlockType.LEAF) {
                    break
                }
            }

            val leaf = LeafData(currentBlock)
            val (newData, oldData) = leaf.compute(key)

            if (oldData != null) blocksToFree.add(oldData)

            val stream = DataOutputStream(BlockOutputStream(newData))
            stream.writeInt(length)
            stream.write(data, offset, length)
            stream.close()

            if (leaf.shouldSplit) {
                // splitting is required, which may split upstream trees
                var split = leaf.split()
                blocksToFree.add(split.original)

                val itr = trees.asReversed().iterator()

                for (uptree in itr) {
                    uptree.add(split)

                    if (uptree.shouldSplit) {
                        // split subtree
                        split = uptree.split()
                        blocksToFree.add(split.original)

                        if (!itr.hasNext()) {
                            // the split reached the root tree, so create a new root above it
                            val newMainTree = TreeData(allocBlock(BlockType.TREE))
                            newMainTree.add(split)
                            trees.add(0, newMainTree)
                            newMainTree.flush(true)

                            break
                        }
                    } else {
                        // splitting has finished, only update parent tree pointers

                        val (n, o) = uptree.flush()
                        blocksToFree.add(o!!)

                        var newBlock = n.id
                        var oldBlock = o.id

                        for (uptree2 in itr) {
                            uptree2.update(oldBlock, newBlock)
                            val (n, o) = uptree2.flush()
                            blocksToFree.add(o!!)
                            newBlock = n.id
                            oldBlock = o.id
                        }

                        break
                    }
                }
            } else {
                // no splitting required, simply construct new tree by updating pointers
                val (newLeaf, oldLeaf) = leaf.flush()
                check(newLeaf.type == BlockType.LEAF)
                checkNotNull(oldLeaf)

                blocksToFree.add(oldLeaf)
                var newBlock = newLeaf.id
                var oldBlock = oldLeaf.id

                for (uptree in trees.asReversed()) {
                    uptree.update(oldBlock, newBlock)
                    val (n, o) = uptree.flush()
                    blocksToFree.add(o!!)
                    newBlock = n.id
                    oldBlock = o.id
                }
            }

            /*trees.first().check(LongConsumer {
                if (blocksToFree.any { b -> b.id == it }) {
                    throw IllegalStateException("Tree is referencing dead blocks")
                }
            })*/

            rootBlockIndex = trees.first().head.id
            reader.channel.force(true)
            writeHeader()
            reader.channel.force(true)
            blocksToFree.forEach { it.freeChain() }
        }
    }

    override fun write(key: ByteKey, value: ByteArray, offset: Int, length: Int): CompletableFuture<Void> {
        return CompletableFuture.runAsync(Runnable {
            doWriteData(key, value, offset, length)
        }, carrier)
    }

    // id of the next never-before-allocated block, derived from the current file size
    private var nextBlockID = (reader.channel.size().coerceAtLeast(blockSize.toLong()) - blockSize.toLong()) / blockSize

    private fun allocBlock(type: BlockType): Block {
        if (freeBlockList == INVALID_BLOCK_INDEX) {
            // no free blocks, enlarge the file
            val newID = nextBlockID++
            val block = Block(newID)
            block.type = type
            block.nextBlock = INVALID_BLOCK_INDEX
            // block.write()
            return block
        } else {
            // we have a free block, pop it off the free list
            val block = readBlock(freeBlockList)
            check(block.type == BlockType.FREE) { "Expected block $freeBlockList to be empty, got ${block.type}" }
            val old = freeBlockList
            freeBlockList = block.nextBlock
            check(freeBlockList != old) { "Free block list is self-referential: $freeBlockList == $old" }
            writeHeader()
            block.nextBlock = INVALID_BLOCK_INDEX
            block.type = type
            return block
        }
    }

    private fun readBlock(id: Long): Block {
        val block = Block(id)
        block.read()
        return block
    }
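
    // Free-list note (editorial, not in the original): freed blocks form a singly linked
    // list threaded through each block's nextBlock field, with freeBlockList as its head;
    // allocBlock() pops the head and Block.free() pushes onto it, e.g.:
    //
    //   freeBlockList -> [FREE id=7, next=3] -> [FREE id=3, next=-1]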

    // tree entries are ordered from smallest to biggest key
    private inner class TreeData(head: Block) {
        var head: Block = head
            private set

        private var rightMostBlock: Long
        private val children = Object2LongAVLTreeMap<ByteKey>()
        private var isDirty = false

        val size: Int
            get() = children.size

        val shouldSplit: Boolean
            get() = size >= 8 && (children.keys.stream().mapToInt { it.size + 8 }.sum() + 8) >= (blockSize - 9)

        fun children(): LongStream {
            return LongStream.concat(children.values.longStream(), LongStream.of(rightMostBlock))
        }

        init {
            check(head.type == BlockType.TREE) { "TreeData was constructed with ${head.type} block" }
            children.defaultReturnValue(INVALID_BLOCK_INDEX)
            val reader = DataInputStream(BlockInputStream(head))
            val entryCount = reader.readVarInt()

            for (i in 0 until entryCount) {
                val key = reader.readByteKey()
                val block = reader.readLong()
                children.put(key, block)
            }

            rightMostBlock = reader.readLong()
        }

        fun flush(inPlace: Boolean = false): Pair<Block, Block?> {
            if (!isDirty) return head to null
            val new = if (inPlace) head else allocBlock(BlockType.TREE)

            val stream = DataOutputStream(BlockOutputStream(new))
            stream.writeVarInt(children.size)

            for ((key, block) in children) {
                key.write(stream)
                stream.writeLong(block)
            }

            stream.writeLong(rightMostBlock)
            stream.close()
            check(new.type == BlockType.TREE)
            isDirty = false

            if (inPlace) {
                return head to null
            } else {
                val old = head
                head = new
                return new to old
            }
        }

        private fun intersects(other: TreeData): Boolean {
            return other.rightMostBlock == rightMostBlock || rightMostBlock in other.children.values || children.values.any { it in other.children.values }
        }

        fun split(): SplitResult {
            val keys = ObjectArrayList(children.keys)
            val middle = keys.size / 2
            val median = keys[middle]

            val tree0 = TreeData(allocBlock(BlockType.TREE))
            val tree1 = TreeData(allocBlock(BlockType.TREE))

            tree0.rightMostBlock = children.getLong(median)
            tree1.rightMostBlock = rightMostBlock

            check(tree0.rightMostBlock != INVALID_BLOCK_INDEX)

            for (i in 0 until middle) {
                tree0.children[keys[i]] = children.getLong(keys[i])
            }

            for (i in middle + 1 until keys.size) {
                tree1.children[keys[i]] = children.getLong(keys[i])
            }

            check(!tree0.intersects(tree1)) { "tree split consistency check failed" }
            check(!tree1.intersects(tree0)) { "tree split consistency check failed" }

            tree0.isDirty = true
            tree1.isDirty = true

            tree0.flush(true)
            tree1.flush(true)

            return SplitResult(tree0.head, tree1.head, median, this.head)
        }

        fun childrenBlock(key: ByteKey): Block {
            for ((rkey, block) in children) {
                if (key < rkey) {
                    return readBlock(block)
                }
            }

            return readBlock(rightMostBlock)
        }

        fun add(data: SplitResult) {
            if (children.isEmpty()) {
                // new tree
                rightMostBlock = data.right.id
                children[data.median] = data.left.id
            } else if (rightMostBlock == data.original.id) {
                children[data.median] = data.left.id
                rightMostBlock = data.right.id
            } else {
                val splitPoint = children.entries.find { it.value == data.original.id }
                checkNotNull(splitPoint) { "Tree is inconsistent: unable to locate children split point (median: ${data.median}; old children block: ${data.original.id})" }
                val key = splitPoint.key
                children[key] = data.right.id
                children[data.median] = data.left.id
            }

            isDirty = true
        }

        fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) {
            if (!stack.add(head.id)) {
                throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)")
            }

            try {
                blockIDVisitor.accept(head.id)
            } catch (err: Throwable) {
                throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err)
            }

            val ids = LongArrayList(children.values)
            ids.add(rightMostBlock)

            for (block in ids) {
                val read = readBlock(block)

                if (read.type == BlockType.LEAF) {
                    LeafData(read).check(blockIDVisitor, stack)
                } else if (read.type == BlockType.TREE) {
                    TreeData(read).check(blockIDVisitor, stack)
                } else if (read.type == BlockType.FREE) {
                    throw IllegalStateException("Hit free block while checking tree consistency")
                } else if (read.type == BlockType.DATA) {
                    throw IllegalStateException("Hit data block while checking tree consistency")
                }
            }

            stack.remove(head.id)
        }

        fun update(oldBlock: Long, newBlock: Long) {
            if (rightMostBlock == oldBlock) {
                rightMostBlock = newBlock
                isDirty = true

                check(!children.values.any { it == oldBlock }) { "Tree contains duplicated entries" }
            } else {
                var hit = false

                for ((key, block) in children.object2LongEntrySet()) {
                    if (block == oldBlock) {
                        children[key] = newBlock
                        hit = true
                        isDirty = true
                        break
                    }
                }

                if (!hit) {
                    throw IllegalStateException("Can't find block $oldBlock in tree")
                }
            }
        }
    }

    /**
     * [left] holds entries that are less than [median];
     * [right] holds entries that are greater than or equal to [median].
     */
    private data class SplitResult(val left: Block, val right: Block, val median: ByteKey, val original: Block)
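
    // Split illustration (editorial note, not in the original): a node holding keys
    // [a b c d] splits at median "c" into left = [a b] (keys < median) and
    // right = [c d] (keys >= median); the parent then maps median -> left.id and
    // re-points the slot that referenced the original block at right.id (see TreeData.add).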

    private inner class LeafData(head: Block) {
        var head: Block = head
            private set

        private val keys = Object2LongAVLTreeMap<ByteKey>()
        private var isDirty = false

        fun keys(): Stream<Pair<ByteKey, Long>> {
            return keys.entries.stream().map { it.key to it.value }
        }

        init {
            check(head.type == BlockType.LEAF) { "LeafData was constructed with ${head.type} block" }
            keys.defaultReturnValue(INVALID_BLOCK_INDEX)
            val stream = DataInputStream(BlockInputStream(head))
            val amount = stream.readVarInt()

            for (i in 0 until amount) {
                val key = stream.readByteKey()
                val block = stream.readLong()
                keys[key] = block
            }
        }

        val size: Int
            get() = keys.size

        val shouldSplit: Boolean
            get() = size >= 8 && keys.keys.stream().mapToInt { it.size + 8 }.sum() >= (blockSize - 9)

        fun split(): SplitResult {
            val keys = ObjectArrayList(keys.keys)
            val median = keys[keys.size / 2]

            val leaf0 = LeafData(allocBlock(BlockType.LEAF))
            val leaf1 = LeafData(allocBlock(BlockType.LEAF))

            for (i in 0 until keys.size / 2) {
                val key = keys[i]
                leaf0.keys.put(key, this.keys.getLong(key))
            }

            for (i in keys.size / 2 until keys.size) {
                val key = keys[i]
                leaf1.keys.put(key, this.keys.getLong(key))
            }

            leaf0.isDirty = true
            leaf1.isDirty = true

            leaf0.flush(true)
            leaf1.flush(true)

            return SplitResult(leaf0.head, leaf1.head, median, this.head)
        }

        fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) {
            if (!stack.add(head.id)) {
                throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)")
            }

            try {
                blockIDVisitor.accept(head.id)
            } catch (err: Throwable) {
                throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err)
            }

            for (block in keys.values) {
                val read = readBlock(block)

                try {
                    blockIDVisitor.accept(block)
                } catch (err: Throwable) {
                    throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $read", err)
                }

                if (read.type != BlockType.DATA) {
                    throw IllegalStateException("Hit ${read.type} block while checking leaf consistency")
                }
            }

            stack.remove(head.id)
        }

        fun remove(key: ByteKey): Block? {
            val block = keys.removeLong(key)

            if (block != INVALID_BLOCK_INDEX) {
                isDirty = true
                return readBlock(block)
            }

            return null
        }

        fun compute(key: ByteKey): Pair<Block, Block?> {
            val get = keys.getLong(key)
            val old = if (get == INVALID_BLOCK_INDEX) null else readBlock(get)
            val alloc = allocBlock(BlockType.DATA)
            alloc.write()
            isDirty = true
            keys.put(key, alloc.id)
            return alloc to old
        }

        fun get(key: ByteKey): Block? {
            val get = keys.getLong(key)
            if (get == INVALID_BLOCK_INDEX) return null
            return readBlock(get)
        }

        // returns (new head, old head); old head is null when nothing changed or when flushing in place
        fun flush(inPlace: Boolean = false): Pair<Block, Block?> {
            if (!isDirty) return head to null
            val new = if (inPlace) head else allocBlock(BlockType.LEAF)
            val stream = DataOutputStream(BlockOutputStream(new))
            stream.writeVarInt(keys.size)

            for ((key, block) in keys) {
                key.write(stream)
                stream.writeLong(block)
            }

            stream.close()
            isDirty = false
            check(new.type == BlockType.LEAF)

            if (inPlace) {
                return head to null
            } else {
                val old = head
                head = new
                return new to old
            }
        }
    }

    private inner class BlockInputStream(head: Block) : InputStream() {
        private var currentBlock = head
        private var currentOffset = 9
        private var isFinished = false

        override fun read(): Int {
            if (isFinished) return -1

            if (currentOffset < blockSize) {
                return currentBlock.bytes[currentOffset++].toInt() and 0xFF
            } else {
                if (currentBlock.nextBlock == INVALID_BLOCK_INDEX) {
                    isFinished = true
                    return -1
                }

                currentBlock = readBlock(currentBlock.nextBlock)
                currentOffset = 9
                return read()
            }
        }
    }
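
    // Block layout note (editorial, not in the original): byte 0 stores the BlockType
    // ordinal and bytes 1..8 store the big-endian nextBlock pointer, which is why both
    // block streams start their payload at currentOffset = 9.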

    private inner class BlockOutputStream(head: Block) : OutputStream() {
        private val blocks = ArrayList<Block>()
        private var currentBlock = head
        private var currentOffset = 9

        init {
            blocks.add(head)
        }

        override fun flush() {
            blocks.forEach { it.write() }
            blocks.clear()
            blocks.add(currentBlock)
        }

        override fun close() {
            flush()
            val next = currentBlock.nextBlock

            // if a shorter chain was written over a longer pre-existing one, free the leftover tail
            if (next != INVALID_BLOCK_INDEX) {
                readBlock(next).freeChain()
                currentBlock.nextBlock = INVALID_BLOCK_INDEX
                currentBlock.write()
            }
        }

        private fun nextBlock() {
            val alloc = allocBlock(currentBlock.type)
            currentBlock.nextBlock = alloc.id
            blocks.add(alloc)
            currentOffset = 9
            currentBlock = alloc
        }

        override fun write(b: ByteArray, off: Int, len: Int) {
            Objects.checkFromIndexSize(off, len, b.size)
            var written = 0

            while (written < len) {
                if (currentOffset >= blockSize) {
                    nextBlock()
                }

                val writable = (len - written).coerceAtMost(blockSize - currentOffset)
                System.arraycopy(b, off + written, currentBlock.bytes, currentOffset, writable)
                written += writable
                currentOffset += writable
            }
        }

        override fun write(b: Int) {
            if (currentOffset < blockSize) {
                currentBlock.bytes[currentOffset++] = b.toByte()
            } else {
                nextBlock()
                currentBlock.bytes[currentOffset++] = b.toByte()
            }
        }
    }

    private inner class Block {
        val bytes: ByteArray

        constructor(id: Long) {
            this.id = id
            bytes = ByteArray(blockSize)
        }

        constructor(copy: Block, id: Long) {
            this.id = id
            bytes = copy.bytes.clone()
        }

        override fun toString(): String {
            return "BTreeDB6.Block[$id, $type]"
        }

        val id: Long
        // block payload area begins 4096 bytes into the file, past the file header
        val offset: Long get() = 4096L + id * blockSize

        var type: BlockType
            get() = BlockType.e[bytes[0].toInt()]
            set(value) { bytes[0] = value.ordinal.toByte() }

        // pointer to the next block in this chain, stored big-endian in bytes 1..8
        var nextBlock: Long
            get() {
                val backing = bytes

                val v0 = (backing[1 + 0].toLong() and 0xFFL) shl 56
                val v1 = (backing[1 + 1].toLong() and 0xFFL) shl 48
                val v2 = (backing[1 + 2].toLong() and 0xFFL) shl 40
                val v3 = (backing[1 + 3].toLong() and 0xFFL) shl 32
                val v4 = (backing[1 + 4].toLong() and 0xFFL) shl 24
                val v5 = (backing[1 + 5].toLong() and 0xFFL) shl 16
                val v6 = (backing[1 + 6].toLong() and 0xFFL) shl 8
                val v7 = (backing[1 + 7].toLong() and 0xFFL)

                return v0 or v1 or v2 or v3 or v4 or v5 or v6 or v7
            }
            set(value) {
                val backing = bytes

                backing[1 + 0] = ((value ushr 56) and 0xFFL).toByte()
                backing[1 + 1] = ((value ushr 48) and 0xFFL).toByte()
                backing[1 + 2] = ((value ushr 40) and 0xFFL).toByte()
                backing[1 + 3] = ((value ushr 32) and 0xFFL).toByte()
                backing[1 + 4] = ((value ushr 24) and 0xFFL).toByte()
                backing[1 + 5] = ((value ushr 16) and 0xFFL).toByte()
                backing[1 + 6] = ((value ushr 8) and 0xFFL).toByte()
                backing[1 + 7] = ((value) and 0xFFL).toByte()
            }

        fun read() {
            reader.seek(offset)
            reader.readFully(bytes)
        }

        fun write() {
            reader.seek(offset)
            reader.write(bytes)
        }

        fun free() {
            if (type == BlockType.FREE)
                throw IllegalStateException("Block $id is already free")

            bytes.fill(0)
            type = BlockType.FREE
            nextBlock = freeBlockList
            freeBlockList = id
            write()
            writeHeader()
        }

        fun freeChain() {
            var next = nextBlock
            free()

            while (next != INVALID_BLOCK_INDEX) {
                val block = readBlock(next)
                next = block.nextBlock
                block.free()
            }
        }
    }

    companion object {
        const val INVALID_BLOCK_INDEX = -1L

        /**
         * Creates a new [BTreeDB6] file, with block size specified as [blockSize].
         *
         * Ideally, the block size should match the smallest unit of work done by the file system,
         * which is typically 4 KiB (4096 bytes). While arbitrary block sizes are supported
         * (from 128 bytes up to [Int.MAX_VALUE]), be aware that every I/O operation BTreeDB6
         * performs always reads/writes a full block, so the block size is the smallest unit of
         * work for the database.
         */
        @JvmStatic
        @JvmOverloads
        fun create(file: File, pool: Executor, blockSize: Int = 4096): BTreeDB6 {
            require(blockSize >= 128) { "Degenerate block size: $blockSize" }

            if (file.exists())
                throw FileAlreadyExistsException(file)

            val writer = RandomAccessFile(file, "rw")

            // specify version as 6, to prohibit the original game from opening (and corrupting) the file
            writer.write("BTreeDB6".toByteArray())

            // block size
            writer.writeInt(blockSize)

            writer.writeLong(INVALID_BLOCK_INDEX) // first free block index
            writer.writeLong(INVALID_BLOCK_INDEX) // tree root block index

            return BTreeDB6(file, writer, pool)
        }
    }
}
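
// A minimal usage sketch (editorial, not part of this commit); the file name is
// hypothetical, and a cached thread pool is used per the class documentation above:
private fun btreeDB6UsageExample() {
    val pool = java.util.concurrent.Executors.newCachedThreadPool()
    val db = BTreeDB6.create(File("example.bt6"), pool)

    db.write(ByteKey("hello"), byteArrayOf(1, 2, 3)).join() // copy-on-write update
    val value = db.read(ByteKey("hello")).join()            // KOptional<ByteArray>
    println(value)
    check(ByteKey("hello") in db.findAllKeys().join())

    db.close()
    pool.shutdown()
}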

src/main/kotlin/ru/dbotthepony/kommons/io/ByteKey.kt (new file, 55 lines)

@@ -0,0 +1,55 @@
package ru.dbotthepony.kommons.io

import java.io.InputStream
import java.io.OutputStream

class ByteKey(private vararg val bytes: Byte) : Comparable<ByteKey> {
    constructor(key: String) : this(*key.toByteArray())

    override fun equals(other: Any?): Boolean {
        return this === other || other is ByteKey && other.bytes.contentEquals(bytes)
    }

    val size: Int
        get() = bytes.size

    operator fun get(index: Int): Byte {
        return bytes[index]
    }

    /**
     * Writes this key to [stream], prefixed with its size as a varint.
     */
    fun write(stream: OutputStream) {
        stream.writeVarInt(bytes.size)
        stream.write(bytes)
    }

    /**
     * Writes this key to [stream] without a size prefix.
     */
    fun writeRaw(stream: OutputStream) {
        stream.write(bytes)
    }

    fun toByteArray(): ByteArray {
        return bytes.clone()
    }

    override fun hashCode(): Int {
        return bytes.contentHashCode()
    }

    override fun toString(): String {
        return "ByteKey[${bytes.map { it.toInt() and 0xFF }.joinToString(", ")}]"
    }

    // shorter keys always compare as smaller; equal-length keys compare bytewise as unsigned values
    override fun compareTo(other: ByteKey): Int {
        val cmp = size.compareTo(other.size)
        if (cmp != 0) return cmp

        for (i in bytes.indices) {
            if (bytes[i].toInt() and 0xFF > other.bytes[i].toInt() and 0xFF) {
                return 1
            } else if (bytes[i].toInt() and 0xFF < other.bytes[i].toInt() and 0xFF) {
                return -1
            }
        }

        return 0
    }
}
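
// Ordering sketch (editorial, not part of this commit): keys compare by length first,
// then bytewise as unsigned values, so a shorter key always sorts first:
private fun byteKeyOrderingExample() {
    check(ByteKey("z") < ByteKey("aa"))  // shorter key wins despite 'z' > 'a'
    check(ByteKey("ab") < ByteKey("ac")) // same length: unsigned byte comparison
}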

src/main/kotlin/ru/dbotthepony/kommons/io/InputStreamExtensions.kt

@@ -336,3 +336,11 @@ fun InputStream.readByteArray(maxRead: Int = Int.MAX_VALUE): ByteArray {
 		throw IllegalArgumentException("Trying to read $size bytes when $maxRead is the max allowed")
 	return readOrError(ByteArray(size))
 }
+
+fun InputStream.readByteKey(): ByteKey {
+	return ByteKey(*readOrError(ByteArray(readVarInt())))
+}
+
+fun InputStream.readByteKeyRaw(size: Int): ByteKey {
+	return ByteKey(*readOrError(ByteArray(size)))
+}

src/main/kotlin/ru/dbotthepony/kommons/io/OutputStreamExtensions.kt

@@ -322,3 +322,11 @@ fun OutputStream.writeByteArray(value: ByteArray) {
 	writeVarInt(value.size)
 	write(value)
 }
+
+fun OutputStream.writeByteKey(key: ByteKey) {
+	key.write(this)
+}
+
+fun OutputStream.writeRawByteKey(key: ByteKey) {
+	key.writeRaw(this)
+}
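
// Round-trip sketch (editorial, not part of this commit): writeByteKey length-prefixes
// the key with a varint, which readByteKey then uses to frame it back:
private fun byteKeyRoundTripExample() {
    val out = java.io.ByteArrayOutputStream()
    out.writeByteKey(ByteKey("hello"))
    check(out.toByteArray().inputStream().readByteKey() == ByteKey("hello"))
}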