Skip to content

Commit

Permalink
Refactor the static const client and add a parameters version. (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpawliszyn authored Jul 3, 2024
1 parent 67121fe commit 4416d6c
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ abstract class DynamoDbBackfill<I : Any, P : Any> : Backfill {
// Like MyBackfill.
val thisType = TypeLiteral.get(this::class.java)

// Like Backfill<MyItem, Parameters>.
// Like MyBackfill<MyItem, Parameters>.
val supertype = thisType.getSupertype(
DynamoDbBackfill::class.java,
).type as ParameterizedType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class DynamoDbBackfill<I : Any, P : Any> : Backfill {
// Like MyBackfill.
val thisType = TypeLiteral.get(this::class.java)

// Like Backfill<MyItem, Parameters>.
// Like MyBackfill<MyItem, Parameters>.
val supertype = thisType.getSupertype(
DynamoDbBackfill::class.java,
).type as ParameterizedType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class MiskJooqBackfillTests {
assertThat(run.partitionProgressSnapshot.values.single().previousEndKey).isNull()

val scan1 = run.singleScan()
assertThat(scan1.batches).size().isEqualTo(1)
assertThat(scan1.batches).hasSize(1)
assertThat(scan1.batches.single().scanned_record_count).isEqualTo(5)
assertThat(scan1.batches.single().matching_record_count).isEqualTo(0)
testingAssertThat(run.partitionProgressSnapshot.values.single())
Expand Down Expand Up @@ -204,15 +204,15 @@ class MiskJooqBackfillTests {
run.computeCountLimit = 1L

val scan1 = run.precomputeScan()
assertThat(scan1.batches).size().isEqualTo(1)
assertThat(scan1.batches).hasSize(1)
val batch1 = scan1.batches.single()
assertThat(batch1.batch_range.start.utf8()).isEqualTo(backfillRowKeys[0].toString())
assertThat(batch1.matching_record_count).isEqualTo(10)
assertThat(batch1.scanned_record_count).isEqualTo(10)

run.scanSize = 20L
val scan2 = run.precomputeScan()
assertThat(scan2.batches).size().isEqualTo(1)
assertThat(scan2.batches).hasSize(1)
val batch2 = scan2.batches.single()
assertThat(batch2.matching_record_count).isEqualTo(10)
// 5 extra were scanned and skipped, because they were interspersed.
Expand Down Expand Up @@ -247,7 +247,7 @@ class MiskJooqBackfillTests {

run1.computeCountLimit = 2
val scan = run1.singleScan()
assertThat(scan.batches).size().isEqualTo(2)
assertThat(scan.batches).hasSize(2)
assertThat(scan.batches[0].batch_range.end).isLessThan(scan.batches[1].batch_range.start)

// Requesting two batches should give the same batches as requesting one twice.
Expand All @@ -271,7 +271,7 @@ class MiskJooqBackfillTests {
run1.scanSize = 4L
run1.computeCountLimit = 3
val scan = run1.singleScan()
assertThat(scan.batches).size().isEqualTo(3)
assertThat(scan.batches).hasSize(3)
assertThat(scan.batches[0].batch_range.end).isLessThan(scan.batches[1].batch_range.start)
assertThat(scan.batches[1].batch_range.end).isLessThan(scan.batches[2].batch_range.start)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class SinglePartitionHibernateBackfillTest {
assertThat(run.partitionProgressSnapshot.values.single().previousEndKey).isNull()

val scan1 = run.singleScan()
assertThat(scan1.batches).size().isEqualTo(1)
assertThat(scan1.batches).hasSize(1)
assertThat(scan1.batches.single().scanned_record_count).isEqualTo(5)
assertThat(scan1.batches.single().matching_record_count).isEqualTo(0)
assertThat(run.partitionProgressSnapshot.values.single()).isNotDone()
Expand Down Expand Up @@ -150,15 +150,15 @@ abstract class SinglePartitionHibernateBackfillTest {
run.computeCountLimit = 1L

val scan1 = run.precomputeScan()
assertThat(scan1.batches).size().isEqualTo(1)
assertThat(scan1.batches).hasSize(1)
val batch1 = scan1.batches.single()
assertThat(batch1.batch_range.start.utf8()).isEqualTo(expectedIds[0].toString())
assertThat(batch1.matching_record_count).isEqualTo(10)
assertThat(batch1.scanned_record_count).isEqualTo(10)

run.scanSize = 20L
val scan2 = run.precomputeScan()
assertThat(scan2.batches).size().isEqualTo(1)
assertThat(scan2.batches).hasSize(1)
val batch2 = scan2.batches.single()
assertThat(batch2.matching_record_count).isEqualTo(10)
// 5 extra were scanned and skipped, because they were interspersed.
Expand Down Expand Up @@ -188,7 +188,7 @@ abstract class SinglePartitionHibernateBackfillTest {

run1.computeCountLimit = 2
val scan = run1.singleScan()
assertThat(scan.batches).size().isEqualTo(2)
assertThat(scan.batches).hasSize(2)
assertThat(scan.batches[0].batch_range.end).isLessThan(scan.batches[1].batch_range.start)

// Requesting two batches should give the same batches as requesting one twice.
Expand All @@ -208,7 +208,7 @@ abstract class SinglePartitionHibernateBackfillTest {
run1.scanSize = 4L
run1.computeCountLimit = 3
val scan = run1.singleScan()
assertThat(scan.batches).size().isEqualTo(3)
assertThat(scan.batches).hasSize(3)
assertThat(scan.batches[0].batch_range.end).isLessThan(scan.batches[1].batch_range.start)
assertThat(scan.batches[1].batch_range.end).isLessThan(scan.batches[2].batch_range.start)

Expand Down Expand Up @@ -272,7 +272,7 @@ abstract class SinglePartitionHibernateBackfillTest {
.apply { configureForTest() }

run.execute()
assertThat(run.backfill.idsRanDry).size().isEqualTo(5)
assertThat(run.backfill.idsRanDry).hasSize(5)
assertThat(run.backfill.idsRanWet).isEmpty()
// We got beef as a parameter
assertThat(run.backfill.parametersLog).containsExactly(SandwichParameters("beef"))
Expand Down Expand Up @@ -300,7 +300,7 @@ abstract class SinglePartitionHibernateBackfillTest {
.apply { configureForTest() }

run.execute()
assertThat(run.backfill.idsRanDry).size().isEqualTo(20)
assertThat(run.backfill.idsRanDry).hasSize(20)
assertThat(run.backfill.idsRanWet).isEmpty()
// Null parameter used the default
assertThat(run.backfill.parametersLog).contains(SandwichParameters("chicken"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class S3Utf8StringNewlineBackfillTest {
.containsExactlyInAnyOrder("main-blt")

// See that we processed two empty lines
assertThat(run.backfill.backfilledIngredients.filter { it.second == "" }).size().isEqualTo(2)
assertThat(run.backfill.backfilledIngredients.filter { it.second == "" }).hasSize(2)

// Now run efficiently aka "without empty ingredient lines"
val optimizedRun = backfila.createWetRun<OptimizedLunchBackfill>(parameters = RecipeAttributes())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,57 +1,12 @@
package app.cash.backfila.client.stat

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.google.inject.TypeLiteral
import com.squareup.moshi.Types
import java.lang.reflect.ParameterizedType
import kotlin.reflect.KClass

abstract class StaticDatasourceBackfill<I : Any, P : Any> : Backfill {
val itemType: KClass<I>

/*
* Extract the type parameters from the subtype's generic declaration. This uses Guice magic to
* read the ("I") type parameter.
*/
init {
// Like MyBackfill.
val thisType = TypeLiteral.get(this::class.java)

// Like Backfill<MyItem, Parameters>.
val supertype = thisType.getSupertype(
StaticDatasourceBackfill::class.java,
).type as ParameterizedType

// Like MyItem.
@Suppress("UNCHECKED_CAST")
itemType = (Types.getRawType(supertype.actualTypeArguments[0]) as Class<I>).kotlin
}

/**
* Override this and throw an exception to prevent the backfill from being created.
* This is also a good place to do any prep work before batches are run.
*/
open fun validate(config: PrepareBackfillConfig<P>) {}

abstract class StaticDatasourceBackfill<I : Any, P : Any> : StaticDatasourceBackfillBase<I, P>() {
/**
* Called for each batch of matching records.
* Override in a backfill to process all records in a batch.
*/
open fun runBatch(items: List<@JvmSuppressWildcards I>, config: BackfillConfig<P>) {
items.forEach { runOne(it, config) }
}

/**
* Called for each matching record.
* Override in a backfill to process one record at a time.
*/
open fun runOne(item: I, config: BackfillConfig<P>) {
}

/**
* This invokes the static list of items that the backfill will iterate over.
* This provides the static list of items that the backfill will iterate over.
*/
abstract val staticDatasource: List<I>

override fun getStaticDatasource(config: PrepareBackfillConfig<P>) = staticDatasource
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package app.cash.backfila.client.stat

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.google.inject.TypeLiteral
import com.squareup.moshi.Types
import java.lang.reflect.ParameterizedType
import kotlin.reflect.KClass

abstract class StaticDatasourceBackfillBase<I : Any, P : Any> : Backfill {
val itemType: KClass<I>

/*
* Extract the type parameters from the subtype's generic declaration. This uses Guice magic to
* read the ("I") type parameter.
*/
init {
// Like MyBackfill.
val thisType = TypeLiteral.get(this::class.java)

// Like MyBackfill<MyItem, Parameters>.
val supertype = thisType.getSupertype(
StaticDatasourceBackfillBase::class.java,
).type as ParameterizedType

// Like MyItem.
@Suppress("UNCHECKED_CAST")
itemType = (Types.getRawType(supertype.actualTypeArguments[0]) as Class<I>).kotlin
}

/**
* Override this and throw an exception to prevent the backfill from being created.
* This is also a good place to do any prep work before batches are run.
*/
open fun validate(config: PrepareBackfillConfig<P>) {}

/**
* Called for each batch of matching records.
* Override in a backfill to process all records in a batch.
*/
open fun runBatch(items: List<@JvmSuppressWildcards I>, config: BackfillConfig<P>) {
items.forEach { runOne(it, config) }
}

/**
* Called for each matching record.
* Override in a backfill to process one record at a time.
*/
open fun runOne(item: I, config: BackfillConfig<P>) {
}

abstract fun getStaticDatasource(config: PrepareBackfillConfig<P>): List<@JvmSuppressWildcards I>
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.reflect.jvm.jvmName
/**
* Installs the [BackfillBackend] for Static Datasource backfills. See the java doc for [RealBackfillModule].
*/
class StaticDatasourceBackfillModule<T : StaticDatasourceBackfill<*, *>> private constructor(
class StaticDatasourceBackfillModule<T : StaticDatasourceBackfillBase<*, *>> private constructor(
private val backfillClass: KClass<T>,
) : AbstractModule() {
override fun configure() {
Expand All @@ -26,15 +26,15 @@ class StaticDatasourceBackfillModule<T : StaticDatasourceBackfill<*, *>> private
}

companion object {
inline fun <reified T : StaticDatasourceBackfill<*, *>> create(): StaticDatasourceBackfillModule<T> = create(T::class)
inline fun <reified T : StaticDatasourceBackfillBase<*, *>> create(): StaticDatasourceBackfillModule<T> = create(T::class)

@JvmStatic
fun <T : StaticDatasourceBackfill<*, *>> create(backfillClass: KClass<T>): StaticDatasourceBackfillModule<T> {
fun <T : StaticDatasourceBackfillBase<*, *>> create(backfillClass: KClass<T>): StaticDatasourceBackfillModule<T> {
return StaticDatasourceBackfillModule(backfillClass)
}

@JvmStatic
fun <T : StaticDatasourceBackfill<*, *>> create(backfillClass: Class<T>): StaticDatasourceBackfillModule<T> {
fun <T : StaticDatasourceBackfillBase<*, *>> create(backfillClass: Class<T>): StaticDatasourceBackfillModule<T> {
return StaticDatasourceBackfillModule(backfillClass.kotlin)
}
}
Expand All @@ -53,7 +53,7 @@ private object StaticDatasourceBackfillBackendModule : AbstractModule() {
private fun mapBinder(binder: Binder) = MapBinder.newMapBinder(
binder,
object : TypeLiteral<String>() {},
object : TypeLiteral<KClass<out StaticDatasourceBackfill<*, *>>>() {},
object : TypeLiteral<KClass<out StaticDatasourceBackfillBase<*, *>>>() {},
ForStaticBackend::class.java,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import app.cash.backfila.client.spi.BackfillBackend
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.BackfillRegistration
import app.cash.backfila.client.stat.ForStaticBackend
import app.cash.backfila.client.stat.StaticDatasourceBackfill
import app.cash.backfila.client.stat.StaticDatasourceBackfillBase
import com.google.inject.Injector
import com.google.inject.TypeLiteral
import com.squareup.moshi.Types
Expand All @@ -21,21 +21,21 @@ import kotlin.reflect.full.findAnnotation
@Singleton
class StaticDatasourceBackend @Inject constructor(
private val injector: Injector,
@ForStaticBackend private val backfills: MutableMap<String, KClass<out StaticDatasourceBackfill<*, *>>>,
@ForStaticBackend private val backfills: MutableMap<String, KClass<out StaticDatasourceBackfillBase<*, *>>>,
) : BackfillBackend {

/** Creates Backfill instances. Each backfill ID gets a new Backfill instance. */
private fun getBackfill(name: String): StaticDatasourceBackfill<*, *>? {
private fun getBackfill(name: String): StaticDatasourceBackfillBase<*, *>? {
val backfillClass = backfills[name]
return if (backfillClass != null) {
injector.getInstance(backfillClass.java) as StaticDatasourceBackfill<*, *>
injector.getInstance(backfillClass.java) as StaticDatasourceBackfillBase<*, *>
} else {
null
}
}

private fun <E : Any, Param : Any> createStaticDatasourceOperator(
backfill: StaticDatasourceBackfill<E, Param>,
backfill: StaticDatasourceBackfillBase<E, Param>,
) = StaticDatasourceBackfillOperator(
backfill,
BackfilaParametersOperator(parametersClass(backfill::class)),
Expand All @@ -46,7 +46,7 @@ class StaticDatasourceBackend @Inject constructor(

if (backfill != null) {
@Suppress("UNCHECKED_CAST") // We don't know the types statically, so fake them.
return createStaticDatasourceOperator(backfill as StaticDatasourceBackfill<Any, Any>)
return createStaticDatasourceOperator(backfill as StaticDatasourceBackfillBase<Any, Any>)
}

return null
Expand All @@ -57,18 +57,18 @@ class StaticDatasourceBackend @Inject constructor(
BackfillRegistration(
name = it.key,
description = it.value.findAnnotation<Description>()?.text,
parametersClass = parametersClass(it.value as KClass<StaticDatasourceBackfill<Any, Any>>),
parametersClass = parametersClass(it.value as KClass<StaticDatasourceBackfillBase<Any, Any>>),
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
)
}.toSet()
}

private fun <P : Any> parametersClass(backfillClass: KClass<out StaticDatasourceBackfill<*, P>>): KClass<P> {
private fun <P : Any> parametersClass(backfillClass: KClass<out StaticDatasourceBackfillBase<*, P>>): KClass<P> {
// Like MyBackfill.
val thisType = TypeLiteral.get(backfillClass.java)

// Like StaticDatasourceBackfill<MyItemClass, MyParameterClass>.
val supertype = thisType.getSupertype(StaticDatasourceBackfill::class.java).type as ParameterizedType
// Like StaticDatasourceBackfillBase<MyItemClass, MyParameterClass>.
val supertype = thisType.getSupertype(StaticDatasourceBackfillBase::class.java).type as ParameterizedType

// Like MyParameterClass
return (Types.getRawType(supertype.actualTypeArguments[1]) as Class<P>).kotlin
Expand Down
Loading

0 comments on commit 4416d6c

Please sign in to comment.