Gjallarhorn: Create render pool concept.

This commit is contained in:
Kenneth Endfinger
2022-01-08 14:57:56 -05:00
parent 3350034060
commit 81a76da809
5 changed files with 149 additions and 105 deletions

View File

@ -4,9 +4,7 @@ import cloud.kubelet.foundation.gjallarhorn.render.BlockDiversityRenderer
import cloud.kubelet.foundation.gjallarhorn.render.BlockHeightMapRenderer
import cloud.kubelet.foundation.gjallarhorn.render.BlockImageRenderer
import cloud.kubelet.foundation.gjallarhorn.state.*
import cloud.kubelet.foundation.gjallarhorn.util.compose
import cloud.kubelet.foundation.gjallarhorn.util.savePngFile
import cloud.kubelet.foundation.heimdall.view.BlockChangeView
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.requireObject
import com.github.ajalt.clikt.parameters.options.flag
@ -15,20 +13,15 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.enum
import com.github.ajalt.clikt.parameters.types.int
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SqlExpressionBuilder.lessEq
import org.jetbrains.exposed.sql.and
import org.slf4j.LoggerFactory
import java.awt.image.BufferedImage
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes") {
private val db by requireObject<Database>()
private val exactTimeAsString by option("--time", help = "Replay Time")
private val timelapseMode by option("--timelapse", help = "Timelapse Mode").enum<TimelapseMode> { it.id }
private val timelapseIntervalLimit by option("--timelapse-limit", help = "Timelapse Limit Intervals").int()
private val timelapseMode by option("--timelapse", help = "Timelapse Mode").enum<TimelapseMode> { it.id }.required()
private val render by option("--render", help = "Render Top Down Image").enum<RenderType> { it.id }.required()
private val considerAirBlocks by option("--consider-air-blocks", help = "Enable Air Block Consideration").flag()
@ -39,99 +32,31 @@ class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes")
private val logger = LoggerFactory.getLogger(BlockChangeCommand::class.java)
override fun run() {
if (timelapseMode != null) {
val changelog = BlockChangelog.query(db)
val (start, end) = changelog.changeTimeRange
var intervals = mutableListOf<Instant>()
var current = start
while (!current.isAfter(end)) {
intervals.add(current)
current = current.plus(timelapseMode!!.interval)
}
if (timelapseIntervalLimit != null) {
intervals = intervals.takeLast(timelapseIntervalLimit!!).toMutableList()
}
val trackerPool = ScheduledThreadPoolExecutor(8)
val trackers = ConcurrentHashMap<Int, BlockLogTracker>()
for (time in intervals) {
trackerPool.submit {
val index = intervals.indexOf(time) + 1
val tracker =
buildTrackerState(changelog.slice(time.minus(timelapseMode!!.interval) to time), "Timelapse-${index}")
if (tracker.isEmpty()) {
return@submit
}
trackers[index] = tracker
}
}
trackerPool.shutdown()
if (!trackerPool.awaitTermination(12, TimeUnit.HOURS)) {
throw RuntimeException("Failed to wait for tracker pool.")
}
logger.info("State Tracking Completed")
val allBlockOffsets = trackers.map { it.value.calculateZeroBlockOffset() }
val globalBlockOffset = BlockCoordinate.maxOf(allBlockOffsets)
val allBlockMaxes = trackers.map { it.value.calculateMaxBlock() }
val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes)
val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax)
logger.info("Calculations Completed")
val renderer = render.create(globalBlockExpanse)
val renderPool = ScheduledThreadPoolExecutor(16)
val imagePadCount = trackers.size.toString().length
for ((i, tracker) in trackers.entries) {
renderPool.submit {
val suffix = "-${i.toString().padStart(imagePadCount, '0')}"
saveRenderImage(renderer, tracker, globalBlockExpanse, suffix)
logger.info("Rendered Timelapse $i")
}
}
renderPool.shutdown()
if (!renderPool.awaitTermination(12, TimeUnit.HOURS)) {
throw RuntimeException("Failed to wait for render pool.")
}
logger.info("Rendering Completed")
} else {
val time = if (exactTimeAsString != null) Instant.parse(exactTimeAsString) else null
val filter = compose(
combine = { a, b -> a and b },
{ time != null } to { BlockChangeView.time lessEq time!! }
)
val changelog = BlockChangelog.query(db, filter)
val tracker = buildTrackerState(changelog, "Single-Time")
val expanse = BlockExpanse.offsetAndMax(tracker.calculateZeroBlockOffset(), tracker.calculateMaxBlock())
saveRenderImage(render.create(expanse), tracker, expanse)
val threadPoolExecutor = ScheduledThreadPoolExecutor(8)
val changelog = BlockChangelog.query(db)
val timelapse = BlockMapTimelapse<BufferedImage>(maybeBuildTrim())
val slices = timelapse.calculateChangelogSlices(changelog, timelapseMode.interval, timelapseIntervalLimit)
val imagePadCount = slices.size.toString().length
val pool = BlockMapRenderPool(
changelog = changelog,
blockTrackMode = if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete,
delegate = timelapse,
rendererFactory = { expanse -> render.create(expanse) },
threadPoolExecutor = threadPoolExecutor
) { slice, result ->
val index = slices.indexOf(slice) + 1
val suffix = "-${index.toString().padStart(imagePadCount, '0')}"
result.savePngFile("${render.id}${suffix}.png")
logger.info("Rendered Timelapse $index")
}
pool.render(slices)
threadPoolExecutor.shutdown()
}
fun saveRenderImage(
renderer: BlockImageRenderer,
tracker: BlockLogTracker,
expanse: BlockExpanse,
suffix: String = ""
) {
val map = tracker.buildBlockMap(expanse.offset)
val image = renderer.render(map)
image.savePngFile("${render.id}${suffix}.png")
}
fun buildTrackerState(changelog: BlockChangelog, job: String): BlockLogTracker {
val tracker =
BlockLogTracker(if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete)
tracker.replay(changelog)
logger.info("Job $job Total Block Changes... ${changelog.changes.size}")
val uniqueBlockPositions = tracker.blocks.size
logger.info("Job $job Unique Block Positions... $uniqueBlockPositions")
maybeTrimState(tracker)
return tracker
}
fun maybeTrimState(tracker: BlockLogTracker) {
fun maybeBuildTrim(): Pair<BlockCoordinate, BlockCoordinate>? {
if (fromCoordinate == null || toCoordinate == null) {
return
return null
}
val from = fromCoordinate!!.split(",").map { it.toLong() }
@ -139,8 +64,7 @@ class BlockChangeCommand : CliktCommand("Block Changes", name = "block-changes")
val fromBlock = BlockCoordinate(from[0], 0, from[1])
val toBlock = BlockCoordinate(to[0], 0, to[1])
tracker.trimOutsideXAndZRange(fromBlock, toBlock)
return fromBlock to toBlock
}
@Suppress("unused")

View File

@ -5,17 +5,16 @@ import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Instant
class BlockChangelog(
val changes: List<BlockChange>
) {
fun slice(range: Pair<Instant, Instant>): BlockChangelog = BlockChangelog(changes.filter {
it.time >= range.first &&
it.time <= range.second
fun slice(slice: BlockChangelogSlice): BlockChangelog = BlockChangelog(changes.filter {
it.time >= slice.first &&
it.time <= slice.second
})
val changeTimeRange: Pair<Instant, Instant>
val changeTimeRange: BlockChangelogSlice
get() = changes.minOf { it.time } to changes.maxOf { it.time }
companion object {

View File

@ -0,0 +1,5 @@
package cloud.kubelet.foundation.gjallarhorn.state
import java.time.Instant
typealias BlockChangelogSlice = Pair<Instant, Instant>

View File

@ -0,0 +1,72 @@
package cloud.kubelet.foundation.gjallarhorn.state
import cloud.kubelet.foundation.gjallarhorn.render.BlockMapRenderer
import org.slf4j.LoggerFactory
import java.util.concurrent.*
class BlockMapRenderPool<T>(
val changelog: BlockChangelog,
val blockTrackMode: BlockTrackMode,
val rendererFactory: (BlockExpanse) -> BlockMapRenderer<T>,
val delegate: RenderPoolDelegate<T>,
val threadPoolExecutor: ThreadPoolExecutor,
val renderResultCallback: (BlockChangelogSlice, T) -> Unit
) {
private val trackers = ConcurrentHashMap<BlockChangelogSlice, BlockLogTracker>()
private val playbackJobFutures = ConcurrentHashMap<BlockChangelogSlice, Future<*>>()
private val renderJobFutures = ConcurrentHashMap<BlockChangelogSlice, Future<*>>()
fun submitPlaybackJob(slice: BlockChangelogSlice) {
val future = threadPoolExecutor.submit {
try {
runPlaybackSlice(slice)
} catch (e: Exception) {
logger.error("Failed to run playback job for slice $slice", e)
}
}
playbackJobFutures[slice] = future
}
fun submitRenderJob(slice: BlockChangelogSlice, callback: () -> T) {
val future = threadPoolExecutor.submit {
try {
val result = callback()
renderResultCallback(slice, result)
} catch (e: Exception) {
logger.error("Failed to run render job for slice $slice", e)
}
}
renderJobFutures[slice] = future
}
fun render(slices: List<BlockChangelogSlice>) {
for (slice in slices) {
submitPlaybackJob(slice)
}
for (future in playbackJobFutures.values) {
future.get()
}
delegate.buildRenderJobs(this, trackers)
for (future in renderJobFutures.values) {
future.get()
}
}
private fun runPlaybackSlice(slice: BlockChangelogSlice) {
val sliced = changelog.slice(slice)
val tracker = BlockLogTracker(blockTrackMode)
tracker.replay(sliced)
trackers[slice] = tracker
}
interface RenderPoolDelegate<T> {
fun buildRenderJobs(pool: BlockMapRenderPool<T>, trackers: Map<BlockChangelogSlice, BlockLogTracker>)
}
companion object {
private val logger = LoggerFactory.getLogger(BlockMapRenderPool::class.java)
}
}

View File

@ -0,0 +1,44 @@
package cloud.kubelet.foundation.gjallarhorn.state
import java.time.Duration
import java.time.Instant
class BlockMapTimelapse<T>(val trim: Pair<BlockCoordinate, BlockCoordinate>? = null) :
BlockMapRenderPool.RenderPoolDelegate<T> {
fun calculateChangelogSlices(
changelog: BlockChangelog, interval: Duration, limit: Int? = null
): List<BlockChangelogSlice> {
val (start, end) = changelog.changeTimeRange
var intervals = mutableListOf<Instant>()
var current = start
while (!current.isAfter(end)) {
intervals.add(current)
current = current.plus(interval)
}
if (limit != null) {
intervals = intervals.takeLast(limit).toMutableList()
}
return intervals.map { it.minus(interval) to it }
}
override fun buildRenderJobs(pool: BlockMapRenderPool<T>, trackers: Map<BlockChangelogSlice, BlockLogTracker>) {
val allBlockOffsets = trackers.map { it.value.calculateZeroBlockOffset() }
val globalBlockOffset = BlockCoordinate.maxOf(allBlockOffsets)
val allBlockMaxes = trackers.map { it.value.calculateMaxBlock() }
val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes)
val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax)
val renderer = pool.rendererFactory(globalBlockExpanse)
for ((slice, tracker) in trackers) {
if (trim != null) {
tracker.trimOutsideXAndZRange(trim.first, trim.second)
}
pool.submitRenderJob(slice) {
val map = tracker.buildBlockMap(globalBlockExpanse.offset)
renderer.render(map)
}
}
}
}