Skip to content

Commit

Permalink
Remove commented code
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Aug 16, 2024
1 parent 52d45fd commit 74e1ed2
Showing 1 changed file with 0 additions and 46 deletions.
46 changes: 0 additions & 46 deletions python/gresearch/spark/diff/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,52 +379,6 @@ def _get_change_column(self,
.alias(self._options.change_column)

def _do_diff(self, left: DataFrame, right: DataFrame, id_columns: List[str], ignore_columns: List[str]) -> DataFrame:
"""
private def doDiff[T, U](
left: Dataset[T],
right: Dataset[U],
idColumns: Seq[String],
ignoreColumns: Seq[String] = Seq.empty
): DataFrame = {
checkSchema(left, right, idColumns, ignoreColumns)
val columns = left.columns.diffCaseSensitivity(ignoreColumns).toList
val pkColumns = if (idColumns.isEmpty) columns else idColumns
val valueColumns = columns.diffCaseSensitivity(pkColumns)
val valueStructFields = left.schema.fields.map(f => f.name -> f).toMap
val valueVolumnsWithComparator = valueColumns.map(c => c -> options.comparatorFor(valueStructFields(c)))
val existsColumnName = distinctPrefixFor(left.columns) + "exists"
val leftWithExists = left.withColumn(existsColumnName, lit(1))
val rightWithExists = right.withColumn(existsColumnName, lit(1))
val joinCondition =
pkColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduce(_ && _)
val unChanged = valueVolumnsWithComparator
.map { case (c, cmp) =>
cmp.equiv(leftWithExists(backticks(c)), rightWithExists(backticks(c)))
}
.reduceOption(_ && _)
val changeCondition = not(unChanged.getOrElse(lit(true)))
val diffActionColumn =
when(leftWithExists(existsColumnName).isNull, lit(options.insertDiffValue))
.when(rightWithExists(existsColumnName).isNull, lit(options.deleteDiffValue))
.when(changeCondition, lit(options.changeDiffValue))
.otherwise(lit(options.nochangeDiffValue))
.as(options.diffColumn)
val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2)
val changeColumn = getChangeColumn(existsColumnName, valueVolumnsWithComparator, leftWithExists, rightWithExists)
// turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns
.map(Seq(_))
.getOrElse(Seq.empty[Column])
leftWithExists
.join(rightWithExists, joinCondition, "fullouter")
.select((diffActionColumn +: changeColumn) ++ diffColumns: _*)
}
"""
self._check_schema(left, right, id_columns, ignore_columns)

case_sensitive = left.session().conf.get("spark.sql.caseSensitive") == "true"
Expand Down

0 comments on commit 74e1ed2

Please sign in to comment.