Skip to content

Commit

Permalink
Validate options before starting SparkSession (#12)
Browse files: browse the repository at this point in the history
  • Loading branch information
gquintana authored Jan 16, 2022
1 parent a1b9092 commit e147862
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 62 deletions.
6 changes: 2 additions & 4 deletions src/main/scala/com/coxautodata/OptionsParsing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object OptionsParsing {

/** Parse a set of command-line arguments into a [[Config]] object
*/
def parse(args: Array[String], hadoopConfiguration: Configuration): Config = {
def parse(args: Array[String]): Config = {

val parser = new scopt.OptionParser[Config]("") {
opt[Unit]("i")
Expand Down Expand Up @@ -37,9 +37,7 @@ object OptionsParsing {
.text("Overwrite if source and destination differ in size, or checksum")

opt[String]("filters")
.action((f, c) =>
c.copyOptions(_.withFiltersFromFile(new URI(f), hadoopConfiguration))
)
.action((f, c) => c.copyOptions(_.copy(filters = Some(new URI(f)))))
.text(
"The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy."
)
Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/com/coxautodata/SparkDistCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ object SparkDistCP extends Logging {
*/
def main(args: Array[String]): Unit = {

val config = OptionsParsing.parse(args)
val sparkSession = SparkSession.builder().getOrCreate()

val config =
OptionsParsing.parse(args, sparkSession.sparkContext.hadoopConfiguration)

val options = config.options.withFiltersFromFile(sparkSession.sparkContext.hadoopConfiguration)
val (src, dest) = config.sourceAndDestPaths
run(sparkSession, src, dest, config.options)
run(sparkSession, src, dest, options)

}

Expand Down
26 changes: 17 additions & 9 deletions src/main/scala/com/coxautodata/SparkDistCPOptions.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.coxautodata

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import java.io.IOException
import java.net.URI
import scala.util.matching.Regex

/** Options for the DistCP application. See [[OptionsParsing.parse]] for the
Expand All @@ -21,6 +21,7 @@ case class SparkDistCPOptions(
SparkDistCPOptions.Defaults.consistentPathBehaviour,
maxFilesPerTask: Int = SparkDistCPOptions.Defaults.maxFilesPerTask,
maxBytesPerTask: Long = SparkDistCPOptions.Defaults.maxBytesPerTask,
filters: Option[URI] = SparkDistCPOptions.Defaults.filters,
filterNot: List[Regex] = SparkDistCPOptions.Defaults.filterNot,
numListstatusThreads: Int = SparkDistCPOptions.Defaults.numListstatusThreads,
verbose: Boolean = SparkDistCPOptions.Defaults.verbose
Expand Down Expand Up @@ -48,20 +49,26 @@ case class SparkDistCPOptions(
}

def withFiltersFromFile(
uri: URI,
hadoopConfiguration: Configuration
): SparkDistCPOptions = {

val path = new Path(uri)
val fs = path.getFileSystem(hadoopConfiguration)
val fn = filters.map(f => {
try {
val path = new Path(f)
val fs = path.getFileSystem(hadoopConfiguration)

val in = fs.open(path)
val in = fs.open(path)

val r = scala.io.Source.fromInputStream(in).getLines().map(_.r).toList
val r = scala.io.Source.fromInputStream(in).getLines().map(_.r).toList

in.close()
in.close()
r
} catch {
case e:IOException => throw new RuntimeException("Invalid filter file "+f, e)
}
}).getOrElse(List.empty)

this.copy(filterNot = r)
this.copy(filterNot = fn)

}

Expand All @@ -79,6 +86,7 @@ object SparkDistCPOptions {
val consistentPathBehaviour: Boolean = false
val maxFilesPerTask: Int = 1000
val maxBytesPerTask: Long = 1073741824L
val filters: Option[URI] = None
val filterNot: List[Regex] = List.empty
val numListstatusThreads: Int = 10
val verbose: Boolean = false
Expand Down
75 changes: 31 additions & 44 deletions src/test/scala/com/coxautodata/TestOptionsParsing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

it("default options one source") {

val conf = OptionsParsing.parse(Array("src", "dest"), new Configuration())
val conf = OptionsParsing.parse(Array("src", "dest"))
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions())

Expand All @@ -22,29 +22,30 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("default options two sources") {

val conf =
OptionsParsing.parse(Array("src1", "src2", "dest"), new Configuration())
OptionsParsing.parse(Array("src1", "src2", "dest"))
conf.sourceAndDestPaths should be(
Seq(new Path("src1"), new Path("src2")),
new Path("dest")
)
conf.options should be(SparkDistCPOptions())
val options = conf.options.withFiltersFromFile(new Configuration())
options should be(SparkDistCPOptions())

}

it("ignore failures flag") {

val conf =
OptionsParsing.parse(Array("--i", "src", "dest"), new Configuration())
OptionsParsing.parse(Array("--i", "src", "dest"))
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(ignoreErrors = true))
val options = conf.options.withFiltersFromFile(new Configuration())
options should be(SparkDistCPOptions(ignoreErrors = true))

}

it("log option") {

val conf = OptionsParsing.parse(
Array("--log", "log", "src", "dest"),
new Configuration()
Array("--log", "log", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(log = Some(new URI("log"))))
Expand All @@ -54,8 +55,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("dry-run flag") {

val conf = OptionsParsing.parse(
Array("--dryrun", "src", "dest"),
new Configuration()
Array("--dryrun", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(dryRun = true))
Expand All @@ -65,8 +65,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("verbose flag") {

val conf = OptionsParsing.parse(
Array("--verbose", "src", "dest"),
new Configuration()
Array("--verbose", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(verbose = true))
Expand All @@ -76,8 +75,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("overwrite flag") {

val conf = OptionsParsing.parse(
Array("--overwrite", "src", "dest"),
new Configuration()
Array("--overwrite", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(overwrite = true))
Expand All @@ -87,8 +85,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("update flag") {

val conf = OptionsParsing.parse(
Array("--update", "src", "dest"),
new Configuration()
Array("--update", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(update = true))
Expand All @@ -99,27 +96,27 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

val filtersFile = this.getClass.getResource("test.filters").getPath
val conf = OptionsParsing.parse(
Array("--filters", filtersFile, "src", "dest"),
new Configuration()
Array("--filters", filtersFile, "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options.copy(filterNot = List.empty) should be(SparkDistCPOptions())
conf.options.filterNot.map(_.toString()) should be(
val options = conf.options.withFiltersFromFile(new Configuration())
options.filterNot.map(_.toString()) should be(
List(
".*/_temporary($|/.*)",
".*/_committed.*",
".*/_started.*",
".*/_SUCCESS.*"
)
)
val resetOptions = options.copy(filters = None).withFiltersFromFile(new Configuration())
resetOptions should be(SparkDistCPOptions())

}

it("delete flag") {

val conf = OptionsParsing.parse(
Array("--delete", "--update", "src", "dest"),
new Configuration()
Array("--delete", "--update", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(delete = true, update = true))
Expand All @@ -129,8 +126,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("numListstatusThreads option") {

val conf = OptionsParsing.parse(
Array("--numListstatusThreads", "3", "src", "dest"),
new Configuration()
Array("--numListstatusThreads", "3", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(numListstatusThreads = 3))
Expand All @@ -140,8 +136,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("consistentPathBehaviour option") {

val conf = OptionsParsing.parse(
Array("--consistentPathBehaviour", "src", "dest"),
new Configuration()
Array("--consistentPathBehaviour", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(consistentPathBehaviour = true))
Expand All @@ -151,8 +146,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("maxFilesPerTask option") {

val conf = OptionsParsing.parse(
Array("--maxFilesPerTask", "3", "src", "dest"),
new Configuration()
Array("--maxFilesPerTask", "3", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(maxFilesPerTask = 3))
Expand All @@ -162,8 +156,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("maxBytesPerTask option") {

val conf = OptionsParsing.parse(
Array("--maxBytesPerTask", "30000000", "src", "dest"),
new Configuration()
Array("--maxBytesPerTask", "30000000", "src", "dest")
)
conf.sourceAndDestPaths should be(Seq(new Path("src")), new Path("dest"))
conf.options should be(SparkDistCPOptions(maxBytesPerTask = 30000000))
Expand All @@ -177,7 +170,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {
it("single path") {

intercept[IllegalArgumentException] {
OptionsParsing.parse(Array("path"), new Configuration())
OptionsParsing.parse(Array("path"))
}.getMessage should be(
"requirement failed: you must supply two or more paths, representing the source paths and a destination"
)
Expand All @@ -188,19 +181,17 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

intercept[RuntimeException] {
OptionsParsing.parse(
Array("--filters", "none", "src", "dest"),
new Configuration()
)
}.getMessage should be("Failed to parse arguments")
Array("--filters", "none", "src", "dest")
).options.withFiltersFromFile(new Configuration())
}.getMessage should be("Invalid filter file none")

}

it("negative max files") {

intercept[java.lang.AssertionError] {
OptionsParsing.parse(
Array("--maxFilesPerTask", "-2", "src", "dest"),
new Configuration()
Array("--maxFilesPerTask", "-2", "src", "dest")
)
}.getMessage should be(
"assertion failed: maxFilesPerTask must be positive"
Expand All @@ -212,8 +203,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

intercept[java.lang.AssertionError] {
OptionsParsing.parse(
Array("--maxBytesPerTask", "-2", "src", "dest"),
new Configuration()
Array("--maxBytesPerTask", "-2", "src", "dest")
)
}.getMessage should be(
"assertion failed: maxBytesPerTask must be positive"
Expand All @@ -225,8 +215,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

intercept[java.lang.AssertionError] {
OptionsParsing.parse(
Array("--numListstatusThreads", "-2", "src", "dest"),
new Configuration()
Array("--numListstatusThreads", "-2", "src", "dest")
)
}.getMessage should be(
"assertion failed: numListstatusThreads must be positive"
Expand All @@ -238,8 +227,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

intercept[java.lang.AssertionError] {
OptionsParsing.parse(
Array("--update", "--overwrite", "src", "dest"),
new Configuration()
Array("--update", "--overwrite", "src", "dest")
)
}.getMessage should be(
"assertion failed: Both update and overwrite cannot be specified"
Expand All @@ -251,8 +239,7 @@ class TestOptionsParsing extends AnyFunSpec with Matchers {

intercept[java.lang.AssertionError] {
OptionsParsing.parse(
Array("--delete", "src", "dest"),
new Configuration()
Array("--delete", "src", "dest")
)
}.getMessage should be(
"assertion failed: Delete must be specified with either overwrite or update"
Expand Down

0 comments on commit e147862

Please sign in to comment.