Skip to content

Commit

Permalink
Use one conversion context for all conversions. And fix an issue with…
Browse files Browse the repository at this point in the history
… keyDefinition comparison.
  • Loading branch information
jurmous committed Feb 11, 2024
1 parent 87c74c6 commit dc55b4f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ open class RootDataModel<DM: IsValuesDataModel> internal constructor(
migrationReasons += "Major version was increased: ${storedDataModel.Meta.version} -> ${this.Meta.version}"
}

if (storedDataModel.Meta.keyDefinition !== this.Meta.keyDefinition) {
if (storedDataModel.Meta.keyDefinition != this.Meta.keyDefinition) {
migrationReasons += "Key definition was not the same"
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import maryk.core.models.migration.MigrationStatus
import maryk.core.models.migration.StoredRootDataModelDefinition
import maryk.core.models.migration.VersionUpdateHandler
import maryk.core.properties.definitions.index.IsIndexable
import maryk.core.query.DefinitionsConversionContext
import maryk.core.query.requests.AddRequest
import maryk.core.query.requests.ChangeRequest
import maryk.core.query.requests.DeleteRequest
Expand Down Expand Up @@ -97,10 +98,12 @@ class HbaseDataStore(
).await()
}

val conversionContext = DefinitionsConversionContext()

for (dataModel in dataModelsById.values) {
val tableName = getTableName(dataModel)
val tableDescriptor = admin.getDescriptor(tableName)
when (val migrationStatus = checkModelIfMigrationIsNeeded(tableDescriptor, dataModel, onlyCheckModelVersion)) {
when (val migrationStatus = checkModelIfMigrationIsNeeded(tableDescriptor, dataModel, onlyCheckModelVersion, conversionContext)) {
MigrationStatus.UpToDate, MigrationStatus.AlreadyProcessed -> Unit // Do nothing since no work is needed
MigrationStatus.NewModel -> {
scheduledVersionUpdateHandlers.add {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import java.util.concurrent.CompletableFuture
suspend fun checkModelIfMigrationIsNeeded(
tableDescriptorFuture: CompletableFuture<TableDescriptor>,
dataModel: IsRootDataModel,
onlyCheckVersion: Boolean
onlyCheckVersion: Boolean,
conversionContext: DefinitionsConversionContext,
): MigrationStatus {
val tableDescriptor = try {
tableDescriptorFuture.await()
Expand All @@ -38,20 +39,20 @@ suspend fun checkModelIfMigrationIsNeeded(

return when {
dataModel.Meta.version != version || !onlyCheckVersion -> {
val context = DefinitionsConversionContext()

// Read currently stored dependent model
tableDescriptor.getValue(TableMetaColumns.Dependents.byteArray)?.let { dependentModelBytes ->
var readIndex = 0
Definitions.Serializer.readProtoBuf(dependentModelBytes.size, { dependentModelBytes[readIndex++] }, context).toDataObject()
Definitions.Serializer.readProtoBuf(dependentModelBytes.size, { dependentModelBytes[readIndex++] }, conversionContext).toDataObject()
}

// Read currently stored model
val modelBytes = tableDescriptor.getValue(TableMetaColumns.Model.byteArray)
?: throw StorageException("Model is unexpectedly missing in metadata for ${dataModel.Meta.name}")

var readIndex = 0
val storedDataModel = RootDataModel.Model.Serializer.readProtoBuf(modelBytes.size, { modelBytes[readIndex++] }, context).toDataObject()
val storedDataModel = RootDataModel.Model.Serializer.readProtoBuf(modelBytes.size, { modelBytes[readIndex++] }, conversionContext).toDataObject()
conversionContext.dataModels[dataModel.Meta.name] = { storedDataModel }

// Check by comparing the data models for if migration is needed
return dataModel.isMigrationNeeded(storedDataModel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import maryk.core.models.migration.MigrationStatus.UpToDate
import maryk.core.models.migration.StoredRootDataModelDefinition
import maryk.core.models.migration.VersionUpdateHandler
import maryk.core.properties.definitions.index.IsIndexable
import maryk.core.query.DefinitionsConversionContext
import maryk.core.query.requests.AddRequest
import maryk.core.query.requests.ChangeRequest
import maryk.core.query.requests.DeleteRequest
Expand Down Expand Up @@ -160,10 +161,12 @@ class RocksDBDataStore(

runBlocking {
launch(Dispatchers.IO) {
val conversionContext = DefinitionsConversionContext()

for ((index, dataModel) in dataModelsById) {
columnFamilyHandlesByDataModelIndex[index]?.let { tableColumnFamilies ->
tableColumnFamilies.model.let { modelColumnFamily ->
when (val migrationStatus = checkModelIfMigrationIsNeeded(db, modelColumnFamily, dataModel, onlyCheckModelVersion)) {
when (val migrationStatus = checkModelIfMigrationIsNeeded(db, modelColumnFamily, dataModel, onlyCheckModelVersion, conversionContext)) {
UpToDate, MigrationStatus.AlreadyProcessed -> Unit // Do nothing since no work is needed
NewModel -> {
scheduledVersionUpdateHandlers.add {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ fun checkModelIfMigrationIsNeeded(
rocksDB: RocksDB,
modelColumnFamily: ColumnFamilyHandle,
dataModel: IsRootDataModel,
onlyCheckVersion: Boolean
onlyCheckVersion: Boolean,
conversionContext: DefinitionsConversionContext,
): MigrationStatus {
val name = rocksDB.get(modelColumnFamily, modelNameKey)?.decodeToString()
val version = rocksDB.get(modelColumnFamily, modelVersionKey)?.let {
Expand All @@ -30,20 +31,19 @@ fun checkModelIfMigrationIsNeeded(

return when {
dataModel.Meta.version != version || !onlyCheckVersion -> {
val context = DefinitionsConversionContext()

// Read currently stored dependent model
rocksDB.get(modelColumnFamily, modelDependentsDefinitionKey)?.let { dependentModelBytes ->
var readIndex = 0
Definitions.Serializer.readProtoBuf(dependentModelBytes.size, { dependentModelBytes[readIndex++] }, context).toDataObject()
Definitions.Serializer.readProtoBuf(dependentModelBytes.size, { dependentModelBytes[readIndex++] }, conversionContext).toDataObject()
}

// Read currently stored model
val modelBytes = rocksDB.get(modelColumnFamily, modelDefinitionKey)
?: throw StorageException("Model is unexpectedly missing in metadata for ${dataModel.Meta.name}")

var readIndex = 0
val storedDataModel = RootDataModel.Model.Serializer.readProtoBuf(modelBytes.size, { modelBytes[readIndex++] }, context).toDataObject()
val storedDataModel = RootDataModel.Model.Serializer.readProtoBuf(modelBytes.size, { modelBytes[readIndex++] }, conversionContext).toDataObject()
conversionContext.dataModels[dataModel.Meta.name] = { storedDataModel }

// Check by comparing the data models for if migration is needed
return dataModel.isMigrationNeeded(storedDataModel)
Expand Down

0 comments on commit dc55b4f

Please sign in to comment.