diff --git a/gradle.properties b/gradle.properties index d84e7a1..3df6892 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ kotlin.code.style=official specifyKotlinAsDependency=false projectGroup=ru.dbotthepony.kommons -projectVersion=2.3.3 +projectVersion=2.4.0 guavaDepVersion=33.0.0 gsonDepVersion=2.8.9 diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt index 519accb..c63ebbe 100644 --- a/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt +++ b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt @@ -1,9 +1,12 @@ package ru.dbotthepony.kommons.io -import it.unimi.dsi.fastutil.longs.LongArrayList -import it.unimi.dsi.fastutil.longs.LongArraySet -import it.unimi.dsi.fastutil.objects.Object2LongAVLTreeMap +import it.unimi.dsi.fastutil.ints.IntArrayList +import it.unimi.dsi.fastutil.ints.IntArraySet +import it.unimi.dsi.fastutil.ints.IntOpenHashSet +import it.unimi.dsi.fastutil.objects.Object2IntAVLTreeMap import it.unimi.dsi.fastutil.objects.ObjectArrayList +import it.unimi.dsi.fastutil.objects.ObjectArraySet +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet import ru.dbotthepony.kommons.util.CarriedExecutor import ru.dbotthepony.kommons.util.KOptional import java.io.DataInputStream @@ -16,10 +19,11 @@ import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.Executor import java.util.concurrent.TimeUnit -import java.util.function.LongConsumer +import java.util.function.IntConsumer import java.util.function.Supplier -import java.util.stream.LongStream +import java.util.stream.IntStream import java.util.stream.Stream +import kotlin.collections.ArrayList private fun readHeader(reader: RandomAccessFile, required: Char) { val read = reader.read() @@ -40,18 +44,29 @@ private fun readHeader(reader: RandomAccessFile, required: Char) { * block size, rebalancing isn't required since even with hundreds of thousands keys tree * height 
remains relatively small. * - * This storage does not employ any form of defragmentation, so read/write operations can experience - * degraded performance after prolonged read/writes of blobs which are larger than block size. + * This storage does not employ any form of defragmentation, but writes try to allocate + * (reasonably sized) contiguous blocks, to avoid severe internal fragmentation. * * Specified [Executor] is utilized to execute actual I/O operations. * + * [sync] determines whether writes should be performed atomically. If [sync] is false, in the event of a + * JVM crash, host system crash or power loss, there is a risk of the file becoming unreadable, unless the underlying filesystem + * and/or device ensures writes to the file are performed in the same order as they were issued by code. + * If performance is a priority, specifying [sync] as false will yield vastly better performance for writes. * * While it might be tempting to use [java.util.concurrent.ForkJoinPool.commonPool] here, keep in mind that * common ForkJoinPool is meant ot be used by calculation-intense operations. [BTreeDB6] is, on the other hand, * is I/O intense, and can stall executor threads. A good Executor here would be one that can spawn a lot of threads, * for example, [java.util.concurrent.Executors.newCachedThreadPool]. 
*/ -class BTreeDB6 private constructor(override val file: File, private val reader: RandomAccessFile, pool: Executor) : ByteDataBTreeDB() { - constructor(file: File, pool: Executor) : this(file, RandomAccessFile(file, "rws"), pool) +// TODO: Checksums (TREE/LEAF/DATA) +// TODO: Second tree (TREE/LEAF redundancy) +// TODO: Defragmenting +// TODO: Add keys to DATA blocks, so entire tree can be reconstructed from scratch in event of both trees become unreadable +// TODO: Changeset counter (to determine write-in-progress blocks/trees, to ignore them when reconstructing tree) +// TODO: Tree rotations (rebalancing) +class BTreeDB6 private constructor(override val file: File, private var reader: RandomAccessFile, pool: Executor, private val sync: Boolean) : ByteDataBTreeDB() { + constructor(file: File, pool: Executor, sync: Boolean = true) : this(file, RandomAccessFile(file, "rw"), pool, sync) init { reader.seek(0L) @@ -77,14 +92,30 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } override val blockSize = reader.readInt() + private val effectiveBlockSize = blockSize - 9 init { require(blockSize >= 64) { "Degenerate block size: $blockSize" } } + private var occupiedBlocksBitmap = BitSet() + private val headerOffset = reader.filePointer - private var freeBlockList = reader.readLong() - private var rootBlockIndex = reader.readLong() + private var freeBlockList = reader.readInt() + private var rootBlockIndex = reader.readInt() + private var prohibitAllocation = false + + private val pendingFree = IntOpenHashSet() + + init { + if (freeBlockList != INVALID_BLOCK_INDEX) { + val reader = BlockInputStream(readBlock(freeBlockList)) + val size = reader.readVarInt() + if (size > 0) { + occupiedBlocksBitmap = BitSet.valueOf(reader.readNBytes(size)) + } + } + } private enum class BlockType { // block that is unused @@ -94,37 +125,88 @@ class BTreeDB6 private constructor(override val file: File, private val reader: // block contains exact keys and 
block pointers LEAF, // contains data length field and data itself - DATA; + DATA, + // block contains free space bitmap + BITMAP; companion object { val e = values() } } + /** + * Called when file is considered unreadable. + * + * This method creates new file, and tries to recover current file's contents + * into new file. + */ + private fun emergency() { + + } + private val headerBuf = ByteArray(16) - private fun writeHeader() { + private fun commit() { + if (sync) reader.channel.force(true) + val blocksToFree = ObjectOpenHashSet() + + for (i in pendingFree) { + val block = Block(i) + block.read() + block.freeChain(blocksToFree) + } + + pendingFree.clear() + + if (freeBlockList != INVALID_BLOCK_INDEX) { + val block = Block(freeBlockList) + block.read() + block.freeChain(blocksToFree) + } + + blocksToFree.forEach { + occupiedBlocksBitmap[it.id] = false + } + + val size = occupiedBlocksBitmap.size() / 8 + 1 + val writer = BlockOutputStream(BlockType.BITMAP) + writer.ensureCapacity(size + 4) + + prohibitAllocation = true + + try { + val bytes = occupiedBlocksBitmap.toByteArray() + writer.writeInt(bytes.size) + writer.write(bytes) + + freeBlockList = writer.head + check(freeBlockList != INVALID_BLOCK_INDEX) + + writer.close() + } finally { + prohibitAllocation = false + } + + if (sync) reader.channel.force(true) + reader.seek(headerOffset) - headerBuf[0] = ((freeBlockList ushr 56) and 0xFFL).toByte() - headerBuf[1] = ((freeBlockList ushr 48) and 0xFFL).toByte() - headerBuf[2] = ((freeBlockList ushr 40) and 0xFFL).toByte() - headerBuf[3] = ((freeBlockList ushr 32) and 0xFFL).toByte() - headerBuf[4] = ((freeBlockList ushr 24) and 0xFFL).toByte() - headerBuf[5] = ((freeBlockList ushr 16) and 0xFFL).toByte() - headerBuf[6] = ((freeBlockList ushr 8) and 0xFFL).toByte() - headerBuf[7] = ((freeBlockList ushr 0) and 0xFFL).toByte() + headerBuf[0] = ((freeBlockList ushr 24) and 0xFF).toByte() + headerBuf[1] = ((freeBlockList ushr 16) and 0xFF).toByte() + headerBuf[2] = 
((freeBlockList ushr 8) and 0xFF).toByte() + headerBuf[3] = ((freeBlockList ushr 0) and 0xFF).toByte() - headerBuf[8 + 0] = ((rootBlockIndex ushr 56) and 0xFFL).toByte() - headerBuf[8 + 1] = ((rootBlockIndex ushr 48) and 0xFFL).toByte() - headerBuf[8 + 2] = ((rootBlockIndex ushr 40) and 0xFFL).toByte() - headerBuf[8 + 3] = ((rootBlockIndex ushr 32) and 0xFFL).toByte() - headerBuf[8 + 4] = ((rootBlockIndex ushr 24) and 0xFFL).toByte() - headerBuf[8 + 5] = ((rootBlockIndex ushr 16) and 0xFFL).toByte() - headerBuf[8 + 6] = ((rootBlockIndex ushr 8) and 0xFFL).toByte() - headerBuf[8 + 7] = ((rootBlockIndex ushr 0) and 0xFFL).toByte() + headerBuf[4 + 0] = ((rootBlockIndex ushr 24) and 0xFF).toByte() + headerBuf[4 + 1] = ((rootBlockIndex ushr 16) and 0xFF).toByte() + headerBuf[4 + 2] = ((rootBlockIndex ushr 8) and 0xFF).toByte() + headerBuf[4 + 3] = ((rootBlockIndex ushr 0) and 0xFF).toByte() reader.write(headerBuf) + if (sync) reader.channel.force(true) + + blocksToFree.forEach { + it.free() + } } private fun searchForBlock(key: ByteKey): Block? 
{ @@ -142,13 +224,9 @@ class BTreeDB6 private constructor(override val file: File, private val reader: if (block.type == BlockType.LEAF) { val leaf = LeafData(block) return leaf.get(key) - } else if (block.type == BlockType.DATA) { - throw IllegalStateException("Hit data block when scanning index") - } else if (block.type == BlockType.FREE) { - throw IllegalStateException("Hit free block when scanning index") + } else { + throw IllegalStateException("Hit ${block.type} block while scanning index") } - - return null } private fun doRead(key: ByteKey): KOptional { @@ -220,46 +298,34 @@ class BTreeDB6 private constructor(override val file: File, private val reader: val block = allocBlock(BlockType.LEAF) block.write() rootBlockIndex = block.id - writeHeader() } val block = readBlock(rootBlockIndex) if (block.type == BlockType.LEAF) { val leaf = LeafData(block) - val (newData, oldData) = leaf.compute(key) + val newData = leaf.compute(key, length + 4) val stream = DataOutputStream(BlockOutputStream(newData)) stream.writeInt(length) stream.write(data, offset, length) stream.close() - val (newLeaf, oldLeaf) = leaf.flush() - - check(newLeaf.type == BlockType.LEAF) - rootBlockIndex = newLeaf.id - writeHeader() - - oldLeaf?.freeChain() - oldData?.freeChain() + rootBlockIndex = leaf.write() if (leaf.shouldSplit) { // replace root leaf with root tree val split = leaf.split() - val tree = TreeData(allocBlock(BlockType.TREE)) + val tree = TreeData() tree.add(split) - tree.flush(true) - rootBlockIndex = tree.head.id - writeHeader() - - split.original.freeChain() + tree.write() + rootBlockIndex = tree.write() } - } else if (block.type == BlockType.FREE) { - throw IllegalStateException("Root block is free block") + + commit() } else if (block.type == BlockType.TREE) { val trees = ArrayList() - val blocksToFree = ArrayList() var currentBlock = block var tree = TreeData(currentBlock) trees.add(tree) @@ -270,21 +336,15 @@ class BTreeDB6 private constructor(override val file: File, 
private val reader: if (currentBlock.type == BlockType.TREE) { tree = TreeData(currentBlock) trees.add(tree) - } else if (currentBlock.type == BlockType.FREE) { - throw IllegalStateException("Hit free block when scanning tree") - } else if (currentBlock.type == BlockType.DATA) { - throw IllegalStateException("Hit data block when scanning tree") } else if (currentBlock.type == BlockType.LEAF) { break + } else { + throw IllegalStateException("Hit ${currentBlock.type} block while scanning tree") } } val leaf = LeafData(currentBlock) - val (newData, oldData) = leaf.compute(key) - - if (oldData != null) blocksToFree.add(oldData) - - val stream = DataOutputStream(BlockOutputStream(newData)) + val stream = DataOutputStream(BlockOutputStream(leaf.compute(key, length + 4))) stream.writeInt(length) stream.write(data, offset, length) stream.close() @@ -292,8 +352,6 @@ class BTreeDB6 private constructor(override val file: File, private val reader: if (leaf.shouldSplit) { // splitting is required, which may split upstream trees var split = leaf.split() - blocksToFree.add(split.original) - val itr = trees.asReversed().iterator() for (uptree in itr) { @@ -302,32 +360,29 @@ class BTreeDB6 private constructor(override val file: File, private val reader: if (uptree.shouldSplit) { // split subtree split = uptree.split() - blocksToFree.add(split.original) if (!itr.hasNext()) { // splitted main tree - val newMainTree = TreeData(allocBlock(BlockType.TREE)) + val newMainTree = TreeData() newMainTree.add(split) trees.add(0, newMainTree) - newMainTree.flush(true) + newMainTree.write() break } } else { // splitting has finished, only update parent trees pointers - val (n, o) = uptree.flush() - blocksToFree.add(o!!) - - var newBlock = n.id - var oldBlock = o.id + var oldBlock = uptree.headBlock + check(oldBlock != INVALID_BLOCK_INDEX) + var newBlock = uptree.write() for (uptree2 in itr) { uptree2.update(oldBlock, newBlock) - val (n, o) = uptree2.flush() - blocksToFree.add(o!!) 
- newBlock = n.id - oldBlock = o.id + + oldBlock = uptree2.headBlock + check(oldBlock != INVALID_BLOCK_INDEX) + newBlock = uptree2.write() } break @@ -335,20 +390,15 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } } else { // no splitting required, simply construct new tree by updating pointers - val (newLeaf, oldLeaf) = leaf.flush() - check(newLeaf.type == BlockType.LEAF) - checkNotNull(oldLeaf) - - blocksToFree.add(oldLeaf) - var newBlock = newLeaf.id - var oldBlock = oldLeaf.id + var oldBlock = leaf.headBlock + check(oldBlock != INVALID_BLOCK_INDEX) + var newBlock = leaf.write() for (uptree in trees.asReversed()) { uptree.update(oldBlock, newBlock) - val (n, o) = uptree.flush() - blocksToFree.add(o!!) - newBlock = n.id - oldBlock = o.id + oldBlock = uptree.headBlock + check(oldBlock != INVALID_BLOCK_INDEX) + newBlock = uptree.write() } } @@ -358,11 +408,10 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } })*/ - rootBlockIndex = trees.first().head.id - reader.channel.force(true) - writeHeader() - reader.channel.force(true) - blocksToFree.forEach { it.freeChain() } + rootBlockIndex = trees.first().headBlock + commit() + } else { + throw IllegalStateException("Hit ${block.type} block in root tree") } } @@ -372,45 +421,125 @@ class BTreeDB6 private constructor(override val file: File, private val reader: }, carrier) } - private var nextBlockID = (reader.channel.size().coerceAtLeast(blockSize.toLong()) - blockSize.toLong()) / blockSize - private fun allocBlock(type: BlockType): Block { - if (freeBlockList == INVALID_BLOCK_INDEX) { - // no free blocks, enlarge the file - val newID = nextBlockID++ - val block = Block(newID) - block.type = type - block.nextBlock = INVALID_BLOCK_INDEX - // block.write() - return block - } else { - // we have free block - val block = readBlock(freeBlockList) - check(block.type == BlockType.FREE) { "Expected block $freeBlockList to be empty, got ${block.type}" } - val 
old = freeBlockList - freeBlockList = block.nextBlock - check(freeBlockList != old) { "$freeBlockList == $old!!!" } - writeHeader() - block.nextBlock = INVALID_BLOCK_INDEX - block.type = type - return block + return allocBlocks(type, 1)[0] + } + + private fun allocBlocksByBytes(type: BlockType, size: Int, after: Int = INVALID_BLOCK_INDEX): List { + return allocBlocks(type, (size - 1) / effectiveBlockSize + 1, after) + } + + private fun allocBlocks(type: BlockType, size: Int, after: Int = INVALID_BLOCK_INDEX): List { + require(size > 0) { "Invalid amount of blocks to allocate: $size" } + check(!prohibitAllocation) { "Currently writing free block bitmap, can not allocate any blocks!" } + + // if we need to allocate lots of blocks, don't necessarily go for continuous allocation + // otherwise we might end up with ever-growing file when allocating bigger and bigger blocks. + // example: allocating space for free bitmap + if (size > 128) { + // also ignore 'after' request, since this shouldn't matter at this point + val result = ArrayList(size) + var toAllocate = size + + while (toAllocate > 0) { + val toAlloc = toAllocate.coerceAtMost(128) + result.addAll(allocBlocks(type, toAlloc)) + toAllocate -= toAlloc + } + + return result + } + + if (after != INVALID_BLOCK_INDEX) { + // if we can allocate enough blocks after specified index, allocate there (reduce fragmentation) + var correct = true + val find = after + 1 + + for (i2 in 0 until size) { + if (occupiedBlocksBitmap[i2 + find]) { + correct = false + break + } + } + + if (correct) { + val blocks = ArrayList(size) + + for (i2 in 0 until size) { + occupiedBlocksBitmap[i2 + find] = true + val block = Block(i2 + find) + block.nextBlock = INVALID_BLOCK_INDEX + block.type = type + if (i2 > 0) blocks[i2 - 1].nextBlock = block.id + blocks.add(block) + } + + return blocks + } + } + + var i = 0 + + while (true) { + val find = occupiedBlocksBitmap.nextClearBit(i) + var correct = true + + for (i2 in 0 until size) { + if 
(occupiedBlocksBitmap[i2 + find]) { + correct = false + i = find + i2 + 1 + break + } + } + + if (correct) { + val blocks = ArrayList(size) + + for (i2 in 0 until size) { + occupiedBlocksBitmap[i2 + find] = true + val block = Block(i2 + find) + block.nextBlock = INVALID_BLOCK_INDEX + block.type = type + blocks.add(block) + } + + return blocks + } } } - private fun readBlock(id: Long): Block { + private fun readBlock(id: Int): Block { val block = Block(id) block.read() return block } // from smallest to biggest - private inner class TreeData(head: Block) { - var head: Block = head + private inner class TreeData() { + constructor(head: Block) : this() { + check(head.type == BlockType.TREE) { "TreeData was constructed with ${head.type} block" } + val reader = DataInputStream(BlockInputStream(head)) + val entryCount = reader.readVarInt() + + for (i in 0 until entryCount) { + val key = reader.readByteKey() + val block = reader.readInt() + children.put(key, block) + } + + rightMostBlock = reader.readInt() + headBlock = head.id + } + + var headBlock: Int = INVALID_BLOCK_INDEX private set - private var rightMostBlock: Long - private val children = Object2LongAVLTreeMap() - private var isDirty = false + private var rightMostBlock: Int = INVALID_BLOCK_INDEX + private val children = Object2IntAVLTreeMap() + + init { + children.defaultReturnValue(INVALID_BLOCK_INDEX) + } val size: Int get() = children.size @@ -418,49 +547,28 @@ class BTreeDB6 private constructor(override val file: File, private val reader: val shouldSplit: Boolean get() = size >= 8 && (children.keys.stream().mapToInt { it.size + 8 }.sum() + 8) >= (blockSize - 9) - fun children(): LongStream { - return LongStream.concat(children.values.longStream(), LongStream.of(rightMostBlock)) + fun children(): IntStream { + return IntStream.concat(children.values.intStream(), IntStream.of(rightMostBlock)) } - init { - check(head.type == BlockType.TREE) { "TreeData was constructed with ${head.type} block" } - 
children.defaultReturnValue(INVALID_BLOCK_INDEX) - val reader = DataInputStream(BlockInputStream(head)) - val entryCount = reader.readVarInt() + fun write(): Int { + if (headBlock != INVALID_BLOCK_INDEX) pendingFree.add(headBlock) - for (i in 0 until entryCount) { - val key = reader.readByteKey() - val block = reader.readLong() - children.put(key, block) - } - - rightMostBlock = reader.readLong() - } - - fun flush(inPlace: Boolean = false): Pair { - if (!isDirty) return head to null - val new = if (inPlace) head else allocBlock(BlockType.TREE) - - val stream = DataOutputStream(BlockOutputStream(new)) + val bytes = BlockOutputStream(BlockType.TREE) + val stream = DataOutputStream(bytes) stream.writeVarInt(children.size) for ((key, block) in children) { key.write(stream) - stream.writeLong(block) + stream.writeInt(block) } - stream.writeLong(rightMostBlock) + stream.writeInt(rightMostBlock) stream.close() - check(new.type == BlockType.TREE) - isDirty = false - if (inPlace) { - return head to null - } else { - val old = head - head = new - return new to old - } + headBlock = bytes.head + check(headBlock != INVALID_BLOCK_INDEX) + return headBlock } private fun intersects(other: TreeData): Boolean { @@ -472,32 +580,32 @@ class BTreeDB6 private constructor(override val file: File, private val reader: val middle = keys.size / 2 val median = keys[middle] - val tree0 = TreeData(allocBlock(BlockType.TREE)) - val tree1 = TreeData(allocBlock(BlockType.TREE)) + val tree0 = TreeData() + val tree1 = TreeData() - tree0.rightMostBlock = children.getLong(median) + tree0.rightMostBlock = children.getInt(median) tree1.rightMostBlock = rightMostBlock check(tree0.rightMostBlock != INVALID_BLOCK_INDEX) for (i in 0 until middle) { - tree0.children[keys[i]] = children.getLong(keys[i]) + tree0.children[keys[i]] = children.getInt(keys[i]) } for (i in middle + 1 until keys.size) { - tree1.children[keys[i]] = children.getLong(keys[i]) + tree1.children[keys[i]] = children.getInt(keys[i]) } 
check(!tree0.intersects(tree1)) { "tree split consistency check failed" } check(!tree1.intersects(tree0)) { "tree split consistency check failed" } - tree0.isDirty = true - tree1.isDirty = true + tree0.write() + tree1.write() - tree0.flush(true) - tree1.flush(true) + if (this.headBlock != INVALID_BLOCK_INDEX) + pendingFree.add(this.headBlock) - return SplitResult(tree0.head, tree1.head, median, this.head) + return SplitResult(tree0.headBlock, tree1.headBlock, median, this.headBlock) } fun childrenBlock(key: ByteKey): Block { @@ -513,34 +621,34 @@ class BTreeDB6 private constructor(override val file: File, private val reader: fun add(data: SplitResult) { if (children.isEmpty()) { // new tree - rightMostBlock = data.right.id - children[data.median] = data.left.id - } else if (rightMostBlock == data.original.id) { - children[data.median] = data.left.id - rightMostBlock = data.right.id + rightMostBlock = data.right + children[data.median] = data.left + } else if (rightMostBlock == data.original) { + children[data.median] = data.left + rightMostBlock = data.right } else { - val splitPoint = children.entries.find { it.value == data.original.id } - checkNotNull(splitPoint) { "Tree is inconsistent: unable to locate children split point (median: ${data.median}; old children block: ${data.original.id})" } + val splitPoint = children.entries.find { it.value == data.original } + checkNotNull(splitPoint) { "Tree is inconsistent: unable to locate children split point (median: ${data.median}; old children block: ${data.original})" } val key = splitPoint.key - children[key] = data.right.id - children[data.median] = data.left.id + children[key] = data.right + children[data.median] = data.left } - - isDirty = true } - fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) { - if (!stack.add(head.id)) { - throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)") + fun 
check(blockIDVisitor: IntConsumer, stack: IntArraySet = IntArraySet()) { + check(headBlock != INVALID_BLOCK_INDEX) { "Tree contains 'unallocated' blocks" } + + if (!stack.add(headBlock)) { + throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $headBlock)") } try { - blockIDVisitor.accept(head.id) + blockIDVisitor.accept(headBlock) } catch (err: Throwable) { throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err) } - val ids = LongArrayList(children.values) + val ids = IntArrayList(children.values) ids.add(rightMostBlock) for (block in ids) { @@ -557,23 +665,21 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } } - stack.remove(head.id) + stack.remove(headBlock) } - fun update(oldBlock: Long, newBlock: Long) { + fun update(oldBlock: Int, newBlock: Int) { if (rightMostBlock == oldBlock) { rightMostBlock = newBlock - isDirty = true check(!children.values.any { it == oldBlock }) { "Tree contains duplicated entries" } } else { var hit = false - for ((key, block) in children.object2LongEntrySet()) { + for ((key, block) in children.entries) { if (block == oldBlock) { children[key] = newBlock hit = true - isDirty = true break } } @@ -589,30 +695,34 @@ class BTreeDB6 private constructor(override val file: File, private val reader: * left - less than median * right - greater or equal to median */ - private data class SplitResult(val left: Block, val right: Block, val median: ByteKey, val original: Block) + private data class SplitResult(val left: Int, val right: Int, val median: ByteKey, val original: Int) - private inner class LeafData(head: Block) { - var head: Block = head - private set - - private val keys = Object2LongAVLTreeMap() - private var isDirty = false - - fun keys(): Stream> { - return keys.entries.stream().map { it.key to it.value } - } - - init { + 
private inner class LeafData() { + constructor(head: Block) : this() { check(head.type == BlockType.LEAF) { "LeafData was constructed with ${head.type} block" } - keys.defaultReturnValue(INVALID_BLOCK_INDEX) val stream = DataInputStream(BlockInputStream(head)) val amount = stream.readVarInt() for (i in 0 until amount) { val key = stream.readByteKey() - val block = stream.readLong() + val block = stream.readInt() keys[key] = block } + + this.headBlock = head.id + } + + var headBlock: Int = INVALID_BLOCK_INDEX + private set + + private val keys = Object2IntAVLTreeMap() + + init { + keys.defaultReturnValue(INVALID_BLOCK_INDEX) + } + + fun keys(): Stream> { + return keys.entries.stream().map { it.key to it.value } } val size: Int @@ -625,35 +735,37 @@ class BTreeDB6 private constructor(override val file: File, private val reader: val keys = ObjectArrayList(keys.keys) val median = keys[keys.size / 2] - val leaf0 = LeafData(allocBlock(BlockType.LEAF)) - val leaf1 = LeafData(allocBlock(BlockType.LEAF)) + val leaf0 = LeafData() + val leaf1 = LeafData() for (i in 0 until keys.size / 2) { val key = keys[i] - leaf0.keys.put(key, this.keys.getLong(key)) + leaf0.keys.put(key, this.keys.getInt(key)) } for (i in keys.size / 2 until keys.size) { val key = keys[i] - leaf1.keys.put(key, this.keys.getLong(key)) + leaf1.keys.put(key, this.keys.getInt(key)) } - leaf0.isDirty = true - leaf1.isDirty = true + leaf0.write() + leaf1.write() - leaf0.flush(true) - leaf1.flush(true) + if (this.headBlock != INVALID_BLOCK_INDEX) + pendingFree.add(this.headBlock) - return SplitResult(leaf0.head, leaf1.head, median, this.head) + return SplitResult(leaf0.headBlock, leaf1.headBlock, median, this.headBlock) } - fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) { - if (!stack.add(head.id)) { - throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)") + fun check(blockIDVisitor: 
IntConsumer, stack: IntArraySet = IntArraySet()) { + check(headBlock != INVALID_BLOCK_INDEX) { "Tree contains 'unallocated' blocks" } + + if (!stack.add(headBlock)) { + throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $headBlock)") } try { - blockIDVisitor.accept(head.id) + blockIDVisitor.accept(headBlock) } catch (err: Throwable) { throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err) } @@ -672,59 +784,52 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } } - stack.remove(head.id) + stack.remove(headBlock) } fun remove(key: ByteKey): Block? { - val block = keys.removeLong(key) + val block = keys.removeInt(key) if (block != INVALID_BLOCK_INDEX) { - isDirty = true return readBlock(block) } return null } - fun compute(key: ByteKey): Pair { - val get = keys.getLong(key) - val old = if (get == INVALID_BLOCK_INDEX) null else readBlock(get) - val alloc = allocBlock(BlockType.DATA) - alloc.write() - isDirty = true - keys.put(key, alloc.id) - return alloc to old + fun compute(key: ByteKey, size: Int): List { + val alloc = allocBlocksByBytes(BlockType.DATA, size) + val old = keys.put(key, alloc.first().id) + + if (old != INVALID_BLOCK_INDEX) + pendingFree.add(old) + + return alloc } fun get(key: ByteKey): Block? 
{ - val get = keys.getLong(key) + val get = keys.getInt(key) if (get == INVALID_BLOCK_INDEX) return null return readBlock(get) } - // current to old - fun flush(inPlace: Boolean = false): Pair { - if (!isDirty) return head to null - val new = if (inPlace) head else allocBlock(BlockType.LEAF) - val stream = DataOutputStream(BlockOutputStream(new)) + fun write(): Int { + if (headBlock != INVALID_BLOCK_INDEX) pendingFree.add(headBlock) + + val bytes = BlockOutputStream(BlockType.LEAF) + val stream = DataOutputStream(bytes) stream.writeVarInt(keys.size) for ((key, block) in keys) { key.write(stream) - stream.writeLong(block) + stream.writeInt(block) } stream.close() - isDirty = false - check(new.type == BlockType.LEAF) - if (inPlace) { - return head to null - } else { - val old = head - head = new - return new to old - } + headBlock = bytes.head + check(headBlock != INVALID_BLOCK_INDEX) + return headBlock } } @@ -751,35 +856,65 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } } - private inner class BlockOutputStream(head: Block) : OutputStream() { + private inner class BlockOutputStream(val type: BlockType) : OutputStream() { + constructor(block: Block) : this(block.type) { + currentBlock = block + head = block.id + blocks.add(block) + } + + constructor(blocks: Collection) : this(blocks.first()) { + val itr = blocks.iterator() + itr.next() + + for (block in itr) { + preallocatedBlocks.add(block) + } + } + private val blocks = ArrayList() - private var currentBlock = head + private val preallocatedBlocks = ArrayList() + private var currentBlock: Block? 
= null private var currentOffset = 9 - init { - blocks.add(head) + var head = INVALID_BLOCK_INDEX + private set + + fun ensureCapacity(bytes: Int): BlockOutputStream { + if (bytes == 0) return this + require(bytes > 0) { "Negative amount of bytes: $bytes" } + + val alloc = if (currentBlock == null) { + allocBlocks(type, (bytes - 1) / effectiveBlockSize + 1) + } else { + allocBlocks(type, (bytes - 1) / effectiveBlockSize + 1, currentBlock!!.id) + } + + preallocatedBlocks.addAll(alloc) + currentBlock?.nextBlock = alloc.first().id + + return this } override fun flush() { blocks.forEach { it.write() } blocks.clear() - blocks.add(currentBlock) + if (currentBlock != null) blocks.add(currentBlock!!) } override fun close() { flush() - val next = currentBlock.nextBlock - - if (next != INVALID_BLOCK_INDEX) { - readBlock(next).freeChain() - currentBlock.nextBlock = INVALID_BLOCK_INDEX - currentBlock.write() - } } private fun nextBlock() { - val alloc = allocBlock(currentBlock.type) - currentBlock.nextBlock = alloc.id + val alloc = if (preallocatedBlocks.isNotEmpty()) { + preallocatedBlocks.removeAt(0) + } else { + allocBlocks(type, 1, currentBlock?.id ?: INVALID_BLOCK_INDEX)[0] + } + + if (head == INVALID_BLOCK_INDEX) head = alloc.id + currentBlock?.nextBlock = alloc.id blocks.add(alloc) currentOffset = 9 currentBlock = alloc @@ -790,77 +925,64 @@ class BTreeDB6 private constructor(override val file: File, private val reader: var written = 0 while (written < len) { - if (currentOffset >= blockSize) { + if (currentBlock == null || currentOffset >= blockSize) { nextBlock() } val writable = (len - written).coerceAtMost(blockSize - currentOffset) - System.arraycopy(b, off + written, currentBlock.bytes, currentOffset, writable) + System.arraycopy(b, off + written, currentBlock!!.bytes, currentOffset, writable) written += writable currentOffset += writable } } override fun write(b: Int) { - if (currentOffset < blockSize) { - currentBlock.bytes[currentOffset++] = b.toByte() + if 
(currentBlock != null && currentOffset < blockSize) { + currentBlock!!.bytes[currentOffset++] = b.toByte() } else { nextBlock() - currentBlock.bytes[currentOffset++] = b.toByte() + currentBlock!!.bytes[currentOffset++] = b.toByte() } } } - private inner class Block { - val bytes: ByteArray - - constructor(id: Long) { - this.id = id - bytes = ByteArray(blockSize) + /** + * represents a block in file + */ + private inner class Block(val id: Int) { + init { + require(id >= 0) { "Invalid block ID: $id" } } - constructor(copy: Block, id: Long) { - this.id = id - bytes = copy.bytes.clone() - } + val bytes = ByteArray(blockSize) override fun toString(): String { return "BTreeDB6.Block[$id, $type]" } - val id: Long - val offset: Long get() = 4096L + id * blockSize + val offset: Long get() = (id + 1L) * blockSize var type: BlockType get() = BlockType.e[bytes[0].toInt()] set(value) { bytes[0] = value.ordinal.toByte() } - var nextBlock: Long + var nextBlock: Int get() { val backing = bytes - val v0 = (backing[1 + 0].toLong() and 0xFFL) shl 56 - val v1 = (backing[1 + 1].toLong() and 0xFFL) shl 48 - val v2 = (backing[1 + 2].toLong() and 0xFFL) shl 40 - val v3 = (backing[1 + 3].toLong() and 0xFFL) shl 32 - val v4 = (backing[1 + 4].toLong() and 0xFFL) shl 24 - val v5 = (backing[1 + 5].toLong() and 0xFFL) shl 16 - val v6 = (backing[1 + 6].toLong() and 0xFFL) shl 8 - val v7 = (backing[1 + 7].toLong() and 0xFFL) + val v0 = (backing[1 + 0].toInt() and 0xFF) shl 24 + val v1 = (backing[1 + 1].toInt() and 0xFF) shl 16 + val v2 = (backing[1 + 2].toInt() and 0xFF) shl 8 + val v3 = (backing[1 + 3].toInt() and 0xFF) - return v0 or v1 or v2 or v3 or v4 or v5 or v6 or v7 + return v0 or v1 or v2 or v3 } set(value) { val backing = bytes - - backing[1 + 0] = ((value ushr 56) and 0xFFL).toByte() - backing[1 + 1] = ((value ushr 48) and 0xFFL).toByte() - backing[1 + 2] = ((value ushr 40) and 0xFFL).toByte() - backing[1 + 3] = ((value ushr 32) and 0xFFL).toByte() - backing[1 + 4] = ((value ushr 24) 
and 0xFFL).toByte() - backing[1 + 5] = ((value ushr 16) and 0xFFL).toByte() - backing[1 + 6] = ((value ushr 8) and 0xFFL).toByte() - backing[1 + 7] = ((value) and 0xFFL).toByte() + backing[1 + 0] = ((value ushr 24) and 0xFF).toByte() + backing[1 + 1] = ((value ushr 16) and 0xFF).toByte() + backing[1 + 2] = ((value ushr 8) and 0xFF).toByte() + backing[1 + 3] = ((value) and 0xFF).toByte() } fun read() { @@ -874,31 +996,35 @@ class BTreeDB6 private constructor(override val file: File, private val reader: } fun free() { - if (type == BlockType.FREE) - throw IllegalStateException("Block $id is already free") - bytes.fill(0) type = BlockType.FREE - nextBlock = freeBlockList - freeBlockList = id + nextBlock = INVALID_BLOCK_INDEX write() - writeHeader() } - fun freeChain() { + fun freeChain(into: MutableCollection) { + check(into.add(this)) var next = nextBlock - free() while (next != INVALID_BLOCK_INDEX) { - val block = readBlock(next) + val block = Block(next) + block.read() next = block.nextBlock - block.free() + check(into.add(block)) } } + + override fun equals(other: Any?): Boolean { + return other === this || other is Block && id == other.id + } + + override fun hashCode(): Int { + return id.hashCode() + } } companion object { - const val INVALID_BLOCK_INDEX = -1L + const val INVALID_BLOCK_INDEX = -1 /** * Creates a new [BTreeDB6] file, with block size specified as [blockSize]. 
@@ -909,7 +1035,7 @@ class BTreeDB6 private constructor(override val file: File, private val reader: */ @JvmStatic @JvmOverloads - fun create(file: File, pool: Executor, blockSize: Int = 4096): BTreeDB6 { + fun create(file: File, pool: Executor, blockSize: Int = 4096, sync: Boolean = true): BTreeDB6 { require(blockSize >= 128) { "Degenerate block size: $blockSize" } if (file.exists()) @@ -923,10 +1049,10 @@ class BTreeDB6 private constructor(override val file: File, private val reader: // block size writer.writeInt(blockSize) - writer.writeLong(INVALID_BLOCK_INDEX) // first free block index - writer.writeLong(INVALID_BLOCK_INDEX) // tree root block index + writer.writeInt(INVALID_BLOCK_INDEX) // free bitmap block index + writer.writeInt(INVALID_BLOCK_INDEX) // tree root block index - return BTreeDB6(file, writer, pool) + return BTreeDB6(file, writer, pool, sync) } } } diff --git a/src/test/kotlin/ru/dbotthepony/kommons/test/BTreeDB6Tests.kt b/src/test/kotlin/ru/dbotthepony/kommons/test/BTreeDB6Tests.kt new file mode 100644 index 0000000..c6b6be0 --- /dev/null +++ b/src/test/kotlin/ru/dbotthepony/kommons/test/BTreeDB6Tests.kt @@ -0,0 +1,29 @@ +package ru.dbotthepony.kommons.test + +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import ru.dbotthepony.kommons.io.BTreeDB6 +import ru.dbotthepony.kommons.io.ByteKey +import java.io.File +import java.util.concurrent.Executor +import kotlin.test.assertEquals + +object BTreeDB6Tests { + @Test + @DisplayName("BTreeDB6 Tests") + fun test() { + val file = File("dbtest.bdb") + if (file.exists()) file.delete() + val create = BTreeDB6.create(file, Executor { it.run() }, 4096, sync = false) + + for (i in 0 .. 
8000) { + val s = "This is key $i" + val k = ByteKey("This is key $i") + println("This is key $i") + create.write(k, s.toByteArray()) + assertEquals(s, String(create.read(k).get().get())) + } + + create.close() + } +} diff --git a/src/test/kotlin/ru/dbotthepony/kommons/test/StreamTests.kt b/src/test/kotlin/ru/dbotthepony/kommons/test/StreamTests.kt index cb7bccb..a4aa707 100644 --- a/src/test/kotlin/ru/dbotthepony/kommons/test/StreamTests.kt +++ b/src/test/kotlin/ru/dbotthepony/kommons/test/StreamTests.kt @@ -11,6 +11,7 @@ import kotlin.test.assertEquals object StreamTests { private fun readWrite(value: Int) { val output = FastByteArrayOutputStream() + output.writeVarInt(value) val input = FastByteArrayInputStream(output.array, 0, output.length) val r = input.readVarIntInfo() assertEquals(value, r.value)