diff --git a/pom.xml b/pom.xml
index cdebb98..757770a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
proto
tracedsl
spark
- testcontainers
+
@@ -29,7 +29,7 @@
2.12
2.3.0
0.7.0
- 1.13.0
+
4.11
4.0.2
3.1.0
@@ -53,11 +53,12 @@
jaeger-spark
${project.version}
+
io.jaegertracing
diff --git a/spark/pom.xml b/spark/pom.xml
index 399dd37..5109417 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -38,6 +38,11 @@
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.14.1
+
org.apache.spark
spark-streaming_${version.scala}
@@ -79,7 +84,7 @@
- true
+
diff --git a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java
index ab3debe..20a6bab 100644
--- a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java
+++ b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java
@@ -40,12 +40,17 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import java.util.Map;
+import java.util.HashMap;
+
/**
* @author Pavol Loffay
*/
public class SparkKinesisRunner {
-
+private static final Logger logger = Logger.getLogger(SparkKinesisRunner.class.getName());
/**
* System Properties that can be passed
* SPARK_MASTER
@@ -58,11 +63,20 @@ public class SparkKinesisRunner {
* PROMETHEUS_HOST
* PROMETHEUS_PORT
* All the above have default values
+
* @param args
* @throws InterruptedException
* @throws IOException
*/
+ static final String SPARK_MEMORY = "4073741824";
+ static final String SPARK_DEFAULT_PARALLELISM = "30";
+ static final String SPARK_EXECUTOR_CORES = "1";
+ static final String SPARK_DRIVER_CORES = "1";
+ static final String SPARK_EXECUTOR_INSTANCES = "15";
+ static final String SPARK_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
+
public static void main(String[] args) throws InterruptedException, IOException {
+
HTTPServer server = new HTTPServer(getPropOrEnv("PROMETHEUS_HOST", "localhost"), Integer.parseInt(getPropOrEnv("PROMETHEUS_PORT", "9111")));
JsonUtil.init(new ObjectMapper());
@@ -70,7 +84,20 @@ public static void main(String[] args) throws InterruptedException, IOException
SparkConf sparkConf = new SparkConf()
.setAppName("Trace DSL")
.setMaster(getPropOrEnv("SPARK_MASTER", "local[*]"))
- .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "471859200"));
+ .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", SPARK_MEMORY))
+ .set("spark.worker.cleanup.enabled", "true")
+ .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", SPARK_DEFAULT_PARALLELISM))
+ .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES",SPARK_EXECUTOR_CORES ))
+ .set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", SPARK_DRIVER_CORES))
+ .set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", SPARK_EXECUTOR_INSTANCES))
+ .set("log4j.configuration", getPropOrEnv("log4j.configuration", "file:'/data/log4j.properties'"))
+ .set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", SPARK_SERIALIZER));
+
+/* other explored values of settings
+ .set("spark.cores.max", getPropOrEnv("SPARK_CORES_MAX", "16"))
+ .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "16"))
+ .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "12"))
+*/
JavaSparkContext sc = new JavaSparkContext(sparkConf);
long batchInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000"));
@@ -84,28 +111,32 @@ public static void main(String[] args) throws InterruptedException, IOException
String region = Regions.getCurrentRegion()!=null ? Regions.getCurrentRegion().getName()
: getPropOrEnv("AWS_REGION", Regions.US_EAST_1.getName());
-
+
+ String service_endpoint = getPropOrEnv("KINESIS_ENDPOINT","NULL");
+
InitialPositionInStream initialPosition;
try {
initialPosition = InitialPositionInStream
- .valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "TRIM_HORIZON"));
+ .valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "LATEST"));
} catch (IllegalArgumentException e) {
- initialPosition = InitialPositionInStream.valueOf("TRIM_HORIZON");
+ initialPosition = InitialPositionInStream.valueOf("LATEST");
}
-
+
+ String applicationName = (getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000"));
KinesisInputDStream kinesisStream = KinesisInputDStream.builder()
.streamingContext(ssc)
+ .endpointUrl(service_endpoint)
.regionName(region)
.streamName(getPropOrEnv("KINESIS_STREAM", "common_haystack_traces"))
.initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
- .checkpointAppName("trace-analytics")
+ .checkpointAppName(getPropOrEnv("DYNAMO_TABLE", "trace-analytics"))
.checkpointInterval(Duration.apply(60 * 1000))
.build();
JavaDStream dStream = JavaDStream.fromDStream(kinesisStream, ClassTag$.MODULE$.apply(byte[].class));
-
- long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
- JavaDStream spanStream = dStream.window(Duration.apply(windowInterval)).flatMap((FlatMapFunction) kinesisRecord -> {
+ // TO DO can put windowing if needed later
+ //long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
+ JavaDStream spanStream = dStream.flatMap((FlatMapFunction) kinesisRecord -> {
String payload = new String(decompress(kinesisRecord), StandardCharsets.UTF_8);
String[] records = payload.split(System.lineSeparator());
List spanList = new LinkedList<>();
@@ -120,8 +151,13 @@ public static void main(String[] args) throws InterruptedException, IOException
span.serviceName = span.process.serviceName;
spanList.add(span);
}
+ Map tags = new HashMap<>();
+ for (Map.Entry tag: span.tag.entrySet()) {
+ tags.put (tag.getKey().replace("@","."),tag.getValue());
+ }
+ span.tag = tags;
} catch (Exception e) {
- System.out.println("Exception for record : "+record);
+ logger.error("Exception for record : "+record);
e.printStackTrace();
}
}
@@ -130,7 +166,7 @@ public static void main(String[] args) throws InterruptedException, IOException
JavaPairDStream traceIdSpanTuple = spanStream.mapToPair(record -> new Tuple2<>(record.traceID, record));
JavaDStream tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
- System.out.printf("TraceID: %s\n", traceIdSpans._1);
+ logger.warn("TraceID: "+traceIdSpans._1);
Iterable spans = traceIdSpans._2();
Trace trace = new Trace();
trace.traceId = traceIdSpans._1();
@@ -140,12 +176,13 @@ public static void main(String[] args) throws InterruptedException, IOException
});
List modelRunner = Arrays.asList(
- new TraceHeight(),
- new ServiceDepth(),
- new ServiceHeight(),
+ /* Removing problematic metrics*/
+ //new TraceHeight(),
+ //new ServiceDepth(),
+ //new ServiceHeight(),
new NetworkLatency(),
- new NumberOfErrors(),
- new DirectDependencies(),
+ //new NumberOfErrors(),
+ //new DirectDependencies(),
// trace quality
new HasClientServerSpans(),
new UniqueSpanId());
@@ -160,6 +197,7 @@ public static void main(String[] args) throws InterruptedException, IOException
ssc.start();
ssc.awaitTermination();
+ ssc.stop();
}
private static String getPropOrEnv(String key, String defaultValue) {
diff --git a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java
index 4e4d921..0652ea3 100644
--- a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java
+++ b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java
@@ -15,8 +15,7 @@
public class GraphCreator {
private GraphCreator() {}
-
- private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
+/* private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
.name("orphan_span_total")
.help("Number of orphan spans within single trace")
.labelNames("service", "operation")
@@ -40,7 +39,7 @@ private GraphCreator() {}
.help("Number of spans in trace with root span")
.create()
.register();
-
+*/
public static Graph create(Trace trace) {
TinkerGraph graph = TinkerGraph.open();
@@ -77,16 +76,16 @@ public static Graph create(Trace trace) {
if (parent != null) {
parent.addEdge(References.CHILD_OF, vertex);
} else {
- ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc();
+ //ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc();
hasOrphanSpan = true;
}
} else {
- TRACE_COUNTER.inc();
- TRACE_SPAN_COUNTER.observe(trace.spans.size());
+ //TRACE_COUNTER.inc();
+ //TRACE_SPAN_COUNTER.observe(trace.spans.size());
}
}
if(hasOrphanSpan) {
- ORPHAN_SPAN_TRACE_COUNTER.inc();
+ //ORPHAN_SPAN_TRACE_COUNTER.inc();
}
return graph;