Gjallarhorn: Render pool rework and cleanup.

This commit is contained in:
Kenneth Endfinger
2022-01-17 21:07:40 -05:00
parent 011e3100bf
commit 54cd41e925
6 changed files with 73 additions and 70 deletions

View File

@ -63,13 +63,13 @@ class BlockChangeTimelapseCommand : CliktCommand("Block Change Timelapse", name
val changelog = BlockChangelog.query(db, filter)
logger.info("Block Changelog: ${changelog.changes.size} changes")
val timelapse = BlockMapTimelapse<BufferedImage>()
var slices = timelapse.calculateChangelogSlices(changelog, timelapseMode.interval, timelapseIntervalLimit)
var slices = changelog.calculateChangelogSlices(timelapseMode.interval, timelapseIntervalLimit)
if (timelapseSpeedChangeThreshold != null && timelapseSpeedChangeMinimumIntervalSeconds != null) {
val minimumInterval = Duration.ofSeconds(timelapseSpeedChangeMinimumIntervalSeconds!!.toLong())
val blockChangeThreshold = timelapseSpeedChangeThreshold!!
slices = timelapse.splitChangelogSlicesWithThreshold(changelog, blockChangeThreshold, minimumInterval, slices)
slices = changelog.splitChangelogSlicesWithThreshold(blockChangeThreshold, minimumInterval, slices)
}
logger.info("Timelapse Slices: ${slices.size} slices")
@ -80,7 +80,7 @@ class BlockChangeTimelapseCommand : CliktCommand("Block Change Timelapse", name
changelog = changelog,
blockTrackMode = if (considerAirBlocks) BlockTrackMode.AirOnDelete else BlockTrackMode.RemoveOnDelete,
delegate = timelapse,
rendererFactory = { expanse -> render.create(expanse) },
createRendererFunction = { expanse -> render.create(expanse) },
threadPoolExecutor = threadPoolExecutor
) { slice, result ->
val speed = slice.relative.toSeconds().toDouble() / timelapseMode.interval.toSeconds().toDouble()

View File

@ -5,20 +5,56 @@ import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Duration
import java.time.Instant
import java.util.stream.Stream
class BlockChangelog(
val changes: List<BlockChange>
) {
fun slice(slice: BlockChangelogSlice): BlockChangelog = BlockChangelog(changes.filter {
fun slice(slice: ChangelogSlice): BlockChangelog = BlockChangelog(changes.filter {
slice.isTimeWithin(it.time)
})
fun countRelativeChangesInSlice(slice: BlockChangelogSlice): Int = changes.count {
fun countRelativeChangesInSlice(slice: ChangelogSlice): Int = changes.count {
slice.isRelativeWithin(it.time)
}
val changeTimeRange: BlockChangelogSlice
get() = BlockChangelogSlice(changes.minOf { it.time }, changes.maxOf { it.time })
val changeTimeRange: ChangelogSlice
get() = ChangelogSlice(changes.minOf { it.time }, changes.maxOf { it.time })
fun calculateChangelogSlices(interval: Duration, limit: Int? = null): List<ChangelogSlice> {
val (start, end) = changeTimeRange
var intervals = mutableListOf<Instant>()
var current = start
while (!current.isAfter(end)) {
intervals.add(current)
current = current.plus(interval)
}
if (limit != null) {
intervals = intervals.takeLast(limit).toMutableList()
}
return intervals.map { ChangelogSlice(start, it, interval) }
}
fun splitChangelogSlicesWithThreshold(
targetChangeThreshold: Int,
minimumTimeInterval: Duration,
slices: List<ChangelogSlice>
): List<ChangelogSlice> {
return slices.parallelStream().flatMap { slice ->
val count = countRelativeChangesInSlice(slice)
if (count < targetChangeThreshold ||
slice.relative < minimumTimeInterval
) {
return@flatMap Stream.of(slice)
}
val split = slice.split()
return@flatMap splitChangelogSlicesWithThreshold(targetChangeThreshold, minimumTimeInterval, split).parallelStream()
}.toList()
}
companion object {
fun query(db: Database, filter: Op<Boolean> = Op.TRUE): BlockChangelog = transaction(db) {

View File

@ -7,16 +7,16 @@ import java.util.concurrent.*
class BlockMapRenderPool<T>(
val changelog: BlockChangelog,
val blockTrackMode: BlockTrackMode,
val rendererFactory: (BlockExpanse) -> BlockMapRenderer<T>,
val delegate: RenderPoolDelegate<T>,
val createRendererFunction: (BlockExpanse) -> BlockMapRenderer<T>,
val delegate: BlockMapRenderPoolDelegate<T>,
val threadPoolExecutor: ThreadPoolExecutor,
val renderResultCallback: (BlockChangelogSlice, T) -> Unit
val renderResultCallback: (ChangelogSlice, T) -> Unit
) {
private val trackers = ConcurrentHashMap<BlockChangelogSlice, BlockLogTracker>()
private val playbackJobFutures = ConcurrentHashMap<BlockChangelogSlice, Future<*>>()
private val renderJobFutures = ConcurrentHashMap<BlockChangelogSlice, Future<*>>()
private val trackers = ConcurrentHashMap<ChangelogSlice, BlockLogTracker>()
private val playbackJobFutures = ConcurrentHashMap<ChangelogSlice, Future<*>>()
private val renderJobFutures = ConcurrentHashMap<ChangelogSlice, Future<*>>()
fun submitPlaybackJob(id: String, slice: BlockChangelogSlice) {
fun submitPlaybackJob(id: String, slice: ChangelogSlice) {
val future = threadPoolExecutor.submit {
try {
runPlaybackSlice(id, slice)
@ -27,7 +27,7 @@ class BlockMapRenderPool<T>(
playbackJobFutures[slice] = future
}
fun submitRenderJob(slice: BlockChangelogSlice, callback: () -> T) {
fun submitRenderJob(slice: ChangelogSlice, callback: () -> T) {
val future = threadPoolExecutor.submit {
try {
val result = callback()
@ -39,7 +39,7 @@ class BlockMapRenderPool<T>(
renderJobFutures[slice] = future
}
fun render(slices: List<BlockChangelogSlice>) {
fun render(slices: List<ChangelogSlice>) {
for (slice in slices) {
submitPlaybackJob((slices.indexOf(slice) + 1).toString(), slice)
}
@ -48,30 +48,27 @@ class BlockMapRenderPool<T>(
future.get()
}
delegate.buildRenderJobs(this, trackers)
delegate.onAllPlaybackComplete(this, trackers)
for (future in renderJobFutures.values) {
future.get()
}
}
private fun runPlaybackSlice(id: String, slice: BlockChangelogSlice) {
private fun runPlaybackSlice(id: String, slice: ChangelogSlice) {
val start = System.currentTimeMillis()
val sliced = changelog.slice(slice)
val tracker = BlockLogTracker(blockTrackMode)
tracker.replay(sliced)
if (tracker.isNotEmpty()) {
trackers[slice] = tracker
delegate.onSinglePlaybackComplete(this, slice, tracker)
}
val end = System.currentTimeMillis()
val timeInMilliseconds = end - start
logger.debug("Playback Completed for Slice $id in ${timeInMilliseconds}ms")
}
interface RenderPoolDelegate<T> {
fun buildRenderJobs(pool: BlockMapRenderPool<T>, trackers: MutableMap<BlockChangelogSlice, BlockLogTracker>)
}
companion object {
private val logger = LoggerFactory.getLogger(BlockMapRenderPool::class.java)
}

View File

@ -0,0 +1,6 @@
package cloud.kubelet.foundation.gjallarhorn.state
interface BlockMapRenderPoolDelegate<T> {
fun onSinglePlaybackComplete(pool: BlockMapRenderPool<T>, slice: ChangelogSlice, tracker: BlockLogTracker)
fun onAllPlaybackComplete(pool: BlockMapRenderPool<T>, trackers: Map<ChangelogSlice, BlockLogTracker>)
}

View File

@ -1,50 +1,14 @@
package cloud.kubelet.foundation.gjallarhorn.state
import java.time.Duration
import java.time.Instant
import java.util.stream.Stream
class BlockMapTimelapse<T> :
BlockMapRenderPool.RenderPoolDelegate<T> {
fun calculateChangelogSlices(
changelog: BlockChangelog, interval: Duration, limit: Int? = null
): List<BlockChangelogSlice> {
val (start, end) = changelog.changeTimeRange
var intervals = mutableListOf<Instant>()
var current = start
while (!current.isAfter(end)) {
intervals.add(current)
current = current.plus(interval)
}
if (limit != null) {
intervals = intervals.takeLast(limit).toMutableList()
}
return intervals.map { BlockChangelogSlice(start, it, interval) }
BlockMapRenderPoolDelegate<T> {
override fun onSinglePlaybackComplete(pool: BlockMapRenderPool<T>, slice: ChangelogSlice, tracker: BlockLogTracker) {
throw UnsupportedOperationException()
}
fun splitChangelogSlicesWithThreshold(
changelog: BlockChangelog,
targetChangeThreshold: Int,
minimumTimeInterval: Duration,
slices: List<BlockChangelogSlice>
): List<BlockChangelogSlice> {
return slices.parallelStream().flatMap { slice ->
val count = changelog.countRelativeChangesInSlice(slice)
if (count < targetChangeThreshold ||
slice.relative < minimumTimeInterval
) {
return@flatMap Stream.of(slice)
}
val split = slice.split()
return@flatMap splitChangelogSlicesWithThreshold(changelog, targetChangeThreshold, minimumTimeInterval, split).parallelStream()
}.toList()
}
override fun buildRenderJobs(
override fun onAllPlaybackComplete(
pool: BlockMapRenderPool<T>,
trackers: MutableMap<BlockChangelogSlice, BlockLogTracker>
trackers: Map<ChangelogSlice, BlockLogTracker>
) {
if (trackers.isEmpty()) {
return
@ -56,7 +20,7 @@ class BlockMapTimelapse<T> :
val globalBlockMax = BlockCoordinate.maxOf(allBlockMaxes)
val globalBlockExpanse = BlockExpanse.offsetAndMax(globalBlockOffset, globalBlockMax)
val renderer = pool.rendererFactory(globalBlockExpanse)
val renderer = pool.createRendererFunction(globalBlockExpanse)
for ((slice, tracker) in trackers) {
pool.submitRenderJob(slice) {
val map = tracker.buildBlockMap(globalBlockExpanse.offset)

View File

@ -3,21 +3,21 @@ package cloud.kubelet.foundation.gjallarhorn.state
import java.time.Duration
import java.time.Instant
data class BlockChangelogSlice(val from: Instant, val to: Instant, val relative: Duration) {
data class ChangelogSlice(val from: Instant, val to: Instant, val relative: Duration) {
constructor(from: Instant, to: Instant) : this(from, to, Duration.ofMillis(to.toEpochMilli() - from.toEpochMilli()))
val changeResolutionTime: Instant = to.minus(relative)
val relativeChangeStart: Instant = to.minus(relative)
fun isTimeWithin(time: Instant) = time in from..to
fun isRelativeWithin(time: Instant) = time in changeResolutionTime..to
fun isRelativeWithin(time: Instant) = time in relativeChangeStart..to
fun split(): List<BlockChangelogSlice> {
fun split(): List<ChangelogSlice> {
val half = relative.dividedBy(2)
val initial = to.minus(relative)
val first = initial.plus(half)
return listOf(
BlockChangelogSlice(from, first, half),
BlockChangelogSlice(from, to, half)
ChangelogSlice(from, first, half),
ChangelogSlice(from, to, half)
)
}
}