diff --git a/python/gresearch/spark/diff/__init__.py b/python/gresearch/spark/diff/__init__.py index 7a338526..f6320603 100644 --- a/python/gresearch/spark/diff/__init__.py +++ b/python/gresearch/spark/diff/__init__.py @@ -379,52 +379,6 @@ def _get_change_column(self, .alias(self._options.change_column) def _do_diff(self, left: DataFrame, right: DataFrame, id_columns: List[str], ignore_columns: List[str]) -> DataFrame: - """ - private def doDiff[T, U]( - left: Dataset[T], - right: Dataset[U], - idColumns: Seq[String], - ignoreColumns: Seq[String] = Seq.empty - ): DataFrame = { - checkSchema(left, right, idColumns, ignoreColumns) - - val columns = left.columns.diffCaseSensitivity(ignoreColumns).toList - val pkColumns = if (idColumns.isEmpty) columns else idColumns - val valueColumns = columns.diffCaseSensitivity(pkColumns) - val valueStructFields = left.schema.fields.map(f => f.name -> f).toMap - val valueVolumnsWithComparator = valueColumns.map(c => c -> options.comparatorFor(valueStructFields(c))) - - val existsColumnName = distinctPrefixFor(left.columns) + "exists" - val leftWithExists = left.withColumn(existsColumnName, lit(1)) - val rightWithExists = right.withColumn(existsColumnName, lit(1)) - val joinCondition = - pkColumns.map(c => leftWithExists(backticks(c)) <=> rightWithExists(backticks(c))).reduce(_ && _) - val unChanged = valueVolumnsWithComparator - .map { case (c, cmp) => - cmp.equiv(leftWithExists(backticks(c)), rightWithExists(backticks(c))) - } - .reduceOption(_ && _) - - val changeCondition = not(unChanged.getOrElse(lit(true))) - - val diffActionColumn = - when(leftWithExists(existsColumnName).isNull, lit(options.insertDiffValue)) - .when(rightWithExists(existsColumnName).isNull, lit(options.deleteDiffValue)) - .when(changeCondition, lit(options.changeDiffValue)) - .otherwise(lit(options.nochangeDiffValue)) - .as(options.diffColumn) - - val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2) - val changeColumn = getChangeColumn(existsColumnName, valueVolumnsWithComparator, leftWithExists, rightWithExists) - // turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns - .map(Seq(_)) - .getOrElse(Seq.empty[Column]) - - leftWithExists - .join(rightWithExists, joinCondition, "fullouter") - .select((diffActionColumn +: changeColumn) ++ diffColumns: _*) - } - """ self._check_schema(left, right, id_columns, ignore_columns) case_sensitive = left.session().conf.get("spark.sql.caseSensitive") == "true"