diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeCommand.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeCommand.kt index 6c1c0ec..21a1f7c 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeCommand.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeCommand.kt @@ -4,9 +4,7 @@ import cloud.kubelet.foundation.gjallarhorn.render.BlockDiversityRenderer import cloud.kubelet.foundation.gjallarhorn.render.BlockHeightMapRenderer import cloud.kubelet.foundation.gjallarhorn.render.BlockImageRenderer import cloud.kubelet.foundation.gjallarhorn.state.* -import cloud.kubelet.foundation.gjallarhorn.util.compose import cloud.kubelet.foundation.gjallarhorn.util.savePngFile -import cloud.kubelet.foundation.heimdall.view.BlockChangeView import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.core.requireObject import com.github.ajalt.clikt.parameters.options.flag @@ -15,20 +13,15 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.enum import com.github.ajalt.clikt.parameters.types.int import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.SqlExpressionBuilder.lessEq -import org.jetbrains.exposed.sql.and import org.slf4j.LoggerFactory +import java.awt.image.BufferedImage import java.time.Duration -import java.time.Instant -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ScheduledThreadPoolExecutor -import java.util.concurrent.TimeUnit class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes") { private val db by requireObject() - private val exactTimeAsString by option("--time", help = "Replay Time") - private val timelapseMode by option("--timelapse", help = "Timelapse Mode").enum { it.id } private val timelapseIntervalLimit by option("--timelapse-limit", help = "Timelapse Limit Intervals").int() + private val timelapseMode by option("--timelapse", help = "Timelapse Mode").enum { it.id }.required() private val render by option("--render", help = "Render Top Down Image").enum { it.id }.required() private val considerAirBlocks by option("--consider-air-blocks", help = "Enable Air Block Consideration").flag() @@ -39,99 +32,31 @@ class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes") private val logger = LoggerFactory.getLogger(BlockChangeCommand::class.java) override fun run() { - if (timelapseMode != null) { - val changelog = BlockChangelog.query(db) - val (start, end) = changelog.changeTimeRange - var intervals = mutableListOf() - var current = start - while (!current.isAfter(end)) { - intervals.add(current) - current = current.plus(timelapseMode!!.interval) - } - - if (timelapseIntervalLimit != null) { - intervals = intervals.takeLast(timelapseIntervalLimit!!).toMutableList() - } - - val trackerPool = ScheduledThreadPoolExecutor(8) - val trackers = ConcurrentHashMap() - for (time in intervals) { - trackerPool.submit { - val index = intervals.indexOf(time) + 1 - val tracker = - buildTrackerState(changelog.slice(time.minus(timelapseMode!!.interval) to time), "Timelapse-${index}") - if (tracker.isEmpty()) { - return@submit - } - trackers[index] = tracker - } - } - trackerPool.shutdown() - if (!trackerPool.awaitTermination(12, TimeUnit.HOURS)) { - throw RuntimeException("Failed to wait for tracker pool.") - } - logger.info("State Tracking Completed") - val allBlockOffsets = trackers.map { it.value.calculateZeroBlockOffset() } - val globalBlockOffset = BlockCoordinate.maxOf(allBlockOffsets) - val allBlockMaxes = trackers.map { it.value.calculateMaxBlock() } - val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes) - val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax) - - logger.info("Calculations Completed") - - val renderer = render.create(globalBlockExpanse) - val renderPool = ScheduledThreadPoolExecutor(16) - val imagePadCount = trackers.size.toString().length - for ((i, tracker) in trackers.entries) { - renderPool.submit { - val suffix = "-${i.toString().padStart(imagePadCount, '0')}" - saveRenderImage(renderer, tracker, globalBlockExpanse, suffix) - logger.info("Rendered Timelapse $i") - } - } - renderPool.shutdown() - if (!renderPool.awaitTermination(12, TimeUnit.HOURS)) { - throw RuntimeException("Failed to wait for render pool.") - } - logger.info("Rendering Completed") - } else { - val time = if (exactTimeAsString != null) Instant.parse(exactTimeAsString) else null - val filter = compose( - combine = { a, b -> a and b }, - { time != null } to { BlockChangeView.time lessEq time!! } - ) - val changelog = BlockChangelog.query(db, filter) - val tracker = buildTrackerState(changelog, "Single-Time") - val expanse = BlockExpanse.offsetAndMax(tracker.calculateZeroBlockOffset(), tracker.calculateMaxBlock()) - saveRenderImage(render.create(expanse), tracker, expanse) + val threadPoolExecutor = ScheduledThreadPoolExecutor(8) + val changelog = BlockChangelog.query(db) + val timelapse = BlockMapTimelapse(maybeBuildTrim()) + val slices = timelapse.calculateChangelogSlices(changelog, timelapseMode.interval, timelapseIntervalLimit) + val imagePadCount = slices.size.toString().length + val pool = BlockMapRenderPool( + changelog = changelog, + blockTrackMode = if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete, + delegate = timelapse, + rendererFactory = { expanse -> render.create(expanse) }, + threadPoolExecutor = threadPoolExecutor + ) { slice, result -> + val index = slices.indexOf(slice) + 1 + val suffix = "-${index.toString().padStart(imagePadCount, '0')}" + result.savePngFile("${render.id}${suffix}.png") + logger.info("Rendered Timelapse $index") } + + pool.render(slices) + threadPoolExecutor.shutdown() } - fun saveRenderImage( - renderer: BlockImageRenderer, - tracker: BlockLogTracker, - expanse: BlockExpanse, - suffix: String = "" - ) { - val map = tracker.buildBlockMap(expanse.offset) - val image = renderer.render(map) - image.savePngFile("${render.id}${suffix}.png") - } - - fun buildTrackerState(changelog: BlockChangelog, job: String): BlockLogTracker { - val tracker = - BlockLogTracker(if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete) - tracker.replay(changelog) - logger.info("Job $job Total Block Changes... ${changelog.changes.size}") - val uniqueBlockPositions = tracker.blocks.size - logger.info("Job $job Unique Block Positions... $uniqueBlockPositions") - maybeTrimState(tracker) - return tracker - } - - fun maybeTrimState(tracker: BlockLogTracker) { + fun maybeBuildTrim(): Pair? { if (fromCoordinate == null || toCoordinate == null) { - return + return null } val from = fromCoordinate!!.split(",").map { it.toLong() } @@ -139,8 +64,7 @@ class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes") val fromBlock = BlockCoordinate(from[0], 0, from[1]) val toBlock = BlockCoordinate(to[0], 0, to[1]) - - tracker.trimOutsideXAndZRange(fromBlock, toBlock) + return fromBlock to toBlock } @Suppress("unused") diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt index 7bc5007..d076991 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt @@ -5,17 +5,16 @@ import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Op import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction -import java.time.Instant class BlockChangelog( val changes: List ) { - fun slice(range: Pair): BlockChangelog = BlockChangelog(changes.filter { - it.time >= range.first && - it.time <= range.second + fun slice(slice: BlockChangelogSlice): BlockChangelog = BlockChangelog(changes.filter { + it.time >= slice.first && + it.time <= slice.second }) - val changeTimeRange: Pair + val changeTimeRange: BlockChangelogSlice get() = changes.minOf { it.time } to changes.maxOf { it.time } companion object { diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt new file mode 100644 index 0000000..7933f8d --- /dev/null +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt @@ -0,0 +1,5 @@ +package cloud.kubelet.foundation.gjallarhorn.state + +import java.time.Instant + +typealias BlockChangelogSlice = Pair diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt new file mode 100644 index 0000000..83bcf8f --- /dev/null +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt @@ -0,0 +1,72 @@ +package cloud.kubelet.foundation.gjallarhorn.state + +import cloud.kubelet.foundation.gjallarhorn.render.BlockMapRenderer +import org.slf4j.LoggerFactory +import java.util.concurrent.* + +class BlockMapRenderPool( + val changelog: BlockChangelog, + val blockTrackMode: BlockTrackMode, + val rendererFactory: (BlockExpanse) -> BlockMapRenderer, + val delegate: RenderPoolDelegate, + val threadPoolExecutor: ThreadPoolExecutor, + val renderResultCallback: (BlockChangelogSlice, T) -> Unit +) { + private val trackers = ConcurrentHashMap() + private val playbackJobFutures = ConcurrentHashMap>() + private val renderJobFutures = ConcurrentHashMap>() + + fun submitPlaybackJob(slice: BlockChangelogSlice) { + val future = threadPoolExecutor.submit { + try { + runPlaybackSlice(slice) + } catch (e: Exception) { + logger.error("Failed to run playback job for slice $slice", e) + } + } + playbackJobFutures[slice] = future + } + + fun submitRenderJob(slice: BlockChangelogSlice, callback: () -> T) { + val future = threadPoolExecutor.submit { + try { + val result = callback() + renderResultCallback(slice, result) + } catch (e: Exception) { + logger.error("Failed to run render job for slice $slice", e) + } + } + renderJobFutures[slice] = future + } + + fun render(slices: List) { + for (slice in slices) { + submitPlaybackJob(slice) + } + + for (future in playbackJobFutures.values) { + future.get() + } + + delegate.buildRenderJobs(this, trackers) + + for (future in renderJobFutures.values) { + future.get() + } + } + + private fun runPlaybackSlice(slice: BlockChangelogSlice) { + val sliced = changelog.slice(slice) + val tracker = BlockLogTracker(blockTrackMode) + tracker.replay(sliced) + trackers[slice] = tracker + } + + interface RenderPoolDelegate { + fun buildRenderJobs(pool: BlockMapRenderPool, trackers: Map) + } + + companion object { + private val logger = LoggerFactory.getLogger(BlockMapRenderPool::class.java) + } +} diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt new file mode 100644 index 0000000..35519d9 --- /dev/null +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt @@ -0,0 +1,44 @@ +package cloud.kubelet.foundation.gjallarhorn.state + +import java.time.Duration +import java.time.Instant + +class BlockMapTimelapse(val trim: Pair? = null) : + BlockMapRenderPool.RenderPoolDelegate { + fun calculateChangelogSlices( + changelog: BlockChangelog, interval: Duration, limit: Int? = null + ): List { + val (start, end) = changelog.changeTimeRange + var intervals = mutableListOf() + var current = start + while (!current.isAfter(end)) { + intervals.add(current) + current = current.plus(interval) + } + + if (limit != null) { + intervals = intervals.takeLast(limit).toMutableList() + } + return intervals.map { it.minus(interval) to it } + } + + override fun buildRenderJobs(pool: BlockMapRenderPool, trackers: Map) { + val allBlockOffsets = trackers.map { it.value.calculateZeroBlockOffset() } + val globalBlockOffset = BlockCoordinate.maxOf(allBlockOffsets) + val allBlockMaxes = trackers.map { it.value.calculateMaxBlock() } + val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes) + val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax) + + val renderer = pool.rendererFactory(globalBlockExpanse) + for ((slice, tracker) in trackers) { + if (trim != null) { + tracker.trimOutsideXAndZRange(trim.first, trim.second) + } + + pool.submitRenderJob(slice) { + val map = tracker.buildBlockMap(globalBlockExpanse.offset) + renderer.render(map) + } + } + } +}