From 54cd41e9257ea03d80a4402580a8bede5d840b11 Mon Sep 17 00:00:00 2001 From: Kenneth Endfinger Date: Mon, 17 Jan 2022 21:07:40 -0500 Subject: [PATCH] Gjallarhorn: Render pool rework and cleanup. --- .../commands/BlockChangeTimelapseCommand.kt | 6 +-- .../gjallarhorn/state/BlockChangelog.kt | 44 +++++++++++++++-- .../gjallarhorn/state/BlockMapRenderPool.kt | 27 +++++------ .../state/BlockMapRenderPoolDelegate.kt | 6 +++ .../gjallarhorn/state/BlockMapTimelapse.kt | 48 +++---------------- ...ockChangelogSlice.kt => ChangelogSlice.kt} | 12 ++--- 6 files changed, 73 insertions(+), 70 deletions(-) create mode 100644 tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPoolDelegate.kt rename tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/{BlockChangelogSlice.kt => ChangelogSlice.kt} (54%) diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeTimelapseCommand.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeTimelapseCommand.kt index 5c6c8cc..3cf45bf 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeTimelapseCommand.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/commands/BlockChangeTimelapseCommand.kt @@ -63,13 +63,13 @@ class BlockChangeTimelapseCommand : CliktCommand("Block Change Timelapse", name val changelog = BlockChangelog.query(db, filter) logger.info("Block Changelog: ${changelog.changes.size} changes") val timelapse = BlockMapTimelapse() - var slices = timelapse.calculateChangelogSlices(changelog, timelapseMode.interval, timelapseIntervalLimit) + var slices = changelog.calculateChangelogSlices(timelapseMode.interval, timelapseIntervalLimit) if (timelapseSpeedChangeThreshold != null && timelapseSpeedChangeMinimumIntervalSeconds != null) { val minimumInterval = Duration.ofSeconds(timelapseSpeedChangeMinimumIntervalSeconds!!.toLong()) val blockChangeThreshold = timelapseSpeedChangeThreshold!! - slices = timelapse.splitChangelogSlicesWithThreshold(changelog, blockChangeThreshold, minimumInterval, slices) + slices = changelog.splitChangelogSlicesWithThreshold(blockChangeThreshold, minimumInterval, slices) } logger.info("Timelapse Slices: ${slices.size} slices") @@ -80,7 +80,7 @@ class BlockChangeTimelapseCommand : CliktCommand("Block Change Timelapse", name changelog = changelog, blockTrackMode = if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete, delegate = timelapse, - rendererFactory = { expanse -> render.create(expanse) }, + createRendererFunction = { expanse -> render.create(expanse) }, threadPoolExecutor = threadPoolExecutor ) { slice, result -> val speed = slice.relative.toSeconds().toDouble() / timelapseMode.interval.toSeconds().toDouble() diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt index c447ab3..05b1712 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelog.kt @@ -5,20 +5,56 @@ import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Op import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Duration +import java.time.Instant +import java.util.stream.Stream class BlockChangelog( val changes: List ) { - fun slice(slice: BlockChangelogSlice): BlockChangelog = BlockChangelog(changes.filter { + fun slice(slice: ChangelogSlice): BlockChangelog = BlockChangelog(changes.filter { slice.isTimeWithin(it.time) }) - fun countRelativeChangesInSlice(slice: BlockChangelogSlice): Int = changes.count { + fun countRelativeChangesInSlice(slice: ChangelogSlice): Int = changes.count { slice.isRelativeWithin(it.time) } - val changeTimeRange: BlockChangelogSlice - get() = BlockChangelogSlice(changes.minOf { it.time }, changes.maxOf { it.time }) + val changeTimeRange: ChangelogSlice + get() = ChangelogSlice(changes.minOf { it.time }, changes.maxOf { it.time }) + + fun calculateChangelogSlices(interval: Duration, limit: Int? = null): List { + val (start, end) = changeTimeRange + var intervals = mutableListOf() + var current = start + while (!current.isAfter(end)) { + intervals.add(current) + current = current.plus(interval) + } + + if (limit != null) { + intervals = intervals.takeLast(limit).toMutableList() + } + return intervals.map { ChangelogSlice(start, it, interval) } + } + + fun splitChangelogSlicesWithThreshold( + targetChangeThreshold: Int, + minimumTimeInterval: Duration, + slices: List + ): List { + return slices.parallelStream().flatMap { slice -> + val count = countRelativeChangesInSlice(slice) + if (count < targetChangeThreshold || + slice.relative < minimumTimeInterval + ) { + return@flatMap Stream.of(slice) + } + + val split = slice.split() + return@flatMap splitChangelogSlicesWithThreshold(targetChangeThreshold, minimumTimeInterval, split).parallelStream() + }.toList() + } companion object { fun query(db: Database, filter: Op = Op.TRUE): BlockChangelog = transaction(db) { diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt index 532db10..a3ba841 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPool.kt @@ -7,16 +7,16 @@ import java.util.concurrent.* class BlockMapRenderPool( val changelog: BlockChangelog, val blockTrackMode: BlockTrackMode, - val rendererFactory: (BlockExpanse) -> BlockMapRenderer, - val delegate: RenderPoolDelegate, + val createRendererFunction: (BlockExpanse) -> BlockMapRenderer, + val delegate: BlockMapRenderPoolDelegate, val threadPoolExecutor: ThreadPoolExecutor, - val renderResultCallback: (BlockChangelogSlice, T) -> Unit + val renderResultCallback: (ChangelogSlice, T) -> Unit ) { - private val trackers = ConcurrentHashMap() - private val playbackJobFutures = ConcurrentHashMap>() - private val renderJobFutures = ConcurrentHashMap>() + private val trackers = ConcurrentHashMap() + private val playbackJobFutures = ConcurrentHashMap>() + private val renderJobFutures = ConcurrentHashMap>() - fun submitPlaybackJob(id: String, slice: BlockChangelogSlice) { + fun submitPlaybackJob(id: String, slice: ChangelogSlice) { val future = threadPoolExecutor.submit { try { runPlaybackSlice(id, slice) @@ -27,7 +27,7 @@ class BlockMapRenderPool( playbackJobFutures[slice] = future } - fun submitRenderJob(slice: BlockChangelogSlice, callback: () -> T) { + fun submitRenderJob(slice: ChangelogSlice, callback: () -> T) { val future = threadPoolExecutor.submit { try { val result = callback() @@ -39,7 +39,7 @@ class BlockMapRenderPool( renderJobFutures[slice] = future } - fun render(slices: List) { + fun render(slices: List) { for (slice in slices) { submitPlaybackJob((slices.indexOf(slice) + 1).toString(), slice) } @@ -48,30 +48,27 @@ class BlockMapRenderPool( future.get() } - delegate.buildRenderJobs(this, trackers) + delegate.onAllPlaybackComplete(this, trackers) for (future in renderJobFutures.values) { future.get() } } - private fun runPlaybackSlice(id: String, slice: BlockChangelogSlice) { + private fun runPlaybackSlice(id: String, slice: ChangelogSlice) { val start = System.currentTimeMillis() val sliced = changelog.slice(slice) val tracker = BlockLogTracker(blockTrackMode) tracker.replay(sliced) if (tracker.isNotEmpty()) { trackers[slice] = tracker + delegate.onSinglePlaybackComplete(this, slice, tracker) } val end = System.currentTimeMillis() val timeInMilliseconds = end - start logger.debug("Playback Completed for Slice $id in ${timeInMilliseconds}ms") } - interface RenderPoolDelegate { - fun buildRenderJobs(pool: BlockMapRenderPool, trackers: MutableMap) - } - companion object { private val logger = LoggerFactory.getLogger(BlockMapRenderPool::class.java) } diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPoolDelegate.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPoolDelegate.kt new file mode 100644 index 0000000..27efec8 --- /dev/null +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapRenderPoolDelegate.kt @@ -0,0 +1,6 @@ +package cloud.kubelet.foundation.gjallarhorn.state + +interface BlockMapRenderPoolDelegate { + fun onSinglePlaybackComplete(pool: BlockMapRenderPool, slice: ChangelogSlice, tracker: BlockLogTracker) + fun onAllPlaybackComplete(pool: BlockMapRenderPool, trackers: Map) +} diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt index 7a7f8aa..a3ac352 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockMapTimelapse.kt @@ -1,50 +1,14 @@ package cloud.kubelet.foundation.gjallarhorn.state -import java.time.Duration -import java.time.Instant -import java.util.stream.Stream - class BlockMapTimelapse : - BlockMapRenderPool.RenderPoolDelegate { - fun calculateChangelogSlices( - changelog: BlockChangelog, interval: Duration, limit: Int? = null - ): List { - val (start, end) = changelog.changeTimeRange - var intervals = mutableListOf() - var current = start - while (!current.isAfter(end)) { - intervals.add(current) - current = current.plus(interval) - } - - if (limit != null) { - intervals = intervals.takeLast(limit).toMutableList() - } - return intervals.map { BlockChangelogSlice(start, it, interval) } + BlockMapRenderPoolDelegate { + override fun onSinglePlaybackComplete(pool: BlockMapRenderPool, slice: ChangelogSlice, tracker: BlockLogTracker) { + throw UnsupportedOperationException() } - fun splitChangelogSlicesWithThreshold( - changelog: BlockChangelog, - targetChangeThreshold: Int, - minimumTimeInterval: Duration, - slices: List - ): List { - return slices.parallelStream().flatMap { slice -> - val count = changelog.countRelativeChangesInSlice(slice) - if (count < targetChangeThreshold || - slice.relative < minimumTimeInterval - ) { - return@flatMap Stream.of(slice) - } - - val split = slice.split() - return@flatMap splitChangelogSlicesWithThreshold(changelog, targetChangeThreshold, minimumTimeInterval, split).parallelStream() - }.toList() - } - - override fun buildRenderJobs( + override fun onAllPlaybackComplete( pool: BlockMapRenderPool, - trackers: MutableMap + trackers: Map ) { if (trackers.isEmpty()) { return @@ -56,7 +20,7 @@ class BlockMapTimelapse : val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes) val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax) - val renderer = pool.rendererFactory(globalBlockExpanse) + val renderer = pool.createRendererFunction(globalBlockExpanse) for ((slice, tracker) in trackers) { pool.submitRenderJob(slice) { val map = tracker.buildBlockMap(globalBlockExpanse.offset) diff --git a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/ChangelogSlice.kt similarity index 54% rename from tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt rename to tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/ChangelogSlice.kt index 33099c8..bea46f7 100644 --- a/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/BlockChangelogSlice.kt +++ b/tool-gjallarhorn/src/main/kotlin/cloud/kubelet/foundation/gjallarhorn/state/ChangelogSlice.kt @@ -3,21 +3,21 @@ package cloud.kubelet.foundation.gjallarhorn.state import java.time.Duration import java.time.Instant -data class BlockChangelogSlice(val from: Instant, val to: Instant, val relative: Duration) { +data class ChangelogSlice(val from: Instant, val to: Instant, val relative: Duration) { constructor(from: Instant, to: Instant) : this(from, to, Duration.ofMillis(to.toEpochMilli() - from.toEpochMilli())) - val changeResolutionTime: Instant = to.minus(relative) + val relativeChangeStart: Instant = to.minus(relative) fun isTimeWithin(time: Instant) = time in from..to - fun isRelativeWithin(time: Instant) = time in changeResolutionTime..to + fun isRelativeWithin(time: Instant) = time in relativeChangeStart..to - fun split(): List { + fun split(): List { val half = relative.dividedBy(2) val initial = to.minus(relative) val first = initial.plus(half) return listOf( - BlockChangelogSlice(from, first, half), - BlockChangelogSlice(from, to, half) + ChangelogSlice(from, first, half), + ChangelogSlice(from, to, half) ) } }