From d223c46d474447770435ab798db7c77c52a3bd19 Mon Sep 17 00:00:00 2001
From: Leah McGuire
Date: Tue, 10 Sep 2019 21:42:12 -0700
Subject: [PATCH] Ensure correct metrics despite model failures on some CV
 folds (#404)

* allow a model to fail on some CV folds and still compute the metrics
  correctly
* code cleanup
---
 .../impl/tuning/OpCrossValidation.scala            | 36 ++++++++++++------
 .../RegressionModelSelectorTest.scala              | 29 +++++++++++++--
 .../impl/selector/ModelSelectorTest.scala          | 10 ++---
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala
index 556e0a12e4..35eb676724 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala
@@ -25,7 +25,6 @@ import com.salesforce.op.evaluators.{EvaluationMetrics, OpEvaluatorBase}
 import com.salesforce.op.stages.OPStage
 import com.salesforce.op.stages.impl.selector.ModelSelectorNames
 import com.salesforce.op.utils.stages.FitStagesUtil._
-import com.twitter.algebird.Monoid._
 import com.twitter.algebird.Operators._
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.{Estimator, Model}
@@ -33,6 +32,7 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.util.SparkThreadUtils
+import com.twitter.algebird._
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{ExecutionContext, Future}
@@ -49,24 +49,38 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]]
 ) extends OpValidator[M, E] {
 
   val validationName: String = ModelSelectorNames.CrossValResults
-  private val blas = BLAS.getInstance()
 
   override def getParams(): Map[String, Any] = Map("numFolds" -> numFolds, "seed" -> seed,
     "evaluator" -> evaluator.name.humanFriendlyName, "stratify" -> stratify, "parallelism" -> parallelism)
 
+  private implicit val doubleSemigroup = Semigroup.from[Double](_ + _)
+  private implicit val mapDoubleMonoid = Monoid.mapMonoid[String, Double](doubleSemigroup)
+
+  /**
+   * Finds the best param grid by averaging metrics across folds; call only on folds fit with the same model type
+   */
   private def findBestModel(
     folds: Seq[ValidatedModel[E]]
   ): ValidatedModel[E] = {
-    val metrics = folds.map(_.metrics).reduce(_ + _)
-    blas.dscal(metrics.length, 1.0 / numFolds, metrics, 1)
-    val ValidatedModel(est, _, _, grid) = folds.head
-    log.info(s"Average cross-validation for $est metrics: {}", metrics.toSeq.mkString(","))
-    val (bestMetric, bestIndex) =
-      if (evaluator.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
-      else metrics.zipWithIndex.minBy(_._1)
-    log.info(s"Best set of parameters:\n${grid(bestIndex)}")
+
+    val gridCounts = folds.flatMap(_.grids.map(_ -> 1)).sumByKey
+    val (_, maxFolds) = gridCounts.maxBy { case (_, count) => count }
+    val gridsIn = gridCounts.filter { case (_, foldCount) => foldCount == maxFolds }.keySet
+
+    val gridMetrics = folds.flatMap { f =>
+      f.grids.zip(f.metrics).collect { case (pm, met) if gridsIn.contains(pm) => (pm, met / maxFolds) }
+    }.sumByKey
+
+    val ((bestGrid, bestMetric), bestIndex) =
+      if (evaluator.isLargerBetter) gridMetrics.zipWithIndex.maxBy { case ((_, metric), _) => metric }
+      else gridMetrics.zipWithIndex.minBy { case ((_, metric), _) => metric }
+
+    val ValidatedModel(est, _, _, _) = folds.head
+    log.info(s"Average cross-validation metrics for $est: {}", gridMetrics.mkString(","))
+    log.info(s"Best set of parameters:\n$bestGrid")
     log.info(s"Best cross-validation metric: $bestMetric.")
+    val (grid, metrics) = gridMetrics.unzip
+    ValidatedModel(est, bestIndex, metrics.toArray, grid.toArray)
   }
 
   private[op] override def validate[T](
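Note on the change above: the old implementation summed the per-fold metric arrays and scaled by `1.0 / numFolds`, which silently assumed every param grid produced a metric on every fold; when a model failed on a fold, the averages came out both misaligned and under-scaled. The new code counts, per param grid, how many folds actually completed, keeps only the grids that completed the maximal number of folds, and averages over that count. A minimal, self-contained sketch of the scheme using plain Scala collections (string keys stand in for Spark `ParamMap`s, all values are made up, and `groupBy` replaces Algebird's `sumByKey`):

```scala
object FoldAveragingSketch extends App {
  // Per-fold results: each map holds the param grids that succeeded on that
  // fold and the metric each one scored. "gridB" failed on the third fold.
  val folds: Seq[Map[String, Double]] = Seq(
    Map("gridA" -> 0.90, "gridB" -> 0.80),
    Map("gridA" -> 0.88, "gridB" -> 0.84),
    Map("gridA" -> 0.92)
  )

  // Count the folds on which each grid succeeded.
  val gridCounts: Map[String, Int] =
    folds.flatMap(_.keys).groupBy(identity).mapValues(_.size).toMap

  // Keep only grids evaluated on the maximal number of folds, so every
  // surviving grid is compared on an average over the same fold count.
  val maxFolds: Int = gridCounts.values.max
  val gridsIn: Set[String] = gridCounts.collect { case (g, c) if c == maxFolds => g }.toSet

  // Average over maxFolds -- not over the configured number of folds.
  val gridMetrics: Map[String, Double] = folds
    .flatMap(_.filterKeys(gridsIn))
    .groupBy { case (grid, _) => grid }
    .mapValues(_.map { case (_, metric) => metric }.sum / maxFolds)
    .toMap

  println(gridMetrics) // ~ Map(gridA -> 0.9); gridB ran on fewer folds and is dropped
}
```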
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
index a9275baf0b..5833dd55c7 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
@@ -219,7 +219,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
     justScores.length shouldEqual transformedData.count()
   }
 
-  it should "fit and predict for even when some models fail" in {
+  it should "fit and predict even when some models fail" in {
     val testEstimator = RegressionModelSelector
       .withCrossValidation(
         numFolds = 4,
@@ -240,8 +240,31 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
       assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"),
         s"Metric ${metric.entryName} is not present in metadata: " + metaData)
     )
-    metaData.validationResults.foreach(println(_))
-    metaData.validationResults.size shouldBe 42
+    metaData.validationResults.size shouldBe 40
+  }
+
+
+  it should "fit and predict even when some parameter settings fail for one of the models" in {
+    val testEstimator = RegressionModelSelector
+      .withCrossValidation(
+        numFolds = 4,
+        validationMetric = Evaluators.Regression.mse(),
+        seed = 10L,
+        modelTypesToUse = Seq(RMT.OpGeneralizedLinearRegression)
+      )
+      .setInput(label, features)
+
+
+    val model = testEstimator.fit(data)
+    model.evaluateModel(data)
+
+    // evaluation metrics from train set should be in metadata
+    val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata())
+    RegressionEvalMetrics.values.foreach(metric =>
+      assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"),
+        s"Metric ${metric.entryName} is not present in metadata: " + metaData)
+    )
+    metaData.validationResults.size shouldBe 32
   }
 
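The expected `validationResults` sizes shrink (42 to 40 above, and 32 for the GLM-only test), which is consistent with grids that did not complete on the maximal number of folds now being dropped from the results instead of being reported with skewed averages. A made-up numeric illustration of the skew the old `1.0 / numFolds` scaling produced when a model failed on a whole fold:

```scala
object OldVsNewScaling extends App {
  val numFolds = 4
  // Metrics from the three folds that produced a model; fold 4 failed.
  val survivingMetrics = Seq(0.8, 0.8, 0.8)

  // Old scheme: sum whatever metrics arrived, then scale by 1 / numFolds.
  val oldAverage = survivingMetrics.sum / numFolds
  // New scheme: divide by the number of folds that actually produced models
  // (maxFolds = 3 here, since the whole fold failed for every grid).
  val maxFolds = survivingMetrics.size
  val newAverage = survivingMetrics.sum / maxFolds

  println(f"old: $oldAverage%.2f, new: $newAverage%.2f") // old: 0.60, new: 0.80
}
```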
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala
index 987569fa81..0417e3536b 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala
@@ -82,23 +82,23 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model
 
   private val lr = new OpLogisticRegression()
   private val lrParams = new ParamGridBuilder()
-    .addGrid(lr.regParam, Array(0.1, 100))
-    .addGrid(lr.elasticNetParam, Array(0, 0.5)).build()
+    .addGrid(lr.regParam, Array(0.1, 10000))
+    .addGrid(lr.elasticNetParam, Array(0.5)).build()
 
   private val rf = new OpRandomForestClassifier()
   private val rfParams = new ParamGridBuilder()
     .addGrid(rf.numTrees, Array(2, 4))
-    .addGrid(rf.minInfoGain, Array(100.0, 10.0)).build()
+    .addGrid(rf.minInfoGain, Array(1000.0, 100.0)).build()
 
   private val linR = new OpLinearRegression()
   private val linRParams = new ParamGridBuilder()
-    .addGrid(linR.regParam, Array(0.1, 100))
+    .addGrid(linR.regParam, Array(0.1, 1000))
     .addGrid(linR.maxIter, Array(10, 20)).build()
 
   private val rfR = new OpRandomForestRegressor()
   private val rfRParams = new ParamGridBuilder()
     .addGrid(rfR.numTrees, Array(2, 4))
-    .addGrid(rfR.minInfoGain, Array(100.0, 10.0)).build()
+    .addGrid(rfR.minInfoGain, Array(1000.0, 100.0)).build()
 
   val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features",
     Seq[(RealNN, OPVector)](
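The revised grids push the hyperparameters to extremes (`regParam` up to 10000, `minInfoGain` up to 1000.0), presumably so that some parameter settings yield degenerate or failing fits during cross-validation and so exercise the new fold-handling logic. For intuition on the regularization case, a one-feature ridge-regression sketch with made-up sufficient statistics, showing the fitted weight collapsing toward zero (a near-constant model) as the regularization strength grows:

```scala
object RidgeShrinkageSketch extends App {
  // Closed-form one-dimensional ridge solution: w = x'y / (x'x + lambda).
  val (xtx, xty) = (4.0, 6.0) // made-up sufficient statistics
  for (lambda <- Seq(0.1, 100.0, 10000.0)) {
    val w = xty / (xtx + lambda)
    println(f"lambda = $lambda%8.1f -> weight = $w%.5f")
  }
}
```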