pom.xml: 7 changes (4 additions, 3 deletions)
@@ -11,7 +11,7 @@
<module>proto</module>
<module>tracedsl</module>
<module>spark</module>
- <module>testcontainers</module>
+ <!-- <module>testcontainers</module> -->
</modules>

<properties>
@@ -29,7 +29,7 @@
<version.scala>2.12</version.scala>
<version.kafka>2.3.0</version.kafka>
<version.prometheus>0.7.0</version.prometheus>
- <version.testcontainers>1.13.0</version.testcontainers>
+ <!-- <version.testcontainers>1.13.0</version.testcontainers> -->
<version.junit>4.11</version.junit>
<version.awaitility>4.0.2</version.awaitility>
<version.maven-shade-plugin>3.1.0</version.maven-shade-plugin>
@@ -53,11 +53,12 @@
<artifactId>jaeger-spark</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jaeger-testcontainers</artifactId>
<version>${project.version}</version>
- </dependency>
+ </dependency> -->

<dependency>
<groupId>io.jaegertracing</groupId>
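Side note: the same effect could be had by keeping the testcontainers module behind an opt-in Maven profile instead of a comment, so the default build matches this PR but the module is one flag away. A sketch, not part of this change:

    <profiles>
      <profile>
        <id>with-testcontainers</id>
        <modules>
          <module>testcontainers</module>
        </modules>
      </profile>
    </profiles>

It would be activated with mvn package -Pwith-testcontainers.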
spark/pom.xml: 7 changes (6 additions, 1 deletion)
@@ -38,6 +38,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+   <groupId>org.apache.logging.log4j</groupId>
+   <artifactId>log4j-api</artifactId>
+   <version>2.14.1</version>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${version.scala}</artifactId>
@@ -79,7 +84,7 @@
</transformer>
</transformers>
<!-- spark + scala + ... = lots and lots of classes! -->
- <minimizeJar>true</minimizeJar>
+ <!-- <minimizeJar>true</minimizeJar> -->
<filters>
<!-- Prevent minification from excluding classes looked up by name -->
<filter>
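If <minimizeJar> is ever turned back on, the <filters> block above is the shade-plugin mechanism for protecting classes that are only looked up by name. A sketch of such a filter, with placeholder artifact coordinates (not part of this PR):

    <filter>
      <!-- placeholder coordinates: keep an artifact whose classes are
           loaded reflectively and would otherwise be minimized away -->
      <artifact>org.apache.logging.log4j:log4j-api</artifact>
      <includes>
        <include>org/apache/logging/log4j/**</include>
      </includes>
    </filter>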
SparkKinesisRunner.java
@@ -40,12 +40,17 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;
+ import org.apache.log4j.Logger;
+ import org.apache.log4j.PropertyConfigurator;
+ import java.util.Map;
+ import java.util.HashMap;


/**
* @author Pavol Loffay
*/
public class SparkKinesisRunner {

+ private static final Logger logger = Logger.getLogger(SparkKinesisRunner.class.getName());
/**
* System Properties that can be passed
* SPARK_MASTER
@@ -58,19 +63,41 @@ public class SparkKinesisRunner
* PROMETHEUS_HOST
* PROMETHEUS_PORT
* All the above have default values
*
* @param args
* @throws InterruptedException
* @throws IOException
*/
+ static final String SPARK_MEMORY = "4073741824";
+ static final String SPARK_DEFAULT_PARALLELISM = "30";
+ static final String SPARK_EXECUTOR_CORES = "1";
+ static final String SPARK_DRIVER_CORES = "1";
+ static final String SPARK_EXECUTOR_INSTANCES = "15";
+ static final String SPARK_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";

public static void main(String[] args) throws InterruptedException, IOException {

HTTPServer server = new HTTPServer(getPropOrEnv("PROMETHEUS_HOST", "localhost"), Integer.parseInt(getPropOrEnv("PROMETHEUS_PORT", "9111")));

JsonUtil.init(new ObjectMapper());

SparkConf sparkConf = new SparkConf()
.setAppName("Trace DSL")
.setMaster(getPropOrEnv("SPARK_MASTER", "local[*]"))
.set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "471859200"));
.set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", SPARK_MEMORY))
.set("spark.worker.cleanup.enabled", "true")
.set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", SPARK_DEFAULT_PARALLELISM))
.set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES",SPARK_EXECUTOR_CORES ))
.set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", SPARK_DRIVER_CORES))
.set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", SPARK_EXECUTOR_INSTANCES))
.set("log4j.configuration", getPropOrEnv("log4j.configuration", "file:'/data/log4j.properties'"))
.set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", SPARK_SERIALIZER));

+ /* other explored values of these settings:
+ .set("spark.cores.max", getPropOrEnv("SPARK_CORES_MAX", "16"))
+ .set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "16"))
+ .set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "12"))
+ */

JavaSparkContext sc = new JavaSparkContext(sparkConf);
long batchInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000"));
@@ -84,28 +111,32 @@ public static void main(String[] args) throws InterruptedException, IOException

String region = Regions.getCurrentRegion() != null ? Regions.getCurrentRegion().getName()
: getPropOrEnv("AWS_REGION", Regions.US_EAST_1.getName());


+ String serviceEndpoint = getPropOrEnv("KINESIS_ENDPOINT", "NULL");

InitialPositionInStream initialPosition;
try {
initialPosition = InitialPositionInStream
.valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "TRIM_HORIZON"));
.valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "LATEST"));
} catch (IllegalArgumentException e) {
- initialPosition = InitialPositionInStream.valueOf("TRIM_HORIZON");
+ initialPosition = InitialPositionInStream.LATEST;
}


KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(ssc)
+ .endpointUrl(serviceEndpoint)
.regionName(region)
.streamName(getPropOrEnv("KINESIS_STREAM", "common_haystack_traces"))
.initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
.checkpointAppName("trace-analytics")
.checkpointAppName(getPropOrEnv("DYNAMO_TABLE", "trace-analytics"))
.checkpointInterval(Duration.apply(60 * 1000))
.build();

JavaDStream<byte[]> dStream = JavaDStream.fromDStream(kinesisStream, ClassTag$.MODULE$.apply(byte[].class));

- long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
- JavaDStream<Span> spanStream = dStream.window(Duration.apply(windowInterval)).flatMap((FlatMapFunction<byte[], Span>) kinesisRecord -> {
+ // TODO: windowing can be reintroduced here if needed later
+ // long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
+ JavaDStream<Span> spanStream = dStream.flatMap((FlatMapFunction<byte[], Span>) kinesisRecord -> {
String payload = new String(decompress(kinesisRecord), StandardCharsets.UTF_8);
String[] records = payload.split(System.lineSeparator());
List<Span> spanList = new LinkedList<>();
Expand All @@ -120,8 +151,13 @@ public static void main(String[] args) throws InterruptedException, IOException
span.serviceName = span.process.serviceName;
spanList.add(span);
}
+ Map<String, String> tags = new HashMap<>();
+ for (Map.Entry<String, String> tag : span.tag.entrySet()) {
+   tags.put(tag.getKey().replace("@", "."), tag.getValue());
+ }
+ span.tag = tags;
} catch (Exception e) {
System.out.println("Exception for record : "+record);
logger.error("Exception for record : "+record);
e.printStackTrace();
}
}
@@ -130,7 +166,7 @@ public static void main(String[] args) throws InterruptedException, IOException

JavaPairDStream<String, Span> traceIdSpanTuple = spanStream.mapToPair(record -> new Tuple2<>(record.traceID, record));
JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
System.out.printf("TraceID: %s\n", traceIdSpans._1);
logger.warn("TraceID: "+traceIdSpans._1);
Iterable<Span> spans = traceIdSpans._2();
Trace trace = new Trace();
trace.traceId = traceIdSpans._1();
@@ -140,12 +176,13 @@ public static void main(String[] args) throws InterruptedException, IOException
});

List<ModelRunner> modelRunner = Arrays.asList(
- new TraceHeight(),
- new ServiceDepth(),
- new ServiceHeight(),
+ /* Removing problematic metrics */
+ // new TraceHeight(),
+ // new ServiceDepth(),
+ // new ServiceHeight(),
new NetworkLatency(),
- new NumberOfErrors(),
- new DirectDependencies(),
+ // new NumberOfErrors(),
+ // new DirectDependencies(),
// trace quality
new HasClientServerSpans(),
new UniqueSpanId());
@@ -160,6 +197,7 @@ public static void main(String[] args) throws InterruptedException, IOException

ssc.start();
ssc.awaitTermination();
+ ssc.stop();
}

private static String getPropOrEnv(String key, String defaultValue) {
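The helper bodies are collapsed in this view. A minimal sketch of what getPropOrEnv and decompress plausibly look like, inferred only from their call sites above (an assumption, not this PR's actual code):

    // assumes java.io.ByteArrayInputStream / ByteArrayOutputStream are imported;
    // java.util.zip.GZIPInputStream is already imported in this file
    private static String getPropOrEnv(String key, String defaultValue) {
        // system property first, then environment variable, then the default
        String value = System.getProperty(key, System.getenv(key));
        return value != null ? value : defaultValue;
    }

    private static byte[] decompress(byte[] compressed) throws IOException {
        // records arrive gzip-compressed; inflate into a plain byte array
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }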
GraphCreator.java
@@ -15,8 +15,7 @@
public class GraphCreator {

private GraphCreator() {}

- private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
+ /* private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
.name("orphan_span_total")
.help("Number of orphan spans within single trace")
.labelNames("service", "operation")
@@ -40,7 +39,7 @@ private GraphCreator() {}
.help("Number of spans in trace with root span")
.create()
.register();

+ */
public static Graph create(Trace trace) {
TinkerGraph graph = TinkerGraph.open();

@@ -77,16 +76,16 @@ public static Graph create(Trace trace) {
if (parent != null) {
parent.addEdge(References.CHILD_OF, vertex);
} else {
- ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc();
+ // ORPHAN_SPAN_COUNTER.labels(span.serviceName, span.operationName).inc();
hasOrphanSpan = true;
}
} else {
- TRACE_COUNTER.inc();
- TRACE_SPAN_COUNTER.observe(trace.spans.size());
+ // TRACE_COUNTER.inc();
+ // TRACE_SPAN_COUNTER.observe(trace.spans.size());
}
}
if (hasOrphanSpan) {
- ORPHAN_SPAN_TRACE_COUNTER.inc();
+ // ORPHAN_SPAN_TRACE_COUNTER.inc();
}

return graph;
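For context, a minimal usage sketch of GraphCreator.create, assuming Trace and Span are plain DTOs with the public fields used elsewhere in this diff (the construction below is hypothetical; real traces are assembled from the Kinesis stream in SparkKinesisRunner):

    Trace trace = new Trace();
    trace.traceId = "trace-1";
    trace.spans = new LinkedList<>();   // assumed public, mutable list field
    Graph graph = GraphCreator.create(trace);
    // the returned TinkerGraph is queryable with Gremlin
    long vertexCount = graph.traversal().V().count().next();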