From 748577d9f5f46b08f06b156ffaeda710d1438aea Mon Sep 17 00:00:00 2001
From: DBotThePony
Date: Thu, 15 Feb 2024 13:49:42 +0700
Subject: [PATCH] BTreeDB

---
 gradle.properties                                  |   2 +-
 .../ru/dbotthepony/kommons/io/BTreeDB.kt           |  32 +
 .../ru/dbotthepony/kommons/io/BTreeDB6.kt          | 916 ++++++++++++++++++
 .../ru/dbotthepony/kommons/io/ByteKey.kt           |  55 ++
 .../kommons/io/InputStreamUtils.kt                 |   8 +
 .../kommons/io/OutputStreamUtils.kt                |   8 +
 6 files changed, 1020 insertions(+), 1 deletion(-)
 create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB.kt
 create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt
 create mode 100644 src/main/kotlin/ru/dbotthepony/kommons/io/ByteKey.kt

diff --git a/gradle.properties b/gradle.properties
index a3318c9..6754a33 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -4,7 +4,7 @@ kotlin.code.style=official
 specifyKotlinAsDependency=false
 
 projectGroup=ru.dbotthepony.kommons
-projectVersion=2.2.1
+projectVersion=2.3.0
 
 guavaDepVersion=33.0.0
 gsonDepVersion=2.8.9
diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB.kt
new file mode 100644
index 0000000..dd2c442
--- /dev/null
+++ b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB.kt
@@ -0,0 +1,32 @@
+package ru.dbotthepony.kommons.io
+
+import ru.dbotthepony.kommons.util.KOptional
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.CompletableFuture
+
+/**
+ * For the actual implementation, see [BTreeDB6].
+ */
+abstract class BTreeDB<K, V> : Closeable {
+    abstract val file: File
+    abstract val blockSize: Int
+
+    /**
+     * Reads data at specified [key]
+     */
+    abstract fun read(key: K): CompletableFuture<KOptional<V>>
+    abstract fun findAllKeys(): CompletableFuture<List<K>>
+
+    abstract fun write(key: K, value: V): CompletableFuture<*>
+}
+
+abstract class ByteDataBTreeDB<K> : BTreeDB<K, ByteArray>() {
+    abstract fun write(key: K, value: ByteArray, offset: Int, length: Int): CompletableFuture<*>
+
+    final override fun write(key: K, value: ByteArray): CompletableFuture<*> {
+        return write(key, value, 0, value.size)
+    }
+}
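Before the implementation, a minimal caller-side sketch of how this API is intended to be used (illustrative only, not part of the patch; the file name and executor choice are arbitrary, and unwrapping the KOptional result depends on that class's API):

    import java.io.File
    import java.util.concurrent.Executors

    fun main() {
        // A thread-spawning pool, as the BTreeDB6 KDoc below recommends
        val pool = Executors.newCachedThreadPool()

        // create() initializes a fresh database file and refuses to overwrite an existing one
        val db = BTreeDB6.create(File("example.bdb"), pool, blockSize = 4096)

        // All operations are queued onto the executor; join() blocks until completion
        db.write(ByteKey("hello"), "world".toByteArray()).join()
        val value = db.read(ByteKey("hello")).join() // KOptional<ByteArray>
        val keys = db.findAllKeys().join()           // List<ByteKey>

        db.close()
        pool.shutdown()
    }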
diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt
new file mode 100644
index 0000000..08fadae
--- /dev/null
+++ b/src/main/kotlin/ru/dbotthepony/kommons/io/BTreeDB6.kt
@@ -0,0 +1,916 @@
+package ru.dbotthepony.kommons.io
+
+import it.unimi.dsi.fastutil.longs.LongArrayList
+import it.unimi.dsi.fastutil.longs.LongArraySet
+import it.unimi.dsi.fastutil.objects.Object2LongAVLTreeMap
+import it.unimi.dsi.fastutil.objects.ObjectArrayList
+import ru.dbotthepony.kommons.util.CarriedExecutor
+import ru.dbotthepony.kommons.util.KOptional
+import java.io.DataInputStream
+import java.io.DataOutputStream
+import java.io.File
+import java.io.InputStream
+import java.io.OutputStream
+import java.io.RandomAccessFile
+import java.util.*
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executor
+import java.util.concurrent.TimeUnit
+import java.util.function.LongConsumer
+import java.util.function.Supplier
+import java.util.stream.LongStream
+import java.util.stream.Stream
+
+private fun readHeader(reader: RandomAccessFile, required: Char) {
+    val read = reader.read()
+    require(read.toChar() == required) { "Bad header, expected ${required.code}, got $read" }
+}
+
+/**
+ * General purpose binary object storage, employing copy-on-write updates.
+ *
+ * This database is designed to read/write *small* files, since any read/write operation
+ * must be able to fully store the file in question in memory.
+ *
+ * The smallest unit of work is a block. The default block size is 4096 bytes (which includes
+ * the block header and the pointer to the next block). See [create] for more details.
+ *
+ * While an attempt is made at minimizing tree reads/writes, there is no tree rebalancing,
+ * so it is possible to force the tree to degenerate into a linked list. But with a relatively large
+ * block size, rebalancing isn't required, since even with hundreds of thousands of keys the tree
+ * height remains relatively small.
+ *
+ * This storage does not employ any form of defragmentation, so read/write operations can experience
+ * degraded performance after prolonged reads/writes of blobs which are larger than the block size.
+ *
+ * The specified [Executor] is utilized to execute the actual I/O operations.
+ *
+ * While it might be tempting to use [java.util.concurrent.ForkJoinPool.commonPool] here, keep in mind that
+ * the common ForkJoinPool is meant to be used by computation-intensive operations. [BTreeDB6], on the other hand,
+ * is I/O intensive, and can stall executor threads. A good Executor here is one that can spawn a lot of threads,
+ * for example, [java.util.concurrent.Executors.newCachedThreadPool].
+ */
+class BTreeDB6 private constructor(override val file: File, private val reader: RandomAccessFile, pool: Executor) : ByteDataBTreeDB<ByteKey>() {
+    constructor(file: File, pool: Executor) : this(file, RandomAccessFile(file, "rws"), pool)
+
+    private val carrier = CarriedExecutor(pool)
+
+    init {
+        readHeader(reader, 'B')
+        readHeader(reader, 'T')
+        readHeader(reader, 'r')
+        readHeader(reader, 'e')
+        readHeader(reader, 'e')
+        readHeader(reader, 'D')
+        readHeader(reader, 'B')
+        readHeader(reader, '6')
+    }
+
+    override fun close() {
+        carrier.execute { reader.close() }
+        carrier.shutdown()
+        carrier.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
+    }
+
+    override val blockSize = reader.readInt()
+
+    init {
+        require(blockSize >= 64) { "Degenerate block size: $blockSize" }
+    }
+
+    private val headerOffset = reader.filePointer
+    private var freeBlockList = reader.readLong()
+    private var rootBlockIndex = reader.readLong()
+
+    private enum class BlockType {
+        // block that is unused
+        FREE,
+        // block that is part of search tree, contains relative keys and block pointers to either other trees or leaves
+        TREE,
+        // block contains exact keys and block pointers
+        LEAF,
+        // contains data length field and data itself
+        DATA;
+
+        companion object {
+            val e = values()
+        }
+    }
+
+    private val headerBuf = ByteArray(16)
+
+    private fun writeHeader() {
+        reader.seek(headerOffset)
+
+        headerBuf[0] = ((freeBlockList ushr 56) and 0xFFL).toByte()
+        headerBuf[1] = ((freeBlockList ushr 48) and 0xFFL).toByte()
+        headerBuf[2] = ((freeBlockList ushr 40) and 0xFFL).toByte()
+        headerBuf[3] = ((freeBlockList ushr 32) and 0xFFL).toByte()
+        headerBuf[4] = ((freeBlockList ushr 24) and 0xFFL).toByte()
+        headerBuf[5] = ((freeBlockList ushr 16) and 0xFFL).toByte()
+        headerBuf[6] = ((freeBlockList ushr 8) and 0xFFL).toByte()
+        headerBuf[7] = ((freeBlockList ushr 0) and 0xFFL).toByte()
+
+        headerBuf[8 + 0] = ((rootBlockIndex ushr 56) and 0xFFL).toByte()
+        headerBuf[8 + 1] = ((rootBlockIndex ushr 48) and 0xFFL).toByte()
+        headerBuf[8 + 2] = ((rootBlockIndex ushr 40) and 0xFFL).toByte()
+        headerBuf[8 + 3] = ((rootBlockIndex ushr 32) and 0xFFL).toByte()
+        headerBuf[8 + 4] = ((rootBlockIndex ushr 24) and 0xFFL).toByte()
+        headerBuf[8 + 5] = ((rootBlockIndex ushr 16) and 0xFFL).toByte()
+        headerBuf[8 + 6] = ((rootBlockIndex ushr 8) and 0xFFL).toByte()
+        headerBuf[8 + 7] = ((rootBlockIndex ushr 0) and 0xFFL).toByte()
+
+        reader.write(headerBuf)
+    }
+
+    private fun doRead(key: ByteKey): KOptional<ByteArray> {
+        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
+            return KOptional.empty()
+        }
+
+        var block = readBlock(rootBlockIndex)
+
+        while (block.type == BlockType.TREE) {
+            val tree = TreeData(block)
+            block = tree.childrenBlock(key)
+        }
+
+        if (block.type == BlockType.LEAF) {
+            val leaf = LeafData(block)
+            val data = leaf.get(key)
+
+            if (data != null) {
+                val stream = DataInputStream(BlockInputStream(data))
+                val output = ByteArray(stream.readInt())
+                stream.readFully(output)
+                return KOptional(output)
+            }
+        } else if (block.type == BlockType.DATA) {
+            throw IllegalStateException("Hit data block when scanning index")
+        } else if (block.type == BlockType.FREE) {
+            throw IllegalStateException("Hit free block when scanning index")
+        }
+
+        return KOptional.empty()
+    }
+
+    override fun read(key: ByteKey): CompletableFuture<KOptional<ByteArray>> {
+        return CompletableFuture.supplyAsync(Supplier {
+            doRead(key)
+        }, carrier)
+    }
+
+    private fun readKeysInto(block: Block, result: MutableList<ByteKey>) {
+        when (block.type) {
+            BlockType.TREE -> {
+                val tree = TreeData(block)
+
+                for (sub in tree.children()) {
+                    readKeysInto(readBlock(sub), result)
+                }
+            }
+
+            BlockType.LEAF -> {
+                val leaf = LeafData(block)
+
+                for ((key) in leaf.keys()) {
+                    result.add(key)
+                }
+            }
+
+            else -> throw IllegalStateException("Hit $block while scanning tree index")
+        }
+    }
+
+    private fun doFindAllKeys(): List<ByteKey> {
+        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
+            return emptyList()
+        }
+
+        val result = ArrayList<ByteKey>()
+        readKeysInto(readBlock(rootBlockIndex), result)
+        return result
+    }
+
+    override fun findAllKeys(): CompletableFuture<List<ByteKey>> {
+        return CompletableFuture.supplyAsync(Supplier {
+            doFindAllKeys()
+        }, carrier)
+    }
+
+    private fun doWriteData(key: ByteKey, data: ByteArray, offset: Int, length: Int) {
+        if (rootBlockIndex == INVALID_BLOCK_INDEX) {
+            // create LEAF node for root
+            val block = allocBlock(BlockType.LEAF)
+            block.write()
+            rootBlockIndex = block.id
+            writeHeader()
+        }
+
+        val block = readBlock(rootBlockIndex)
+
+        if (block.type == BlockType.LEAF) {
+            val leaf = LeafData(block)
+            val (newData, oldData) = leaf.compute(key)
+
+            val stream = DataOutputStream(BlockOutputStream(newData))
+            stream.writeInt(length)
+            stream.write(data, offset, length)
+            stream.close()
+
+            val (newLeaf, oldLeaf) = leaf.flush()
+
+            check(newLeaf.type == BlockType.LEAF)
+            rootBlockIndex = newLeaf.id
+            writeHeader()
+
+            oldLeaf?.freeChain()
+            oldData?.freeChain()
+
+            if (leaf.shouldSplit) {
+                // replace root leaf with root tree
+                val split = leaf.split()
+                val tree = TreeData(allocBlock(BlockType.TREE))
+
+                tree.add(split)
+                tree.flush(true)
+                rootBlockIndex = tree.head.id
+                writeHeader()
+
+                split.original.freeChain()
+            }
+        } else if (block.type == BlockType.FREE) {
+            throw IllegalStateException("Root block is free block")
+        } else if (block.type == BlockType.TREE) {
+            val trees = ArrayList<TreeData>()
+            val blocksToFree = ArrayList<Block>()
+            var currentBlock = block
+            var tree = TreeData(currentBlock)
+            trees.add(tree)
+
+            while (true) {
+                currentBlock = tree.childrenBlock(key)
+
+                if (currentBlock.type == BlockType.TREE) {
+                    tree = TreeData(currentBlock)
+                    trees.add(tree)
+                } else if (currentBlock.type == BlockType.FREE) {
+                    throw IllegalStateException("Hit free block when scanning tree")
+                } else if (currentBlock.type == BlockType.DATA) {
+                    throw IllegalStateException("Hit data block when scanning tree")
+                } else if (currentBlock.type == BlockType.LEAF) {
+                    break
+                }
+            }
+
+            val leaf = LeafData(currentBlock)
+            val (newData, oldData) = leaf.compute(key)
+
+            if (oldData != null) blocksToFree.add(oldData)
+
+            val stream = DataOutputStream(BlockOutputStream(newData))
+            stream.writeInt(length)
+            stream.write(data, offset, length)
+            stream.close()
+
+            if (leaf.shouldSplit) {
+                // splitting is required, which may split upstream trees
+                var split = leaf.split()
+                blocksToFree.add(split.original)
+
+                val itr = trees.asReversed().iterator()
+
+                for (uptree in itr) {
+                    uptree.add(split)
+
+                    if (uptree.shouldSplit) {
+                        // split subtree
+                        split = uptree.split()
+                        blocksToFree.add(split.original)
+
+                        if (!itr.hasNext()) {
+                            // split the root tree itself, add a new root above it
+                            val newMainTree = TreeData(allocBlock(BlockType.TREE))
+                            newMainTree.add(split)
+                            trees.add(0, newMainTree)
+                            newMainTree.flush(true)
+
+                            break
+                        }
+                    } else {
+                        // splitting has finished, only update parent tree pointers
+                        val (n, o) = uptree.flush()
+                        blocksToFree.add(o!!)
+
+                        var newBlock = n.id
+                        var oldBlock = o.id
+
+                        for (uptree2 in itr) {
+                            uptree2.update(oldBlock, newBlock)
+                            val (n2, o2) = uptree2.flush()
+                            blocksToFree.add(o2!!)
+                            newBlock = n2.id
+                            oldBlock = o2.id
+                        }
+
+                        break
+                    }
+                }
+            } else {
+                // no splitting required, simply construct new tree by updating pointers
+                val (newLeaf, oldLeaf) = leaf.flush()
+                check(newLeaf.type == BlockType.LEAF)
+                checkNotNull(oldLeaf)
+
+                blocksToFree.add(oldLeaf)
+                var newBlock = newLeaf.id
+                var oldBlock = oldLeaf.id
+
+                for (uptree in trees.asReversed()) {
+                    uptree.update(oldBlock, newBlock)
+                    val (n, o) = uptree.flush()
+                    blocksToFree.add(o!!)
+                    newBlock = n.id
+                    oldBlock = o.id
+                }
+            }
+
+            /*trees.first().check(LongConsumer {
+                if (blocksToFree.any { b -> b.id == it }) {
+                    throw IllegalStateException("Tree is referencing dead blocks")
+                }
+            })*/
+
+            rootBlockIndex = trees.first().head.id
+            reader.channel.force(true)
+            writeHeader()
+            reader.channel.force(true)
+            blocksToFree.forEach { it.freeChain() }
+        }
+    }
+
+    override fun write(key: ByteKey, value: ByteArray, offset: Int, length: Int): CompletableFuture<Void> {
+        return CompletableFuture.runAsync(Runnable {
+            doWriteData(key, value, offset, length)
+        }, carrier)
+    }
+
+    private var nextBlockID = (reader.channel.size().coerceAtLeast(blockSize.toLong()) - blockSize.toLong()) / blockSize
+
+    private fun allocBlock(type: BlockType): Block {
+        if (freeBlockList == INVALID_BLOCK_INDEX) {
+            // no free blocks, enlarge the file
+            val newID = nextBlockID++
+            val block = Block(newID)
+            block.type = type
+            block.nextBlock = INVALID_BLOCK_INDEX
+            // block.write()
+            return block
+        } else {
+            // we have a free block
+            val block = readBlock(freeBlockList)
+            check(block.type == BlockType.FREE) { "Expected block $freeBlockList to be free, got ${block.type}" }
+            val old = freeBlockList
+            freeBlockList = block.nextBlock
+            check(freeBlockList != old) { "Free block list is cyclic: block $old references itself" }
+            writeHeader()
+            block.nextBlock = INVALID_BLOCK_INDEX
+            block.type = type
+            return block
+        }
+    }
+
+    private fun readBlock(id: Long): Block {
+        val block = Block(id)
+        block.read()
+        return block
+    }
+
+    // keys are ordered from smallest to biggest
+    private inner class TreeData(head: Block) {
+        var head: Block = head
+            private set
+
+        private var rightMostBlock: Long
+        private val children = Object2LongAVLTreeMap<ByteKey>()
+        private var isDirty = false
+
+        val size: Int
+            get() = children.size
+
+        val shouldSplit: Boolean
+            get() = size >= 8 && (children.keys.stream().mapToInt { it.size + 8 }.sum() + 8) >= (blockSize - 9)
+
+        fun children(): LongStream {
+            return LongStream.concat(children.values.longStream(), LongStream.of(rightMostBlock))
+        }
+
+        init {
+            check(head.type == BlockType.TREE) { "TreeData was constructed with ${head.type} block" }
+            children.defaultReturnValue(INVALID_BLOCK_INDEX)
+            val reader = DataInputStream(BlockInputStream(head))
+            val entryCount = reader.readVarInt()
+
+            for (i in 0 until entryCount) {
+                val key = reader.readByteKey()
+                val block = reader.readLong()
+                children.put(key, block)
+            }
+
+            rightMostBlock = reader.readLong()
+        }
+
+        fun flush(inPlace: Boolean = false): Pair<Block, Block?> {
+            if (!isDirty) return head to null
+            val new = if (inPlace) head else allocBlock(BlockType.TREE)
+
+            val stream = DataOutputStream(BlockOutputStream(new))
+            stream.writeVarInt(children.size)
+
+            for ((key, block) in children) {
+                key.write(stream)
+                stream.writeLong(block)
+            }
+
+            stream.writeLong(rightMostBlock)
+            stream.close()
+            check(new.type == BlockType.TREE)
+            isDirty = false
+
+            if (inPlace) {
+                return head to null
+            } else {
+                val old = head
+                head = new
+                return new to old
+            }
+        }
+
+        private fun intersects(other: TreeData): Boolean {
+            return other.rightMostBlock == rightMostBlock || rightMostBlock in other.children.values || children.values.any { it in other.children.values }
+        }
+
+        fun split(): SplitResult {
+            val keys = ObjectArrayList(children.keys)
+            val middle = keys.size / 2
+            val median = keys[middle]
+
+            val tree0 = TreeData(allocBlock(BlockType.TREE))
+            val tree1 = TreeData(allocBlock(BlockType.TREE))
+
+            tree0.rightMostBlock = children.getLong(median)
+            tree1.rightMostBlock = rightMostBlock
+
+            check(tree0.rightMostBlock != INVALID_BLOCK_INDEX)
+
+            for (i in 0 until middle) {
+                tree0.children[keys[i]] = children.getLong(keys[i])
+            }
+
+            for (i in middle + 1 until keys.size) {
+                tree1.children[keys[i]] = children.getLong(keys[i])
+            }
+
+            check(!tree0.intersects(tree1)) { "tree split consistency check failed" }
+            check(!tree1.intersects(tree0)) { "tree split consistency check failed" }
+
+            tree0.isDirty = true
+            tree1.isDirty = true
+
+            tree0.flush(true)
+            tree1.flush(true)
+
+            return SplitResult(tree0.head, tree1.head, median, this.head)
+        }
+
+        fun childrenBlock(key: ByteKey): Block {
+            for ((rkey, block) in children) {
+                if (key < rkey) {
+                    return readBlock(block)
+                }
+            }
+
+            return readBlock(rightMostBlock)
+        }
+
+        fun add(data: SplitResult) {
+            if (children.isEmpty()) {
+                // new tree
+                rightMostBlock = data.right.id
+                children[data.median] = data.left.id
+            } else if (rightMostBlock == data.original.id) {
+                children[data.median] = data.left.id
+                rightMostBlock = data.right.id
+            } else {
+                val splitPoint = children.entries.find { it.value == data.original.id }
+                checkNotNull(splitPoint) { "Tree is inconsistent: unable to locate children split point (median: ${data.median}; old children block: ${data.original.id})" }
+                val key = splitPoint.key
+                children[key] = data.right.id
+                children[data.median] = data.left.id
+            }
+
+            isDirty = true
+        }
+
+        fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) {
+            if (!stack.add(head.id)) {
+                throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)")
+            }
+
+            try {
+                blockIDVisitor.accept(head.id)
+            } catch (err: Throwable) {
+                throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err)
+            }
+
+            val ids = LongArrayList(children.values)
+            ids.add(rightMostBlock)
+
+            for (block in ids) {
+                val read = readBlock(block)
+
+                if (read.type == BlockType.LEAF) {
+                    LeafData(read).check(blockIDVisitor, stack)
+                } else if (read.type == BlockType.TREE) {
+                    TreeData(read).check(blockIDVisitor, stack)
+                } else if (read.type == BlockType.FREE) {
+                    throw IllegalStateException("Hit free block while checking tree consistency")
+                } else if (read.type == BlockType.DATA) {
+                    throw IllegalStateException("Hit data block while checking tree consistency")
+                }
+            }
+
+            stack.remove(head.id)
+        }
+
+        fun update(oldBlock: Long, newBlock: Long) {
+            if (rightMostBlock == oldBlock) {
+                rightMostBlock = newBlock
+                isDirty = true
+
+                check(!children.values.any { it == oldBlock }) { "Tree contains duplicated entries" }
+            } else {
+                var hit = false
+
+                for ((key, block) in children.object2LongEntrySet()) {
+                    if (block == oldBlock) {
+                        children[key] = newBlock
+                        hit = true
+                        isDirty = true
+                        break
+                    }
+                }
+
+                if (!hit) {
+                    throw IllegalStateException("Can't find block $oldBlock in tree")
+                }
+            }
+        }
+    }
+
+    /**
+     * left - less than median
+     * right - greater than or equal to median
+     */
+    private data class SplitResult(val left: Block, val right: Block, val median: ByteKey, val original: Block)
+
+    private inner class LeafData(head: Block) {
+        var head: Block = head
+            private set
+
+        private val keys = Object2LongAVLTreeMap<ByteKey>()
+        private var isDirty = false
+
+        fun keys(): Stream<Pair<ByteKey, Long>> {
+            return keys.entries.stream().map { it.key to it.value }
+        }
+
+        init {
+            check(head.type == BlockType.LEAF) { "LeafData was constructed with ${head.type} block" }
+            keys.defaultReturnValue(INVALID_BLOCK_INDEX)
+            val stream = DataInputStream(BlockInputStream(head))
+            val amount = stream.readVarInt()
+
+            for (i in 0 until amount) {
+                val key = stream.readByteKey()
+                val block = stream.readLong()
+                keys[key] = block
+            }
+        }
+
+        val size: Int
+            get() = keys.size
+
+        val shouldSplit: Boolean
+            get() = size >= 8 && keys.keys.stream().mapToInt { it.size + 8 }.sum() >= (blockSize - 9)
+
+        fun split(): SplitResult {
+            val keys = ObjectArrayList(this.keys.keys)
+            val median = keys[keys.size / 2]
+
+            val leaf0 = LeafData(allocBlock(BlockType.LEAF))
+            val leaf1 = LeafData(allocBlock(BlockType.LEAF))
+
+            for (i in 0 until keys.size / 2) {
+                val key = keys[i]
+                leaf0.keys.put(key, this.keys.getLong(key))
+            }
+
+            for (i in keys.size / 2 until keys.size) {
+                val key = keys[i]
+                leaf1.keys.put(key, this.keys.getLong(key))
+            }
+
+            leaf0.isDirty = true
+            leaf1.isDirty = true
+
+            leaf0.flush(true)
+            leaf1.flush(true)
+
+            return SplitResult(leaf0.head, leaf1.head, median, this.head)
+        }
+
+        fun check(blockIDVisitor: LongConsumer, stack: LongArraySet = LongArraySet()) {
+            if (!stack.add(head.id)) {
+                throw IllegalStateException("Cyclic block reference in search tree (${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $head)")
+            }
+
+            try {
+                blockIDVisitor.accept(head.id)
+            } catch (err: Throwable) {
+                throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })}", err)
+            }
+
+            for (block in keys.values) {
+                val read = readBlock(block)
+
+                try {
+                    blockIDVisitor.accept(block)
+                } catch (err: Throwable) {
+                    throw IllegalStateException("Tree consistency check failed on path ${stack.joinToString(" -> ", transform = { readBlock(it).toString() })} -> $read", err)
+                }
+
+                if (read.type != BlockType.DATA) {
+                    throw IllegalStateException("Hit ${read.type} block while checking leaf consistency")
+                }
+            }
+
+            stack.remove(head.id)
+        }
+
+        fun remove(key: ByteKey): Block? {
+            val block = keys.removeLong(key)
+
+            if (block != INVALID_BLOCK_INDEX) {
+                isDirty = true
+                return readBlock(block)
+            }
+
+            return null
+        }
+
+        fun compute(key: ByteKey): Pair<Block, Block?> {
+            val get = keys.getLong(key)
+            val old = if (get == INVALID_BLOCK_INDEX) null else readBlock(get)
+            val alloc = allocBlock(BlockType.DATA)
+            alloc.write()
+            isDirty = true
+            keys.put(key, alloc.id)
+            return alloc to old
+        }
+
+        fun get(key: ByteKey): Block? {
+            val get = keys.getLong(key)
+            if (get == INVALID_BLOCK_INDEX) return null
+            return readBlock(get)
+        }
+
+        // current to old
+        fun flush(inPlace: Boolean = false): Pair<Block, Block?> {
+            if (!isDirty) return head to null
+            val new = if (inPlace) head else allocBlock(BlockType.LEAF)
+            val stream = DataOutputStream(BlockOutputStream(new))
+            stream.writeVarInt(keys.size)
+
+            for ((key, block) in keys) {
+                key.write(stream)
+                stream.writeLong(block)
+            }
+
+            stream.close()
+            isDirty = false
+            check(new.type == BlockType.LEAF)
+
+            if (inPlace) {
+                return head to null
+            } else {
+                val old = head
+                head = new
+                return new to old
+            }
+        }
+    }
+
+    private inner class BlockInputStream(head: Block) : InputStream() {
+        private var currentBlock = head
+        private var currentOffset = 9
+        private var isFinished = false
+
+        override fun read(): Int {
+            if (isFinished) return -1
+
+            if (currentOffset < blockSize) {
+                return currentBlock.bytes[currentOffset++].toInt() and 0xFF
+            } else {
+                if (currentBlock.nextBlock == INVALID_BLOCK_INDEX) {
+                    isFinished = true
+                    return -1
+                }
+
+                currentBlock = readBlock(currentBlock.nextBlock)
+                currentOffset = 9
+                return read()
+            }
+        }
+    }
+
+    private inner class BlockOutputStream(head: Block) : OutputStream() {
+        private val blocks = ArrayList<Block>()
+        private var currentBlock = head
+        private var currentOffset = 9
+
+        init {
+            blocks.add(head)
+        }
+
+        override fun flush() {
+            blocks.forEach { it.write() }
+            blocks.clear()
+            blocks.add(currentBlock)
+        }
+
+        override fun close() {
+            flush()
+            val next = currentBlock.nextBlock
+
+            if (next != INVALID_BLOCK_INDEX) {
+                readBlock(next).freeChain()
+                currentBlock.nextBlock = INVALID_BLOCK_INDEX
+                currentBlock.write()
+            }
+        }
+
+        private fun nextBlock() {
+            val alloc = allocBlock(currentBlock.type)
+            currentBlock.nextBlock = alloc.id
+            blocks.add(alloc)
+            currentOffset = 9
+            currentBlock = alloc
+        }
+
+        override fun write(b: ByteArray, off: Int, len: Int) {
+            Objects.checkFromIndexSize(off, len, b.size)
+            var written = 0
+
+            while (written < len) {
+                if (currentOffset >= blockSize) {
+                    nextBlock()
+                }
+
+                val writable = (len - written).coerceAtMost(blockSize - currentOffset)
+                System.arraycopy(b, off + written, currentBlock.bytes, currentOffset, writable)
+                written += writable
+                currentOffset += writable
+            }
+        }
+
+        override fun write(b: Int) {
+            if (currentOffset < blockSize) {
+                currentBlock.bytes[currentOffset++] = b.toByte()
+            } else {
+                nextBlock()
+                currentBlock.bytes[currentOffset++] = b.toByte()
+            }
+        }
+    }
+
+    private inner class Block {
+        val bytes: ByteArray
+
+        constructor(id: Long) {
+            this.id = id
+            bytes = ByteArray(blockSize)
+        }
+
+        constructor(copy: Block, id: Long) {
+            this.id = id
+            bytes = copy.bytes.clone()
+        }
+
+        override fun toString(): String {
+            return "BTreeDB6.Block[$id, $type]"
+        }
+
+        val id: Long
+        val offset: Long get() = 4096L + id * blockSize
+
+        var type: BlockType
+            get() = BlockType.e[bytes[0].toInt()]
+            set(value) { bytes[0] = value.ordinal.toByte() }
+
+        var nextBlock: Long
+            get() {
+                val backing = bytes
+
+                val v0 = (backing[1 + 0].toLong() and 0xFFL) shl 56
+                val v1 = (backing[1 + 1].toLong() and 0xFFL) shl 48
+                val v2 = (backing[1 + 2].toLong() and 0xFFL) shl 40
+                val v3 = (backing[1 + 3].toLong() and 0xFFL) shl 32
+                val v4 = (backing[1 + 4].toLong() and 0xFFL) shl 24
+                val v5 = (backing[1 + 5].toLong() and 0xFFL) shl 16
+                val v6 = (backing[1 + 6].toLong() and 0xFFL) shl 8
+                val v7 = (backing[1 + 7].toLong() and 0xFFL)
+
+                return v0 or v1 or v2 or v3 or v4 or v5 or v6 or v7
+            }
+            set(value) {
+                val backing = bytes
+
+                backing[1 + 0] = ((value ushr 56) and 0xFFL).toByte()
+                backing[1 + 1] = ((value ushr 48) and 0xFFL).toByte()
+                backing[1 + 2] = ((value ushr 40) and 0xFFL).toByte()
+                backing[1 + 3] = ((value ushr 32) and 0xFFL).toByte()
+                backing[1 + 4] = ((value ushr 24) and 0xFFL).toByte()
+                backing[1 + 5] = ((value ushr 16) and 0xFFL).toByte()
+                backing[1 + 6] = ((value ushr 8) and 0xFFL).toByte()
+                backing[1 + 7] = ((value) and 0xFFL).toByte()
+            }
+
+        fun read() {
+            reader.seek(offset)
+            reader.readFully(bytes)
+        }
+
+        fun write() {
+            reader.seek(offset)
+            reader.write(bytes)
+        }
+
+        fun free() {
+            if (type == BlockType.FREE)
+                throw IllegalStateException("Block $id is already free")
+
+            bytes.fill(0)
+            type = BlockType.FREE
+            nextBlock = freeBlockList
+            freeBlockList = id
+            write()
+            writeHeader()
+        }
+
+        fun freeChain() {
+            var next = nextBlock
+            free()
+
+            while (next != INVALID_BLOCK_INDEX) {
+                val block = readBlock(next)
+                next = block.nextBlock
+                block.free()
+            }
+        }
+    }
+
+    companion object {
+        const val INVALID_BLOCK_INDEX = -1L
+
+        /**
+         * Creates a new [BTreeDB6] file, with block size specified as [blockSize].
+         *
+         * Ideally, block size should match the smallest unit of work of the underlying file system, which is typically 4 KiB (4096 bytes).
+         * While arbitrary block sizes are supported (from 128 bytes up to [Int.MAX_VALUE]), be aware that each I/O operation BTreeDB6
+         * performs will always read/write a full block. So, the block size of BTreeDB6 is the smallest unit of work for that database.
+         */
+        @JvmStatic
+        @JvmOverloads
+        fun create(file: File, pool: Executor, blockSize: Int = 4096): BTreeDB6 {
+            require(blockSize >= 128) { "Degenerate block size: $blockSize" }
+
+            if (file.exists())
+                throw FileAlreadyExistsException(file)
+
+            val writer = RandomAccessFile(file, "rw")
+
+            // specify version as 6 to prohibit the original game from opening
+            // and corrupting the file
+            writer.write("BTreeDB6".toByteArray())
+
+            // block size
+            writer.writeInt(blockSize)
+
+            writer.writeLong(INVALID_BLOCK_INDEX) // first free block index
+            writer.writeLong(INVALID_BLOCK_INDEX) // tree root block index
+
+            return BTreeDB6(file, writer, pool)
+        }
+    }
+}
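To make the on-disk layout concrete, here is the arithmetic Block.offset above uses (an illustrative sketch, not part of the patch): the small file header is followed by reserved space, and block storage always starts at byte 4096, regardless of the configured block size.

    // Mirrors Block.offset: block 0 begins at byte 4096, past the reserved header area
    fun blockOffset(id: Long, blockSize: Int): Long = 4096L + id * blockSize

    fun main() {
        println(blockOffset(0, 4096)) // 4096  - first block, right after the reserved region
        println(blockOffset(3, 4096)) // 16384
        println(blockOffset(3, 128))  // 4480  - smaller blocks pack more densely from the same base
    }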
diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/ByteKey.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/ByteKey.kt
new file mode 100644
index 0000000..538233a
--- /dev/null
+++ b/src/main/kotlin/ru/dbotthepony/kommons/io/ByteKey.kt
@@ -0,0 +1,55 @@
+package ru.dbotthepony.kommons.io
+
+import java.io.OutputStream
+
+class ByteKey(private vararg val bytes: Byte) : Comparable<ByteKey> {
+    constructor(key: String) : this(*key.toByteArray())
+
+    override fun equals(other: Any?): Boolean {
+        return this === other || other is ByteKey && other.bytes.contentEquals(bytes)
+    }
+
+    val size: Int
+        get() = bytes.size
+
+    operator fun get(index: Int): Byte {
+        return bytes[index]
+    }
+
+    fun write(stream: OutputStream) {
+        stream.writeVarInt(bytes.size)
+        stream.write(bytes)
+    }
+
+    fun writeRaw(stream: OutputStream) {
+        stream.write(bytes)
+    }
+
+    fun toByteArray(): ByteArray {
+        return bytes.clone()
+    }
+
+    override fun hashCode(): Int {
+        return bytes.contentHashCode()
+    }
+
+    override fun toString(): String {
+        return "ByteKey[${bytes.map { it.toInt() and 0xFF }.joinToString(", ")}]"
+    }
+
+    override fun compareTo(other: ByteKey): Int {
+        val cmp = size.compareTo(other.size)
+        if (cmp != 0) return cmp
+
+        for (i in bytes.indices) {
+            if (bytes[i].toInt() and 0xFF > other.bytes[i].toInt() and 0xFF) {
+                return 1
+            } else if (bytes[i].toInt() and 0xFF < other.bytes[i].toInt() and 0xFF) {
+                return -1
+            }
+        }
+
+        return 0
+    }
+}
diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/InputStreamUtils.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/InputStreamUtils.kt
index f9d94d9..c4cf63d 100644
--- a/src/main/kotlin/ru/dbotthepony/kommons/io/InputStreamUtils.kt
+++ b/src/main/kotlin/ru/dbotthepony/kommons/io/InputStreamUtils.kt
@@ -336,3 +336,11 @@ fun InputStream.readByteArray(maxRead: Int = Int.MAX_VALUE): ByteArray {
 		throw IllegalArgumentException("Trying to read $size bytes when $maxRead is the max allowed")
 	return readOrError(ByteArray(size))
 }
+
+fun InputStream.readByteKey(): ByteKey {
+	return ByteKey(*readOrError(ByteArray(readVarInt())))
+}
+
+fun InputStream.readByteKeyRaw(size: Int): ByteKey {
+	return ByteKey(*readOrError(ByteArray(size)))
+}
diff --git a/src/main/kotlin/ru/dbotthepony/kommons/io/OutputStreamUtils.kt b/src/main/kotlin/ru/dbotthepony/kommons/io/OutputStreamUtils.kt
index 92ec3f0..06bb611 100644
--- a/src/main/kotlin/ru/dbotthepony/kommons/io/OutputStreamUtils.kt
+++ b/src/main/kotlin/ru/dbotthepony/kommons/io/OutputStreamUtils.kt
@@ -322,3 +322,11 @@ fun OutputStream.writeByteArray(value: ByteArray) {
 	writeVarInt(value.size)
 	write(value)
 }
+
+fun OutputStream.writeByteKey(key: ByteKey) {
+	key.write(this)
+}
+
+fun OutputStream.writeRawByteKey(key: ByteKey) {
+	key.writeRaw(this)
+}
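A closing note on key ordering, since it determines the shape of the search tree: ByteKey.compareTo sorts by length first and falls back to unsigned byte-wise comparison only for equal lengths, so the order is not plain lexicographic. A quick illustrative sketch (not part of the patch):

    fun main() {
        // Length dominates: any 1-byte key sorts before any 2-byte key
        println(ByteKey("z") < ByteKey("aa"))  // true
        // Equal lengths fall back to unsigned byte-wise comparison
        println(ByteKey("ab") < ByteKey("ac")) // true
    }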