metrics scope #74

Draft · wants to merge 1 commit into base: main
31 changes: 31 additions & 0 deletions src/main/java/com/teragrep/pth_06/DummyTaskMetricAggregator.java
@@ -0,0 +1,31 @@
package com.teragrep.pth_06;

import org.apache.spark.sql.connector.metric.CustomMetric;

public class DummyTaskMetricAggregator implements CustomMetric {

@Override
public String name() {
// The metric name must match the name() of the corresponding CustomTaskMetric;
// based on lines 1150 and 1158 in SQLAppStatusListenerSuite.scala
return "DummyTaskMetric";
}

@Override
public String description() {
return "DummyTaskMetric yadda yadda DummyTaskMetric";
}

@Override
public String aggregateTaskMetrics(final long[] longs) {
// Each task reports Long.MIN_VALUE; any other value indicates a wiring problem.
long count = 0;
for (long l : longs) {
if (l != Long.MIN_VALUE) {
throw new IllegalArgumentException("unexpected value, DummyTaskMetric is supposed to report only Long.MIN_VALUE");
}
count++;
}
return "All task values were Long.MIN_VALUE as expected; their total count was: " + count;
}

}
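
For a non-dummy metric the aggregator would summarize the per-task values instead of validating a sentinel. A minimal sketch of a summing variant follows, assuming a hypothetical record-count metric named "recordCount" (neither the class nor the name is part of this PR):

package com.teragrep.pth_06;

import org.apache.spark.sql.connector.metric.CustomMetric;

// Hypothetical sketch: sums per-task record counts into one display value.
public class RecordCountMetricAggregator implements CustomMetric {

    @Override
    public String name() {
        // must match the name() of the CustomTaskMetric that reports the counts
        return "recordCount";
    }

    @Override
    public String description() {
        return "number of records read";
    }

    @Override
    public String aggregateTaskMetrics(final long[] taskValues) {
        long sum = 0;
        for (long value : taskValues) {
            sum += value;
        }
        return String.valueOf(sum);
    }
}

Spark also ships CustomSumMetric and CustomAvgMetric in the same package, which implement this kind of aggregation out of the box.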
14 changes: 3 additions & 11 deletions src/main/java/com/teragrep/pth_06/TeragrepDatasource.java
@@ -105,17 +105,9 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S

 @Override
 public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-    return () -> new Scan() {
-
-        @Override
-        public StructType readSchema() {
-            return schema;
-        }
-
-        @Override
-        public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-            return new ArchiveMicroStreamReader(new Config(options));
-        }
+    return () -> {
+        Config config = new Config(options);
+        return new TeragrepScan(schema, config);
     };
 }

45 changes: 45 additions & 0 deletions src/main/java/com/teragrep/pth_06/TeragrepScan.java
@@ -0,0 +1,45 @@
package com.teragrep.pth_06;

import com.teragrep.pth_06.config.Config;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;

public class TeragrepScan implements Scan {
private final StructType schema;
private final Config config;

TeragrepScan(StructType schema, Config config) {
this.schema = schema;
this.config = config;
}

@Override
public StructType readSchema() {
return schema;
}

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new ArchiveMicroStreamReader(config);
}

@Override
public CustomMetric[] supportedCustomMetrics() {
// see examples at sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala

CustomMetric[] customMetrics = new CustomMetric[1];
customMetrics[0] = new DummyTaskMetricAggregator();
return customMetrics;
}

@Override
public CustomTaskMetric[] reportDriverMetrics() {
// These are driver-specific metrics; any metric reported here also needs
// an aggregator registered in supportedCustomMetrics()
return new CustomTaskMetric[]{};
}

}
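
If driver-side metrics are added later, reportDriverMetrics() could return values along these lines (the metric name and value are hypothetical); the matching aggregator would also have to be added to the array returned by supportedCustomMetrics():

// Hypothetical sketch only; "plannedPartitions" would need its own aggregator.
@Override
public CustomTaskMetric[] reportDriverMetrics() {
    CustomTaskMetric plannedPartitions = new CustomTaskMetric() {

        @Override
        public String name() {
            // must match an aggregator name from supportedCustomMetrics()
            return "plannedPartitions";
        }

        @Override
        public long value() {
            // hypothetical: number of partitions planned on the driver
            return 4L;
        }
    };
    return new CustomTaskMetric[]{plannedPartitions};
}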
@@ -52,6 +52,7 @@
import com.teragrep.rad_01.AuditPlugin;
import com.teragrep.rad_01.AuditPluginFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.PartitionReader;

import java.io.IOException;
@@ -184,6 +185,13 @@ public InternalRow get() {
return rowConverter.get();
}

@Override
public CustomTaskMetric[] currentMetricsValues() {
// Spark polls this per task and routes the values by metric name to
// DummyTaskMetricAggregator.aggregateTaskMetrics() on the driver.
CustomTaskMetric[] customTaskMetrics = new CustomTaskMetric[1];
customTaskMetrics[0] = new DummyTaskMetric();
return customTaskMetrics;
}

@Override
public void close() throws IOException {
if (LOGGER.isDebugEnabled())
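
With the DataSource V2 metrics API, Spark polls currentMetricsValues() while a task runs and forwards the latest value of each named metric to the driver, which hands the collected values to the aggregator with the matching name. A reader that tracks real progress could report it like this rough sketch (the rowCount field is hypothetical, and "recordCount" refers to the aggregator sketched earlier):

// Hypothetical sketch: reporting an actual per-task row count instead of the dummy.
@Override
public CustomTaskMetric[] currentMetricsValues() {
    final long rowsSoFar = rowCount; // assumes the reader increments rowCount in next()
    CustomTaskMetric recordCount = new CustomTaskMetric() {

        @Override
        public String name() {
            return "recordCount";
        }

        @Override
        public long value() {
            return rowsSoFar;
        }
    };
    return new CustomTaskMetric[]{recordCount};
}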
17 changes: 17 additions & 0 deletions src/main/java/com/teragrep/pth_06/task/DummyTaskMetric.java
@@ -0,0 +1,17 @@
package com.teragrep.pth_06.task;

import org.apache.spark.sql.connector.metric.CustomTaskMetric;

public class DummyTaskMetric implements CustomTaskMetric {

@Override
public String name() {
return "DummyTaskMetric";
}

@Override
public long value() {
// Always the sentinel value that DummyTaskMetricAggregator expects.
return Long.MIN_VALUE;
}

}
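
Because the dummy pair is deliberately simple, it can be exercised by hand outside Spark. A rough standalone check, assuming both classes are on the classpath (this class is not part of the PR):

import com.teragrep.pth_06.DummyTaskMetricAggregator;
import com.teragrep.pth_06.task.DummyTaskMetric;

// Hypothetical smoke check, not part of this PR.
public class DummyMetricSmokeCheck {

    public static void main(String[] args) {
        DummyTaskMetricAggregator aggregator = new DummyTaskMetricAggregator();
        DummyTaskMetric metric = new DummyTaskMetric();

        // Spark routes task values to aggregators by name, so these must match.
        if (!aggregator.name().equals(metric.name())) {
            throw new IllegalStateException("metric and aggregator names differ");
        }

        // Simulate three tasks, each reporting the sentinel value.
        long[] taskValues = {metric.value(), metric.value(), metric.value()};
        System.out.println(aggregator.aggregateTaskMetrics(taskValues));
    }
}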