package ru.dbotthepony.kstarbound.network

import com.google.common.collect.ImmutableList
import com.google.gson.JsonElement
import com.google.gson.JsonNull
import com.google.gson.JsonObject
import com.google.gson.JsonSyntaxException
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap
import org.apache.logging.log4j.LogManager
import ru.dbotthepony.kommons.gson.set
import ru.dbotthepony.kommons.io.readBinaryString
import ru.dbotthepony.kommons.io.readKOptional
import ru.dbotthepony.kommons.io.writeBinaryString
import ru.dbotthepony.kommons.io.writeKOptional
import ru.dbotthepony.kommons.util.KOptional
import ru.dbotthepony.kstarbound.json.readJsonElement
import ru.dbotthepony.kstarbound.json.writeJsonElement
import java.io.DataInputStream
import java.io.DataOutputStream
import java.util.concurrent.CompletableFuture
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
 * Bidirectional JSON RPC endpoint.
 *
 * Outgoing calls are queued by [invoke] and drained by [write]; incoming entries are
 * processed by [read], which dispatches requests to handlers registered through [add]
 * and completes the futures previously returned by [invoke].
 *
 * [invoke], [write] and [read] run under a single internal lock. [add] does not take
 * that lock.
 */
class JsonRPC {
    /** Kind of an RPC entry. The native wire format transmits the ordinal, so the order matters. */
    enum class Command {
        REQUEST, RESPONSE, FAIL;
    }

    /**
     * A single RPC message.
     *
     * @property command what kind of message this is
     * @property id identifier pairing a response/failure with the request it answers
     * @property handler name of the handler to call; [JsonRPC] sets it only on requests
     * @property arguments request arguments, or the result payload when [command] is [Command.RESPONSE]
     */
    data class Entry(
        val command: Command,
        val id: Int,
        val handler: KOptional<String>,
        val arguments: KOptional<JsonElement>,
    ) {
        /**
         * Serializes this entry into [stream].
         *
         * With [isLegacy] set, a single JSON object is written; the payload is stored under
         * `"result"` for responses and under `"arguments"` for every other command.
         * Otherwise the native binary layout is used: command ordinal, id, optional handler,
         * optional arguments.
         */
        fun write(stream: DataOutputStream, isLegacy: Boolean) {
            if (isLegacy) {
                stream.writeJsonElement(JsonObject().also { json ->
                    json["command"] = command.name.lowercase()
                    json["id"] = id
                    handler.ifPresent { v -> json["handler"] = v }

                    arguments.ifPresent { v ->
                        if (command == Command.RESPONSE) {
                            json["result"] = v
                        } else {
                            json["arguments"] = v
                        }
                    }
                })
            } else {
                stream.write(command.ordinal)
                stream.writeInt(id)
                stream.writeKOptional(handler) { writeBinaryString(it) }
                stream.writeKOptional(arguments) { writeJsonElement(it) }
            }
        }

        companion object {
            /** Reads an entry in the native binary layout produced by [Entry.write] with `isLegacy = false`. */
            fun native(stream: DataInputStream): Entry {
                return Entry(
                    Command.entries[stream.read()],
                    stream.readInt(),
                    stream.readKOptional { readBinaryString() },
                    stream.readKOptional { readJsonElement() },
                )
            }

            /**
             * Reads an entry in the legacy JSON layout produced by [Entry.write] with `isLegacy = true`.
             *
             * @throws IllegalStateException if the decoded element is not a JSON object
             * @throws JsonSyntaxException if `command` or `id` is missing, or `command` is not a known [Command]
             */
            fun legacy(stream: DataInputStream): Entry {
                val data = stream.readJsonElement()
                check(data is JsonObject) { "Expected JsonObject, got ${data::class}" }

                val commandName = data["command"]?.asString?.uppercase()
                    ?: throw JsonSyntaxException("Missing 'command' in RPC data")
                val id = data["id"]?.asInt
                    ?: throw JsonSyntaxException("Missing 'id' in RPC data")
                val handler = KOptional.ofNullable(data["handler"]?.asString)

                val command = Command.entries.firstOrNull { it.name == commandName }
                    ?: throw JsonSyntaxException("Invalid 'command': $commandName")

                // Mirror write(): a legacy response carries its payload under "result",
                // everything else under "arguments". Reading "arguments" unconditionally
                // would silently drop every response payload.
                val payloadKey = if (command == Command.RESPONSE) "result" else "arguments"
                val arguments = KOptional.ofNullable(data[payloadKey])

                return Entry(command, id, handler, arguments)
            }

            /** Reads an entry from [stream], choosing between [legacy] and [native] by [isLegacy]. */
            fun read(stream: DataInputStream, isLegacy: Boolean): Entry =
                if (isLegacy) legacy(stream) else native(stream)
        }
    }

