Skip to content

Commit

Permalink
Adding starts that are used in the performance evaluation of the system.
Browse files Browse the repository at this point in the history
  • Loading branch information
danistrebel committed Nov 27, 2013
1 parent 9aa4cbc commit 895eda3
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import com.signalcollect.fraudppuccino.structuredetection._
import com.signalcollect.Vertex
import akka.actor.ActorRef
import com.signalcollect.fraudppuccino.componentanalysis._
import java.lang.management.ManagementFactory
import scala.collection.JavaConversions._

/**
* A streamed execution that reads transactions from an input source and matches them
Expand Down Expand Up @@ -62,6 +64,9 @@ case class StreamingExecution(
sendStatusUpdate("update", "computation has started")

for (lowerWindowBound <- startTime to endTime by windowSize) {

val stepStartTime = System.currentTimeMillis

if (maxComponentDuration > 0) { //signal maxComponentDuration if it is set
retire(Array(lowerWindowBound - maxTxInterval, lowerWindowBound - 2 * maxTxInterval, lowerWindowBound - maxComponentDuration))
} else {
Expand All @@ -73,16 +78,27 @@ case class StreamingExecution(
load(sourceFile, lowerWindowBound, lowerWindowBound + windowSize)
val loadingTime = System.currentTimeMillis - loadingStartTime

sendStatusUpdate("progress", ((lowerWindowBound - startTime) * 100 / (endTime - startTime)).toString)

val executionStartTime = System.currentTimeMillis
graph.recalculateScores
graph.execute
val executionTime = System.currentTimeMillis - executionStartTime
val stepTime = System.currentTimeMillis - stepStartTime

if (debug) {
sendStatusUpdate("progress", ((lowerWindowBound - startTime) * 100 / (endTime - startTime)).toString)
println(loadingTime + "," + executionTime + "," + lowerWindowBound)
val memoryBean = ManagementFactory.getMemoryMXBean
val heapUsage = memoryBean.getHeapMemoryUsage.getUsed/1048576
val nonHeapUsage = memoryBean.getNonHeapMemoryUsage.getUsed/1048576
val totalMemoryUsage = heapUsage+nonHeapUsage

println(loadingTime + "," + executionTime + "," + stepTime + "," + lowerWindowBound + "," + heapUsage + "," + nonHeapUsage + "," + totalMemoryUsage +","+ timeInGC)
}
}

}

def timeInGC = {
ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package com.signalcollect.fraudppuccino.querylanguage
import FRAUDPPUCCINO._

object StreamingExecutionDemo extends App {

val complexity = if(args.size>=2) args(1) else 10
val txInterval = if(args.size>=3) args(2) else "1w"


val executionPlan = """
#path of the input data
Expand All @@ -22,23 +27,23 @@ start: 01/09/2009 02:50:00
end: 18/08/2013 17:30:00
window: 1d
transactionInterval: 1w
transactionInterval: """ + txInterval + """
#set to true if the matcher should follow all possible matching possibilities
exhaustiveMatching: true
#set the max number of inputs and outputs that are matched against each other
matchingComplexity: 10
matchingComplexity: """ + complexity + """
#sets the max duration of components to prevent them from lasting for the entire streaming period.
maxComponentDuration: 8w
#conditions that a component has to fulfil to be reported
filters: [SIZE > 5, SIZE<1000, SINKVALUE > 10000000000, DEPTH > 3, COUNTRYHOPS > 2]
filters: [SIZE > 5, SINKVALUE > 10000000000, DEPTH > 3, COUNTRYHOPS > 2]
#handlers that receive the reported components
#e.g. WEBSERVER, CONSOLE, MONGODB
handlers: [WEBSERVER]
handlers: [WEBSERVER, COUNTING]
debug: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ object ComponentResultHandler {
case "CONSOLE" => CommandLineResultHandler
case "WEBSERVER" => FraudppuccinoServer
case "MONGODB"=> MongoDBResultHandler
case "COUNTING" => CountingResultHandler
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.signalcollect.fraudppuccino.resulthandling

object CountingResultHandler extends ComponentResultHandler {
var count = 0

def processResult(jsonData: String): Unit = {
count+=1
}

override def processStatusMessage(jsonStatus: String): Unit = {
print(count + ",")
}

}

0 comments on commit 895eda3

Please sign in to comment.