Add metrics for successful/failed Spark index creation #837

Draft
wants to merge 10 commits into main
@@ -155,6 +155,21 @@ public final class MetricConstants {
*/
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

/**
* Metric group for skipping indices, tracking creation success and failure
*/
public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping";

/**
* Metric group for covering indices, tracking creation success and failure
*/
public static final String CREATE_COVERING_INDICES = "query.execution.index.covering";

/**
* Metric group for materialized view indices, tracking creation success and failure
*/
public static final String CREATE_MV_INDICES = "query.execution.index.mv";

/**
* Metric for tracking the latency of checkpoint deletion
*/
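For reference, each group constant above is combined with a status suffix and a trailing ".count" at emission time (see emitIndexCreationStatusMetric in the Scala change below), so the full metric names are:

query.execution.index.skipping.create_success.count
query.execution.index.skipping.create_failed.count
query.execution.index.covering.create_success.count
query.execution.index.covering.create_failed.count
query.execution.index.mv.create_success.count
query.execution.index.mv.create_failed.count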
@@ -11,6 +11,8 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
@@ -45,9 +47,12 @@ object FlintSparkIndexFactory extends Logging {
*/
def create(spark: SparkSession, metadata: FlintMetadata): Option[FlintSparkIndex] = {
try {
Some(doCreate(spark, metadata))
val result = doCreate(spark, metadata)
emitIndexCreationStatusMetric(metadata, success = true)
Some(result)
} catch {
case e: Exception =>
emitIndexCreationStatusMetric(metadata, success = false)
Collaborator:
Could you double check if this is the right place? I recall this factory is actually a deserializer from Flint metadata to the index class. I guess you may want to intercept the API call in FlintSpark instead? Also, for all metrics-related logic, please consider an AOP style rather than inserting it in many places.

Author:
Moved the metric collection to the new location. I experimented with an AOP-like approach, but it was cumbersome: linking to the existing logic in spark/refresh introduces a circular dependency, and the callable implicitly throwing Exception makes error handling awkward when trying to build a general-purpose counterpart of what exists on the Scala side.
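For context, a minimal sketch of the kind of general-purpose wrapper discussed here (hypothetical: withCreateMetric is not part of this PR, and its name and placement are assumptions; MetricsUtil.addHistoricGauge is the helper used elsewhere in this change):

// Hypothetical helper: wrap an index-creation block, emit a success or
// failure counter for the given metric group, and rethrow on failure.
private def withCreateMetric[T](metricPrefix: String)(block: => T): T =
  try {
    val result = block
    MetricsUtil.addHistoricGauge(metricPrefix + ".create_success.count", 1)
    result
  } catch {
    case e: Exception =>
      MetricsUtil.addHistoricGauge(metricPrefix + ".create_failed.count", 1)
      throw e
  }

A caller would then write, e.g., withCreateMetric(MetricConstants.CREATE_SKIPPING_INDICES) { doCreate(spark, metadata) }.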

logWarning(s"Failed to create Flint index from metadata $metadata", e)
None
}
@@ -162,6 +167,24 @@ object FlintSparkIndexFactory extends Logging {
}
}

private def emitIndexCreationStatusMetric(metadata: FlintMetadata, success: Boolean): Unit = {
  val successSuffix = if (success) ".create_success" else ".create_failed"
  // Map the index kind to its metric group; skip emission for unknown kinds so
  // a MatchError here never masks the original failure in create().
  val metricPrefix = metadata.kind match {
    case SKIPPING_INDEX_TYPE => Some(MetricConstants.CREATE_SKIPPING_INDICES)
    case COVERING_INDEX_TYPE => Some(MetricConstants.CREATE_COVERING_INDICES)
    case MV_INDEX_TYPE => Some(MetricConstants.CREATE_MV_INDICES)
    case _ => None
  }
  metricPrefix.foreach(prefix =>
    MetricsUtil.addHistoricGauge(prefix + successSuffix + ".count", 1))
}

private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
map.get(key) match {
case list: java.util.ArrayList[_] =>
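As a usage illustration of the new behavior (a sketch; spark and malformedMetadata are hypothetical placeholders, not values from this PR):

// Sketch: given metadata whose kind is "skipping" but whose body fails to
// deserialize, create() logs the warning, emits
// query.execution.index.skipping.create_failed.count, and returns None as before.
val maybeIndex: Option[FlintSparkIndex] =
  FlintSparkIndexFactory.create(spark, malformedMetadata)
assert(maybeIndex.isEmpty)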