    /** Handler for an incoming request: receives the request arguments and returns the result. */
    fun interface Callback {
        operator fun invoke(arguments: JsonElement): JsonElement
    }

    // Source of ids for outgoing requests; only touched under [lock].
    private var commandCounter = 0

    // Entries queued for the next write() call; only touched under [lock].
    private val pendingWrite = ArrayList<Entry>()

    // Futures of outgoing requests that have not been answered yet, keyed by request id.
    private val responses = Int2ObjectOpenHashMap<CompletableFuture<JsonElement>>()
    private val lock = ReentrantLock()

    // Registered request handlers by name. Mutated by add() without holding [lock].
    private val handlers = Object2ObjectOpenHashMap<String, Callback>()

    /**
     * Registers [callback] as the handler for requests named [name].
     *
     * @return a function that queues a request named [name] through [invoke]
     * @throws IllegalStateException if a handler with the same name is already registered
     */
    fun add(name: String, callback: Callback): (JsonElement) -> CompletableFuture<JsonElement> {
        val old = handlers.put(name, callback)
        check(old == null) { "Duplicate RPC handler: $name" }
        return { invoke(name, it) }
    }

    /**
     * Queues a request for [handler] with a deep copy of [arguments].
     *
     * @return a future that [read] completes when a matching response or failure is processed
     */
    operator fun invoke(handler: String, arguments: JsonElement): CompletableFuture<JsonElement> {
        lock.withLock {
            val id = commandCounter++
            val response = CompletableFuture<JsonElement>()
            responses[id] = response
            pendingWrite.add(Entry(Command.REQUEST, id, KOptional(handler), KOptional(arguments.deepCopy())))
            return response
        }
    }

    /**
     * Drains the queue of pending entries.
     *
     * @return an immutable snapshot of the queued entries, or `null` if nothing is queued
     */
    fun write(): List<Entry>? {
        lock.withLock {
            if (pendingWrite.isEmpty()) {
                return null
            }

            val result = ImmutableList.copyOf(pendingWrite)
            pendingWrite.clear()
            return result
        }
    }

    /**
     * Processes incoming entries in order.
     *
     * Requests are dispatched to the registered handler and their result is queued as a
     * response; responses and failures complete the future of the matching outgoing request.
     * Any error thrown while handling an entry is logged and a [Command.FAIL] entry with the
     * same id is queued.
     */
    fun read(data: List<Entry>) {
        lock.withLock {
            for (entry in data) {
                try {
                    when (entry.command) {
                        Command.REQUEST -> {
                            val handler = handlers[entry.handler.value]
                                ?: throw IllegalArgumentException("No such handler ${entry.handler.value}")

                            pendingWrite.add(Entry(Command.RESPONSE, entry.id, KOptional(), KOptional(handler(entry.arguments.value))))
                        }

                        Command.RESPONSE -> {
                            // A response without payload is reported to the caller as JSON null.
                            responses.remove(entry.id)?.complete(entry.arguments.orElse { JsonNull.INSTANCE })
                        }

                        Command.FAIL -> {
                            responses.remove(entry.id)?.completeExceptionally(RuntimeException("Remote RPC call failed"))
                        }
                    }
                } catch (err: Throwable) {
                    LOGGER.error("Error while handling RPC call", err)
                    pendingWrite.add(Entry(Command.FAIL, entry.id, KOptional(), KOptional()))
                }
            }
        }
    }

    companion object {
        private val LOGGER = LogManager.getLogger()
    }
}