From 895eda337ecec9eb2d512f4cc1a74ca59e5609ca Mon Sep 17 00:00:00 2001 From: Daniel Strebel Date: Wed, 27 Nov 2013 20:41:03 +0100 Subject: [PATCH] Adding starts that are used in the performance evaluation of the system. --- .../querylanguage/StreamingExecution.scala | 22 ++++++++++++++++--- .../StreamingExecutionDemo.scala | 13 +++++++---- .../ComponentResultHandler.scala | 1 + .../CountingResultHandler.scala | 14 ++++++++++++ 4 files changed, 43 insertions(+), 7 deletions(-) create mode 100644 src/main/scala/com/signalcollect/fraudppuccino/resulthandling/CountingResultHandler.scala diff --git a/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecution.scala b/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecution.scala index 04097ae..ef58910 100644 --- a/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecution.scala +++ b/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecution.scala @@ -13,6 +13,8 @@ import com.signalcollect.fraudppuccino.structuredetection._ import com.signalcollect.Vertex import akka.actor.ActorRef import com.signalcollect.fraudppuccino.componentanalysis._ +import java.lang.management.ManagementFactory +import scala.collection.JavaConversions._ /** * A streamed execution that reads transactions from an input source and matches them @@ -62,6 +64,9 @@ case class StreamingExecution( sendStatusUpdate("update", "computation has started") for (lowerWindowBound <- startTime to endTime by windowSize) { + + val stepStartTime = System.currentTimeMillis + if (maxComponentDuration > 0) { //signal maxComponentDuration if it is set retire(Array(lowerWindowBound - maxTxInterval, lowerWindowBound - 2 * maxTxInterval, lowerWindowBound - maxComponentDuration)) } else { @@ -73,16 +78,27 @@ case class StreamingExecution( load(sourceFile, lowerWindowBound, lowerWindowBound + windowSize) val loadingTime = System.currentTimeMillis - loadingStartTime + sendStatusUpdate("progress", ((lowerWindowBound - startTime) * 100 / (endTime - startTime)).toString) + val executionStartTime = System.currentTimeMillis graph.recalculateScores graph.execute val executionTime = System.currentTimeMillis - executionStartTime + val stepTime = System.currentTimeMillis - stepStartTime + if (debug) { - sendStatusUpdate("progress", ((lowerWindowBound - startTime) * 100 / (endTime - startTime)).toString) - println(loadingTime + "," + executionTime + "," + lowerWindowBound) + val memoryBean = ManagementFactory.getMemoryMXBean + val heapUsage = memoryBean.getHeapMemoryUsage.getUsed/1048576 + val nonHeapUsage = memoryBean.getNonHeapMemoryUsage.getUsed/1048576 + val totalMemoryUsage = heapUsage+nonHeapUsage + + println(loadingTime + "," + executionTime + "," + stepTime + "," + lowerWindowBound + "," + heapUsage + "," + nonHeapUsage + "," + totalMemoryUsage +","+ timeInGC) } } - + } + + def timeInGC = { + ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum } /** diff --git a/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecutionDemo.scala b/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecutionDemo.scala index f43cc75..09b678b 100644 --- a/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecutionDemo.scala +++ b/src/main/scala/com/signalcollect/fraudppuccino/querylanguage/StreamingExecutionDemo.scala @@ -3,6 +3,11 @@ package com.signalcollect.fraudppuccino.querylanguage import FRAUDPPUCCINO._ object StreamingExecutionDemo extends App { + + val complexity = if(args.size>=2) args(1) else 10 + val txInterval = if(args.size>=3) args(2) else "1w" + + val executionPlan = """ #path of the input data @@ -22,23 +27,23 @@ start: 01/09/2009 02:50:00 end: 18/08/2013 17:30:00 window: 1d -transactionInterval: 1w +transactionInterval: """ + txInterval + """ #set to true if the matcher should follow all possible matching possibilities exhaustiveMatching: true #set the max number of inputs and outputs that are matched against each other -matchingComplexity: 10 +matchingComplexity: """ + complexity + """ #sets the max duration of components to prevent them from lasting for the entire streaming period. maxComponentDuration: 8w #conditions that a component has to fulfil to be reported -filters: [SIZE > 5, SIZE<1000, SINKVALUE > 10000000000, DEPTH > 3, COUNTRYHOPS > 2] +filters: [SIZE > 5, SINKVALUE > 10000000000, DEPTH > 3, COUNTRYHOPS > 2] #handlers that receive the reported components #e.g. WEBSERVER, CONSOLE, MONGODB -handlers: [WEBSERVER] +handlers: [WEBSERVER, COUNTING] debug: true diff --git a/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/ComponentResultHandler.scala b/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/ComponentResultHandler.scala index 934dd59..118c82f 100644 --- a/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/ComponentResultHandler.scala +++ b/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/ComponentResultHandler.scala @@ -11,6 +11,7 @@ object ComponentResultHandler { case "CONSOLE" => CommandLineResultHandler case "WEBSERVER" => FraudppuccinoServer case "MONGODB"=> MongoDBResultHandler + case "COUNTING" => CountingResultHandler } } } diff --git a/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/CountingResultHandler.scala b/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/CountingResultHandler.scala new file mode 100644 index 0000000..c2cb364 --- /dev/null +++ b/src/main/scala/com/signalcollect/fraudppuccino/resulthandling/CountingResultHandler.scala @@ -0,0 +1,14 @@ +package com.signalcollect.fraudppuccino.resulthandling + +object CountingResultHandler extends ComponentResultHandler { + var count = 0 + + def processResult(jsonData: String): Unit = { + count+=1 + } + + override def processStatusMessage(jsonStatus: String): Unit = { + print(count + ",") + } + +} \ No newline at end of file