From 29220320121a16a130e06af22d8f2322994af84f Mon Sep 17 00:00:00 2001 From: Sivatarun Ponnada Date: Mon, 28 Jun 2021 22:37:05 +0530 Subject: [PATCH 1/4] removing test-containers dependency, and minimizeJar option --- pom.xml | 7 ++++--- spark/pom.xml | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index cdebb98..757770a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ proto tracedsl spark - testcontainers + @@ -29,7 +29,7 @@ 2.12 2.3.0 0.7.0 - 1.13.0 + 4.11 4.0.2 3.1.0 @@ -53,11 +53,12 @@ jaeger-spark ${project.version} + io.jaegertracing diff --git a/spark/pom.xml b/spark/pom.xml index 399dd37..c0fcf3e 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -79,7 +79,7 @@ - true + From 8e9f325b910d93544c69a4298e05d0687b174e43 Mon Sep 17 00:00:00 2001 From: Sivatarun Ponnada Date: Tue, 29 Jun 2021 17:42:54 +0530 Subject: [PATCH 2/4] adding configuration for log4j, removing unnecessary metrics, adding spark configurations, changing @ in tag keys to . --- spark/pom.xml | 5 ++ .../analytics/spark/SparkKinesisRunner.java | 56 +++++++++++++------ .../analytics/gremlin/GraphCreator.java | 9 ++- 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/spark/pom.xml b/spark/pom.xml index c0fcf3e..5109417 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -38,6 +38,11 @@ + + org.apache.logging.log4j + log4j-api + 2.14.1 + org.apache.spark spark-streaming_${version.scala} diff --git a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java index ab3debe..ef44a7e 100644 --- a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java +++ b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java @@ -40,12 +40,14 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import java.util.zip.GZIPInputStream; +import org.apache.log4j.Logger; +import 
org.apache.log4j.PropertyConfigurator; /** * @author Pavol Loffay */ public class SparkKinesisRunner { - +private static final Logger logger = Logger.getLogger(SparkKinesisRunner.class.getName()); /** * System Properties that can be passed * SPARK_MASTER @@ -63,6 +65,8 @@ public class SparkKinesisRunner { * @throws IOException */ public static void main(String[] args) throws InterruptedException, IOException { + PropertyConfigurator.configure("/data/log4j.properties"); // can make it configurable + //@transient lazy logger = LogManager.getRootLogger(); HTTPServer server = new HTTPServer(getPropOrEnv("PROMETHEUS_HOST", "localhost"), Integer.parseInt(getPropOrEnv("PROMETHEUS_PORT", "9111"))); JsonUtil.init(new ObjectMapper()); @@ -70,7 +74,18 @@ public static void main(String[] args) throws InterruptedException, IOException SparkConf sparkConf = new SparkConf() .setAppName("Trace DSL") .setMaster(getPropOrEnv("SPARK_MASTER", "local[*]")) - .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "471859200")); + .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "4073741824")) + .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "30")) + .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "1")) + .set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", "1")) + .set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", "15")) + .set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", "org.apache.spark.serializer.KryoSerializer")); + +/* other explored values of settings + .set("spark.cores.max", getPropOrEnv("SPARK_CORES_MAX", "16")) + .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "16")) + .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "12")) +*/ JavaSparkContext sc = new JavaSparkContext(sparkConf); long batchInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000")); @@ -85,27 +100,30 @@ public static void main(String[] args) 
throws InterruptedException, IOException String region = Regions.getCurrentRegion()!=null ? Regions.getCurrentRegion().getName() : getPropOrEnv("AWS_REGION", Regions.US_EAST_1.getName()); + String service_endpoint = getPropOrEnv("KINESIS_ENDPOINT", "https://kinesis.us-east-1.amazonaws.com"); + InitialPositionInStream initialPosition; try { initialPosition = InitialPositionInStream - .valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "TRIM_HORIZON")); + .valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "LATEST")); } catch (IllegalArgumentException e) { - initialPosition = InitialPositionInStream.valueOf("TRIM_HORIZON"); + initialPosition = InitialPositionInStream.valueOf("LATEST"); } - + + String applicationName = (getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000")); KinesisInputDStream kinesisStream = KinesisInputDStream.builder() .streamingContext(ssc) .regionName(region) .streamName(getPropOrEnv("KINESIS_STREAM", "common_haystack_traces")) .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initialPosition)) - .checkpointAppName("trace-analytics") + .checkpointAppName(getPropOrEnv("DYNAMO_TABLE", "trace-analytics")) .checkpointInterval(Duration.apply(60 * 1000)) .build(); JavaDStream dStream = JavaDStream.fromDStream(kinesisStream, ClassTag$.MODULE$.apply(byte[].class)); - - long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000")); - JavaDStream spanStream = dStream.window(Duration.apply(windowInterval)).flatMap((FlatMapFunction) kinesisRecord -> { + // TO DO can put windowing if needed later + //long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000")); + JavaDStream spanStream = dStream.flatMap((FlatMapFunction) kinesisRecord -> { String payload = new String(decompress(kinesisRecord), StandardCharsets.UTF_8); String[] records = payload.split(System.lineSeparator()); List spanList = new LinkedList<>(); @@ -120,8 +138,13 @@ public static void main(String[] args) 
throws InterruptedException, IOException span.serviceName = span.process.serviceName; spanList.add(span); } + Map tags = new HashMap<>(); + for (Map.Entry tag: span.tag.entrySet()) { + tags.put (tag.getKey().replace("@","."),tag.getValue()); + } + span.tag = tags; } catch (Exception e) { - System.out.println("Exception for record : "+record); + logger.error("Exception for record : "+record); e.printStackTrace(); } } @@ -130,7 +153,7 @@ public static void main(String[] args) throws InterruptedException, IOException JavaPairDStream traceIdSpanTuple = spanStream.mapToPair(record -> new Tuple2<>(record.traceID, record)); JavaDStream tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> { - System.out.printf("TraceID: %s\n", traceIdSpans._1); + logger.info("TraceID: "+traceIdSpans._1); Iterable spans = traceIdSpans._2(); Trace trace = new Trace(); trace.traceId = traceIdSpans._1(); @@ -140,12 +163,13 @@ public static void main(String[] args) throws InterruptedException, IOException }); List modelRunner = Arrays.asList( - new TraceHeight(), - new ServiceDepth(), - new ServiceHeight(), + /* Removing problematic metrics*/ + //new TraceHeight(), + //new ServiceDepth(), + //new ServiceHeight(), new NetworkLatency(), - new NumberOfErrors(), - new DirectDependencies(), + //new NumberOfErrors(), + //new DirectDependencies(), // trace quality new HasClientServerSpans(), new UniqueSpanId()); diff --git a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java index 4e4d921..d281183 100644 --- a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java +++ b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java @@ -15,8 +15,7 @@ public class GraphCreator { private GraphCreator() {} - - private static final Counter ORPHAN_SPAN_COUNTER = Counter.build() +/* private static final Counter ORPHAN_SPAN_COUNTER = Counter.build() 
.name("orphan_span_total") .help("Number of orphan spans within single trace") .labelNames("service", "operation") @@ -40,7 +39,7 @@ private GraphCreator() {} .help("Number of spans in trace with root span") .create() .register(); - +*/ public static Graph create(Trace trace) { TinkerGraph graph = TinkerGraph.open(); @@ -67,7 +66,7 @@ public static Graph create(Trace trace) { vertex.property(tag.key.replace("@","."), tag.value); }); } - +/* boolean hasOrphanSpan = false; for (Span span: trace.spans) { Vertex vertex = vertexMap.get(span.spanID); @@ -88,7 +87,7 @@ public static Graph create(Trace trace) { if(hasOrphanSpan) { ORPHAN_SPAN_TRACE_COUNTER.inc(); } - +*/ return graph; } From 2b34d16197bde358ed940982de7bba8028b12a41 Mon Sep 17 00:00:00 2001 From: Sivatarun Ponnada Date: Tue, 29 Jun 2021 23:02:50 +0530 Subject: [PATCH 3/4] addressing review comments --- .../analytics/spark/SparkKinesisRunner.java | 33 ++++++++++++------- .../analytics/gremlin/GraphCreator.java | 12 +++---- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java index ef44a7e..1fcb3e6 100644 --- a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java +++ b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java @@ -42,6 +42,9 @@ import java.util.zip.GZIPInputStream; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; +import java.util.Map; +import java.util.HashMap; + /** * @author Pavol Loffay @@ -60,13 +63,20 @@ public class SparkKinesisRunner { * PROMETHEUS_HOST * PROMETHEUS_PORT * All the above have default values + * @param args * @throws InterruptedException * @throws IOException */ + static final String SPARK_MEMORY = "4073741824"; + static final String SPARK_DEFAULT_PARALLELISM = "30"; + static final String SPARK_EXECUTOR_CORES = "1"; + static final 
String SPARK_DRIVER_CORES = "1"; + static final String SPARK_EXECUTOR_INSTANCES = "15"; + static final String SPARK_SERIALIZER = "org.apache.spark.serializer.KryoSerializer"; + public static void main(String[] args) throws InterruptedException, IOException { - PropertyConfigurator.configure("/data/log4j.properties"); // can make it configurable - //@transient lazy logger = LogManager.getRootLogger(); + HTTPServer server = new HTTPServer(getPropOrEnv("PROMETHEUS_HOST", "localhost"), Integer.parseInt(getPropOrEnv("PROMETHEUS_PORT", "9111"))); JsonUtil.init(new ObjectMapper()); @@ -74,12 +84,13 @@ public static void main(String[] args) throws InterruptedException, IOException SparkConf sparkConf = new SparkConf() .setAppName("Trace DSL") .setMaster(getPropOrEnv("SPARK_MASTER", "local[*]")) - .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "4073741824")) - .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "30")) - .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "1")) - .set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", "1")) - .set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", "15")) - .set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", "org.apache.spark.serializer.KryoSerializer")); + .set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", SPARK_MEMORY)) + .set("spark.worker.cleanup.enabled", "true") + .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", SPARK_DEFAULT_PARALLELISM)) + .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES",SPARK_EXECUTOR_CORES )) + .set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", SPARK_DRIVER_CORES)) + .set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", SPARK_EXECUTOR_INSTANCES)) + .set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", SPARK_SERIALIZER)); /* other explored values of settings .set("spark.cores.max", getPropOrEnv("SPARK_CORES_MAX", "16")) @@ -99,9 +110,9 @@ 
public static void main(String[] args) throws InterruptedException, IOException String region = Regions.getCurrentRegion()!=null ? Regions.getCurrentRegion().getName() : getPropOrEnv("AWS_REGION", Regions.US_EAST_1.getName()); - - String service_endpoint = getPropOrEnv("KINESIS_ENDPOINT", "https://kinesis.us-east-1.amazonaws.com"); - + + String service_endpoint = getPropOrEnv("KINESIS_ENDPOINT","NULL"); + InitialPositionInStream initialPosition; try { initialPosition = InitialPositionInStream diff --git a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java index d281183..0652ea3 100644 --- a/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java +++ b/tracedsl/src/main/java/io/jaegertracing/analytics/gremlin/GraphCreator.java @@ -66,7 +66,7 @@ public static Graph create(Trace trace) { vertex.property(tag.key.replace("@","."), tag.value); }); } -/* + boolean hasOrphanSpan = false; for (Span span: trace.spans) { Vertex vertex = vertexMap.get(span.spanID); @@ -76,18 +76,18 @@ public static Graph create(Trace trace) { if (parent != null) { parent.addEdge(References.CHILD_OF, vertex); } else { - ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc(); + //ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc(); hasOrphanSpan = true; } } else { - TRACE_COUNTER.inc(); - TRACE_SPAN_COUNTER.observe(trace.spans.size()); + //TRACE_COUNTER.inc(); + //TRACE_SPAN_COUNTER.observe(trace.spans.size()); } } if(hasOrphanSpan) { - ORPHAN_SPAN_TRACE_COUNTER.inc(); + //ORPHAN_SPAN_TRACE_COUNTER.inc(); } -*/ + return graph; } From bc7dd39f30f7a77554882d8fccaa73cfc553b71d Mon Sep 17 00:00:00 2001 From: Sivatarun Ponnada Date: Tue, 13 Jul 2021 12:17:58 +0530 Subject: [PATCH 4/4] adding new variable --- .../io/jaegertracing/analytics/spark/SparkKinesisRunner.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java index 1fcb3e6..20a6bab 100644 --- a/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java +++ b/spark/src/main/java/io/jaegertracing/analytics/spark/SparkKinesisRunner.java @@ -90,6 +90,7 @@ public static void main(String[] args) throws InterruptedException, IOException .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES",SPARK_EXECUTOR_CORES )) .set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", SPARK_DRIVER_CORES)) .set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", SPARK_EXECUTOR_INSTANCES)) + .set("log4j.configuration", getPropOrEnv("log4j.configuration", "file:'/data/log4j.properties'")) .set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", SPARK_SERIALIZER)); /* other explored values of settings @@ -124,6 +125,7 @@ public static void main(String[] args) throws InterruptedException, IOException String applicationName = (getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000")); KinesisInputDStream kinesisStream = KinesisInputDStream.builder() .streamingContext(ssc) + .endpointUrl(service_endpoint) .regionName(region) .streamName(getPropOrEnv("KINESIS_STREAM", "common_haystack_traces")) .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initialPosition)) @@ -164,7 +166,7 @@ public static void main(String[] args) throws InterruptedException, IOException JavaPairDStream traceIdSpanTuple = spanStream.mapToPair(record -> new Tuple2<>(record.traceID, record)); JavaDStream tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> { - logger.info("TraceID: "+traceIdSpans._1); + logger.warn("TraceID: "+traceIdSpans._1); Iterable spans = traceIdSpans._2(); Trace trace = new Trace(); trace.traceId = traceIdSpans._1(); @@ -195,6 +197,7 @@ public static void main(String[] args) throws InterruptedException, IOException 
ssc.start(); ssc.awaitTermination(); + ssc.stop(); } private static String getPropOrEnv(String key, String defaultValue) {