Prepare for scalafmt
EnricoMi committed Oct 10, 2023
1 parent 47b901b commit 0b0670a
Showing 7 changed files with 43 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,6 @@
version = 3.7.14

runner.dialect = scala213
rewrite.trailingCommas.style = keep

maxColumn = 120
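
For context, these settings pin scalafmt to 3.7.14 with the Scala 2.13 dialect, wrap only lines longer than 120 columns, and leave existing trailing commas alone. A minimal illustration of the trailing-comma behaviour, with hypothetical data not taken from this repository:

// Illustrative only: with rewrite.trailingCommas.style = keep, scalafmt
// neither adds nor removes trailing commas, so the comma after the last
// element survives a reformat (trailing commas are valid in Scala 2.13).
val thresholds = Seq(
  100,
  200,
  300,
)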
3 changes: 2 additions & 1 deletion src/main/scala/uk/co/gresearch/spark/Histogram.scala
@@ -52,7 +52,8 @@ object Histogram {
.call(
bins.foldLeft(_) { case (df, bin) =>
df.withColumn(s"${bin.last}", when(valueColumn > bin.head && valueColumn <= bin.last, 1).otherwise(0))
})
}
)
.withColumn(s">${thresholds.last}", when(valueColumn > thresholds.last, 1).otherwise(0))
.groupBy(aggregateColumns: _*)
.agg(
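
The Histogram hunk above only re-wraps the closing braces of the foldLeft to satisfy scalafmt; the logic, one 0/1 indicator column per bin, is unchanged. A self-contained sketch of that fold, with hypothetical data, bins, and column names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

object HistogramSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Hypothetical input values and (lower, upper] bins, not this project's API.
  val df = Seq(5, 15, 25).toDF("value")
  val bins = Seq(Seq(0, 10), Seq(10, 20), Seq(20, 30))

  // Same shape as the foldLeft above: each bin contributes one indicator
  // column, named after the bin's upper bound.
  val withBins = bins.foldLeft(df) { case (d, bin) =>
    d.withColumn(s"${bin.last}", when($"value" > bin.head && $"value" <= bin.last, 1).otherwise(0))
  }
  withBins.show()
}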
5 changes: 4 additions & 1 deletion src/main/scala/uk/co/gresearch/spark/RowNumbers.scala
@@ -39,7 +39,10 @@ case class RowNumbersFunc(rowNumberColumnName: String = "row_number",
this.copy(orderColumns = orderColumns)

def of[D](df: Dataset[D]): DataFrame = {
if (storageLevel.equals(StorageLevel.NONE) && (SparkMajorVersion > 3 || SparkMajorVersion == 3 && SparkMinorVersion >= 5)) {
if (
storageLevel.equals(StorageLevel.NONE) &&
(SparkMajorVersion > 3 || SparkMajorVersion == 3 && SparkMinorVersion >= 5)
) {
throw new IllegalArgumentException(s"Storage level $storageLevel not supported with Spark 3.5.0 and above.")
}

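
The RowNumbers hunk only spreads the long if condition across lines; note the parentheses around the version comparison are kept, which matters because && binds tighter than ||. A small sketch of the same gate with hypothetical version constants:

// Hypothetical constants standing in for SparkMajorVersion / SparkMinorVersion.
val (major, minor) = (3, 5)

// && binds tighter than ||, so when this disjunction is combined with the
// storage-level check above, the parentheses around it are required.
val isSpark35OrLater = major > 3 || major == 3 && minor >= 5
assert(isSpark35OrLater)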
13 changes: 9 additions & 4 deletions src/main/scala/uk/co/gresearch/spark/diff/App.scala
@@ -51,6 +51,7 @@ object App {

// read options from args
val programName = s"spark-extension_${spark.BuildScalaCompatVersionString}-${spark.VersionString}.jar"
val sparkSubmit = s"spark-submit --packages com.github.scopt:scopt_${spark.BuildScalaCompatVersionString}:4.1.0 $programName"
val parser: OptionParser[Options] = new scopt.OptionParser[Options](programName) {
head(s"Spark Diff app (${spark.VersionString})")
head()
@@ -77,10 +78,11 @@
note("Examples:")
note("")
note(" - Diff CSV files 'left.csv' and 'right.csv' and write result into CSV file 'diff.csv':")
note(s" spark-submit --packages com.github.scopt:scopt_${spark.BuildScalaCompatVersionString}:4.1.0 $programName --format csv left.csv right.csv diff.csv")
note(s" $sparkSubmit --format csv left.csv right.csv diff.csv")
note("")
note(" - Diff CSV file 'left.csv' with Parquet file 'right.parquet' with id column 'id', and write result into Hive table 'diff':")
note(s" spark-submit --packages com.github.scopt:scopt_${spark.BuildScalaCompatVersionString}:4.1.0 $programName --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff")
note(" - Diff CSV file 'left.csv' and Parquet file 'right.parquet' with id column 'id',")
note(" and write result into Hive table 'diff':")
note(s" $sparkSubmit --left-format csv --right-format parquet --hive --id id left.csv right.parquet diff")

note("")
note("Spark session")
@@ -179,7 +181,10 @@ object App {
.optional()
.valueName("<filter>")
.action((x, c) => c.copy(filter = c.filter + x))
.text(s"Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)")
.text(
s"Filters for rows with these diff actions, with default diffing options" +
s" use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)"
)
opt[Unit]("statistics")
.optional()
.action((_, c) => c.copy(statistics = true))
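
The App.scala changes extract the repeated spark-submit invocation into a single sparkSubmit value and wrap the long .text(...) help string. For orientation, a minimal scopt OptionParser following the same pattern as above; the option names and config fields here are hypothetical, not the app's real flags:

import scopt.OptionParser

case class Config(format: String = "csv", statistics: Boolean = false)

val parser = new OptionParser[Config]("example") {
  head("example app")
  opt[String]("format")
    .optional()
    .valueName("<format>")
    .action((x, c) => c.copy(format = x))
    .text("file format to read and write, e.g. csv or parquet")
  opt[Unit]("statistics")
    .optional()
    .action((_, c) => c.copy(statistics = true))
    .text("print statistics instead of full output")
}

// Returns Some(Config(...)) on success, None on a parse error.
parser.parse(Seq("--format", "parquet"), Config()).foreach(println)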
16 changes: 16 additions & 0 deletions src/test/scala/uk/co/gresearch/spark/GroupBySortedSuite.scala
@@ -37,6 +37,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

import spark.implicits._

// format: off
val ds: Dataset[Val] = Seq(
Val(1, 1, 1.1),
Val(1, 2, 1.2),
@@ -49,6 +50,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

Val(3, 1, 3.1),
).reverse.toDS().repartition(3).cache()
// format: on

describe("ds.groupBySorted") {
testGroupByIdSortBySeq(ds.groupBySorted($"id")($"seq", $"value"))
@@ -93,6 +95,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1, v._2))

// format: off
val expected = Seq(
// (key, group index, value)
(1, 0, (1, 1, 1.1)),
@@ -106,6 +109,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

(3, 0, (3, 1, 3.1)),
)
// format: on

assert(actual === expected)
}
@@ -116,6 +120,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1._1, v._1._2))

// format: off
val expected = Seq(
// (value, state)
((1, 1, 1.1), 1 + 1),
@@ -129,6 +134,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

((3, 1, 3.1), 3 + 1),
)
// format: on

assert(actual === expected)
}
@@ -143,6 +149,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1, v._2))

// format: off
val expected = Seq(
// (key, group index, value)
(1, 0, (1, 3, 1.31)),
@@ -156,6 +163,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

(3, 0, (3, 1, 3.1)),
)
// format: on

assert(actual === expected)
}
@@ -172,6 +180,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1, v._2))

// format: off
val expected = Seq(
// (key, group index, value)
(1, 0, (1, 1, 1.1)),
@@ -185,6 +194,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

(3, 0, (3, 1, 3.1)),
)
// format: on

val partitionSizes = grouped.mapPartitions(it => Iterator.single(it.length)).collect()
assert(partitionSizes.length === partitions)
@@ -203,6 +213,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1, v._2))

// format: off
val expected = Seq(
// (key, group index, value)
(1, 0, (1, 3, 1.31)),
@@ -216,6 +227,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

(3, 0, (3, 1, 3.1)),
)
// format: on

val partitionSizes = grouped.mapPartitions(it => Iterator.single(it.length)).collect()
assert(partitionSizes.length === partitions)
@@ -233,6 +245,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1, v._2))

// format: off
val expected = Seq(
// (key, group index, value)
((1, 1), 0, (1, 1, 1.1)),
@@ -250,6 +263,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

((3, 1), 0, (3, 1, 3.1)),
)
// format: on

assert(actual === expected)
}
@@ -260,6 +274,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {
.collect()
.sortBy(v => (v._1._1, v._1._2))

// format: off
val expected = Seq(
// (value, state)
((1, 1, 1.1), 1 + 1),
@@ -273,6 +288,7 @@ class GroupBySortedSuite extends AnyFunSpec with SparkTestSession {

((3, 1, 3.1), 3 + 1),
)
// format: on

assert(actual === expected)
}
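
The // format: off and // format: on pairs added throughout GroupBySortedSuite fence off regions that scalafmt must leave untouched, preserving the hand-aligned expected data and the blank lines that group rows by key. A minimal illustration with hypothetical data:

// format: off
// scalafmt leaves everything between the markers exactly as written,
// so the deliberate blank-line grouping below survives a reformat.
val expected = Seq(
  (1, 0, 1.1),
  (1, 1, 1.2),

  (2, 0, 2.1),
)
// format: on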
4 changes: 4 additions & 0 deletions src/test/scala/uk/co/gresearch/spark/HistogramSuite.scala
@@ -182,10 +182,12 @@ class HistogramSuite extends AnyFunSuite with SparkTestSession {
ints.histogram(Seq(0, -200, 100, -100, 200), $"does-not-exist", $"id")
}
assert(
// format: off
exception.getMessage.startsWith("cannot resolve '`does-not-exist`' given input columns: [id, title, value]") ||
exception.getMessage.startsWith("Column '`does-not-exist`' does not exist. Did you mean one of the following? [title, id, value]") ||
exception.getMessage.startsWith("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `does-not-exist` cannot be resolved. Did you mean one of the following? [`title`, `id`, `value`]") ||
exception.getMessage.startsWith("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `does-not-exist` cannot be resolved. Did you mean one of the following? [`title`, `id`, `value`]")
// format: on
)
}

@@ -194,10 +196,12 @@ class HistogramSuite extends AnyFunSuite with SparkTestSession {
ints.histogram(intThresholds, $"value", $"does-not-exist")
}
assert(
// format: off
exception.getMessage.startsWith("cannot resolve '`does-not-exist`' given input columns: [") ||
exception.getMessage.startsWith("Column '`does-not-exist`' does not exist. Did you mean one of the following? [") ||
exception.getMessage.startsWith("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `does-not-exist` cannot be resolved. Did you mean one of the following? [") ||
exception.getMessage.startsWith("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `does-not-exist` cannot be resolved. Did you mean one of the following? [")
// format: on
)
}

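
The HistogramSuite assertions accept several message prefixes because Spark's unresolved-column error text changed across releases, most recently to the [UNRESOLVED_COLUMN.WITH_SUGGESTION] error class; the // format: off markers stop scalafmt from wrapping those long string literals. A hypothetical helper, not part of this commit, that would express the same check more compactly:

// Hypothetical helper: true if the message starts with any of the known
// per-Spark-version error prefixes.
def startsWithAny(message: String, prefixes: String*): Boolean =
  prefixes.exists(prefix => message.startsWith(prefix))

// Usage sketch:
// assert(startsWithAny(exception.getMessage,
//   "cannot resolve '`does-not-exist`'", "[UNRESOLVED_COLUMN.WITH_SUGGESTION]"))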
2 changes: 2 additions & 0 deletions ParquetSuite.scala
@@ -140,6 +140,7 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
StructField("values", LongType, nullable = false),
StructField("nulls", LongType, nullable = false),
)),
// format: off
Seq(
Row("file1.parquet", 1, "[id]", "SNAPPY", "required int64 id", "[BIT_PACKED, PLAIN]", "0", "99", 4, 437, 826, 100, 0),
Row("file1.parquet", 1, "[val]", "SNAPPY", "required double val", "[BIT_PACKED, PLAIN]", "0.005067503372006343", "0.9973357672164814", 441, 831, 826, 100, 0),
@@ -148,6 +149,7 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
Row("file2.parquet", 2, "[id]", "SNAPPY", "required int64 id", "[BIT_PACKED, PLAIN]", "200", "299", 1273, 440, 826, 100, 0),
Row("file2.parquet", 2, "[val]", "SNAPPY", "required double val", "[BIT_PACKED, PLAIN]", "0.011277044401634018", "0.970525681750662", 1713, 830, 825, 100, 0),
),
// format: on
parallelism,
(df: DataFrame) => df
.withColumn("column", $"column".cast(StringType))
