diff --git a/pom.xml b/pom.xml
index 3ed891e..56ea76a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,9 +82,9 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <scala.version>2.11.8</scala.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <spark.version>2.1.0</spark.version>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.1.2</spark.version>
         <java.version>1.8</java.version>
@@ -122,22 +122,22 @@
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
-            <version>2.9.3</version>
+            <version>2.10.13</version>
         </dependency>
         <dependency>
             <groupId>org.joda</groupId>
             <artifactId>joda-convert</artifactId>
-            <version>1.8.2</version>
+            <version>2.2.1</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe.scala-logging</groupId>
             <artifactId>scala-logging_${scala.binary.version}</artifactId>
-            <version>3.7.1</version>
+            <version>3.9.4</version>
         </dependency>
         <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
-           <version>2.2.6</version>
+           <version>3.0.8</version>
            <scope>test</scope>
        </dependency>
@@ -147,7 +147,7 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
-               <version>2.5.1</version>
+               <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
@@ -169,7 +169,7 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
-               <version>2.7</version>
+               <version>2.22.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
@@ -177,7 +177,7 @@
            <plugin>
                <groupId>org.sonatype.plugins</groupId>
                <artifactId>nexus-staging-maven-plugin</artifactId>
-               <version>1.6.7</version>
+               <version>1.6.8</version>
                <extensions>true</extensions>
                <configuration>
                    <serverId>ossrh</serverId>
@@ -188,7 +188,7 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
-               <version>2.2.1</version>
+               <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
@@ -201,7 +201,7 @@
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
-               <version>3.2.1</version>
+               <version>4.5.3</version>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
@@ -214,10 +214,9 @@
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
-               <version>1.0</version>
+               <version>2.0.2</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                   <junitxml>.</junitxml>
                    <forkMode>once</forkMode>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
@@ -233,7 +232,7 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
-               <version>2.3.2</version>
+               <version>2.5.3</version>
                <configuration>
                    <autoVersionSubmodules>true</autoVersionSubmodules>
                    <useReleaseProfile>false</useReleaseProfile>
@@ -244,14 +243,14 @@
                    <dependency>
                        <groupId>org.apache.maven.scm</groupId>
                        <artifactId>maven-scm-provider-gitexe</artifactId>
-                       <version>1.8.1</version>
+                       <version>1.12.0</version>
                    </dependency>
                </dependencies>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-gpg-plugin</artifactId>
-               <version>1.5</version>
+               <version>3.0.1</version>
                <executions>
                    <execution>
                        <id>sign-artifacts</id>
diff --git a/rddofeventscsv.scala b/rddofeventscsv.scala
new file mode 100644
index 0000000..647a13e
--- /dev/null
+++ b/rddofeventscsv.scala
@@ -0,0 +1,51 @@
+import io.pathogen.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.graphx.Graph
+import io.pathogen.spark.DateUtils.Frequency
+import scala.io.Source
+import org.apache.spark.sql.types._
+
+/*
+ The csv file contains:
+
+ 12 events: 6 pairs of events involving the objects sun and moon, and the animals rooster, dog and owl.
+ Some of these events overlap, others do not.
+ Events 13, 14 and 15 are random events.
+ Once the file is read, the events are stored in an array of class Event.
+*/
+
+var events = Array[Event]()
+val eventsFile = Source.fromFile("src/test/resources/io/pathogen/spark/events_test.csv")
+for (line <- eventsFile.getLines.drop(1)) {
+  val cols = line.split(",").map(_.trim)
+  val currentEvent = new Event(conceptId = cols(0).toLong, eventStart = cols(1).toLong, eventStop = cols(2).toLong, amplitude = cols(3).toLong)
+  events = events :+ currentEvent
+}
+eventsFile.close()
+
+// The following creates an RDD from the previous array of events.
+val spark: SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate()
+val rdd: RDD[Event] = spark.sparkContext.parallelize(events)
+
+// Object definitions
+val config = new Config(samplingRate = Frequency.MINUTE, maxIterations = 75)
+val roosterObject = new Rooster(config)
+val sunObject = new Sun(config)
+
+/* Rooster.scala takes as input the initial time related events (an RDD of events)
+   and outputs the causal effects explained as a graph (Graph[Pathogen, Double]).
+*/
+val g = roosterObject.crow(rdd)
+
+/* Sun.scala takes as input a Graph[Pathogen, Double] of observed (and normalized)
+   correlations and returns a graph whose vertices carry both a sensitivity and an aggressiveness score.
+
+   The Pathogen class holds these two scores:
+   Aggressiveness: measures how likely an event is to explain downstream effects (it causes other events)
+   Sensitivity: measures how likely an event is to result from an upstream event (it is caused by other events)
+*/
+
+val s = sunObject.rise(g)
+val result = s.vertices.collect()
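+
+/* A minimal sketch of how the collected scores could be inspected (not part
+   of the original pipeline): sort the vertices by aggressiveness and print
+   both Pathogen fields. It relies only on the result array built above. */
+result.sortBy(-_._2.aggressiveness).foreach { case (id, pathogen) =>
+  println(s"concept $id: aggressiveness = ${pathogen.aggressiveness}, sensitivity = ${pathogen.sensitivity}")
+}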
diff --git a/src/main/scala/io/pathogen/spark/Rooster.scala b/src/main/scala/io/pathogen/spark/Rooster.scala
index bf5b009..a116c04 100644
--- a/src/main/scala/io/pathogen/spark/Rooster.scala
+++ b/src/main/scala/io/pathogen/spark/Rooster.scala
@@ -17,12 +17,11 @@ package io.pathogen.spark
 
 import com.google.common.hash.Hashing
-import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.HashPartitioner
 import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}
 import org.apache.spark.rdd.RDD
 
-class Rooster(config: Config) extends Serializable with LazyLogging {
+class Rooster(config: Config) extends Serializable {
@@ -30,15 +29,15 @@
   /**
     * @param events The initial time related events
     * @return ...
     */
   def crow(events: RDD[Event]): Graph[Pathogen, Double] = {
-    logger.info(s"Observing correlation across ${events.count()} time related events")
+    println(s"Observing correlation across ${events.count()} time related events")
 
     if (config.simulations == 0) {
-      logger.warn("Correlation does not imply causation, proceed at your own risk")
+      println("Correlation does not imply causation, proceed at your own risk")
     }
 
     val correlations = getEventCorrelation(events)
     correlations.cache()
     val correlationCount = correlations.count()
-    logger.info(s"Found $correlationCount possible correlations")
+    println(s"Found $correlationCount possible correlations")
 
     val causalities = if (config.simulations > 1) {
       getEventCausality(events, correlations, config.simulations)
@@ -109,13 +108,13 @@
       val randomCorrelations = getEventCorrelation(randomEvents)
       randomCorrelations.cache()
       val rcc = randomCorrelations.count()
-      logger.info(s"Monte carlo $simulation/$simulations - $rcc correlations found")
+      println(s"Monte carlo $simulation/$simulations - $rcc correlations found")
       randomCorrelations
      }
    }
 
-    logger.info("Normalizing causality score")
+    println("Normalizing causality score")
 
     val noiseHash = events.sparkContext.union(simulationResults) map { n =>
       (n.srcId + n.dstId, n.attr)
     } reduceByKey (_ + _) partitionBy new HashPartitioner(events.partitions.length)
diff --git a/src/main/scala/io/pathogen/spark/Sun.scala b/src/main/scala/io/pathogen/spark/Sun.scala
index 7ca3cfa..b939963 100644
--- a/src/main/scala/io/pathogen/spark/Sun.scala
+++ b/src/main/scala/io/pathogen/spark/Sun.scala
@@ -16,22 +16,21 @@
 package io.pathogen.spark
 
-import com.typesafe.scalalogging.LazyLogging
 import io.pathogen.spark.Sun.VertexData
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD
 
 import scala.util.Try
 
-class Sun(config: Config) extends Serializable with LazyLogging {
+class Sun(config: Config) extends Serializable {
 
   /**
     * @param causalGraph The graph of observed (and normalized) correlations
     * @return The Contagion graph where vertices contain both a sensitivity and aggressiveness scores
     */
   def rise(causalGraph: Graph[Pathogen, Double]): Graph[Pathogen, Double] = {
-
     val initGraph = initializeGraph(causalGraph)
+
     initGraph.cache()
 
     // ---------------
@@ -40,7 +39,7 @@
     val vertexSensitivity = propagateCausality(initGraph, config.tolerance, config.maxIterations, config.forgetfulness)
     val totalSensitivity = vertexSensitivity.values.sum()
-    logger.info(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}")
+    println(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}")
 
     // ---------------
     // AGGRESSIVENESS
@@ -48,7 +47,7 @@
     val vertexAggressiveness = propagateCausality(initGraph.reverse, config.tolerance, config.maxIterations, config.forgetfulness)
     val totalAggressiveness = vertexAggressiveness.values.sum()
-    logger.info(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}")
+    println(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}")
 
     // ---------------
     // PATHOGEN
@@ -73,7 +72,7 @@
 
     if(config.erratic > 0.0f) {
-      logger.info("Modelling Erratic behavior requires full ergodicity")
+      println("Modelling Erratic behavior requires full ergodicity")
       val edges = causalGraph.edges.map { edge =>
         ((edge.srcId, edge.dstId), edge.attr)
       }
@@ -126,7 +125,7 @@
     graph.cache()
     val vertices = graph.vertices.count()
     val edges = graph.edges.count()
-    logger.info(s"Starting explaining causality on $vertices hosts and $edges connections")
+    println(s"Starting explaining causality on $vertices hosts and $edges connections")
 
     // Execute Pregel to source to destination nodes
     val propagated = graph.pregel(
@@ -147,9 +146,9 @@
     val maxSteps = propagated.vertices.values.values.max()
     if (maxSteps == maxIterations) {
       val nonConverged = propagated.vertices.filter(_._2._2 == maxSteps).count()
-      logger.warn(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations")
+      println(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations")
     } else {
-      logger.info(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps")
+      println(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps")
     }
 
     propagated.vertices.mapValues(_._1)
diff --git a/src/test/resources/io/pathogen/spark/events_test.csv b/src/test/resources/io/pathogen/spark/events_test.csv
new file mode 100644
index 0000000..6ebef68
--- /dev/null
+++ b/src/test/resources/io/pathogen/spark/events_test.csv
@@ -0,0 +1,16 @@
+Id,eventStart,eventStop,amplitude
+3,1609483500,1609487100,1
+1,1609484400,1609527600,1
+3,1609570500,1609571700,1
+1,1609571100,1609614600,1
+4,1609624800,1609626600,1
+1,1609657500,1609701000,1
+4,1609709400,1609710600,1
+1,1609743600,1609787700,1
+5,1609797600,1609806600,1
+2,1609790400,1609826400,1
+5,1609889400,1609894800,1
+2,1609878600,1609916400,1
+6,1610002800,1610031600,1
+7,1610053200,1610057700,1
+8,1610121600,1610136000,1
diff --git a/src/test/scala/io/pathogen/spark/RoosterSunTest.scala b/src/test/scala/io/pathogen/spark/RoosterSunTest.scala
new file mode 100644
index 0000000..53c4dc2
--- /dev/null
+++ b/src/test/scala/io/pathogen/spark/RoosterSunTest.scala
@@ -0,0 +1,110 @@
+package io.pathogen.spark
+
+import io.pathogen.spark.DateUtils.Frequency
+import org.scalatest.{FlatSpec, Matchers}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.graphx._
+import scala.io.Source
+import scala.math.BigDecimal
+
+class RoosterSunTest extends FlatSpec with Matchers {
+
+  // Helper for rounding a double to 4 decimal places
+  def round(n: Double): Double = {
+    BigDecimal(n).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble
+  }
+
+  // Read the test events from the CSV resource
+  var events = Array[Event]()
+  val eventsFile = Source.fromInputStream(this.getClass.getResourceAsStream("events_test.csv"), "UTF-8")
+  for (line <- eventsFile.getLines.drop(1)) {
+    val cols = line.split(",").map(_.trim)
+    val currentEvent = new Event(conceptId = cols(0).toLong, eventStart = cols(1).toLong, eventStop = cols(2).toLong, amplitude = cols(3).toLong)
+    events = events :+ currentEvent
+  }
+  eventsFile.close()
+
+  val spark: SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate()
+  spark.sparkContext.setLogLevel("OFF")
+  val rdd: RDD[Event] = spark.sparkContext.parallelize(events)
+
+  // Objects needed by the tests
+  val config = new Config(samplingRate = Frequency.MINUTE, maxIterations = 75)
+  val roosterObject = new Rooster(config)
+  val sunObject = new Sun(config)
+
+  // Call the function from Rooster
+  val g = roosterObject.crow(rdd)
+
+  // Check the correct format of the output of Rooster (the input of Sun)
+  "Rooster" should "return correct class" in {
+    g shouldBe a[Graph[_, _]]
+    g.edges.collect()(0) shouldBe an[Edge[_]]
+  }
+
+  // Check the correct values of the edges
+  "Rooster" should "return correct edges values" in {
+    // Edge.srcId = id of the source vertex, Edge.dstId = id of the destination vertex, Edge.attr = edge attribute
+    for (edge <- g.edges.collect()) {
+      if (edge.srcId == 1 && edge.dstId == 4) {
+        round(edge.attr) shouldBe 0.2959
+      } else if (edge.srcId == 2 && edge.dstId == 5) {
+        edge.attr shouldBe 1.0
+      } else if (edge.srcId == 1 && edge.dstId == 2) {
+        round(edge.attr) shouldBe 0.3085
+      } else if (edge.srcId == 4 && edge.dstId == 1) {
+        round(edge.attr) shouldBe 0.2959
+      } else if (edge.srcId == 1 && edge.dstId == 5) {
+        round(edge.attr) shouldBe 0.2749
+      } else if (edge.srcId == 3 && edge.dstId == 1) {
+        round(edge.attr) shouldBe 0.7785
+      }
+    }
+  }
+
+  // Call the function from Sun
+  val s = sunObject.rise(g)
+
+  // Check the correct format of the output of Sun
+  "Sun" should "return correct class" in {
+    s shouldBe a[Graph[_, _]]
+    s.vertices.collect()(0)._1 shouldBe a[java.lang.Long]
+    s.vertices.collect()(0)._2 shouldBe a[Pathogen]
+  }
+
+  // Check the correct values of the vertices
+  "Sun" should "return correct vertices values" in {
+    val result = s.vertices.collect().toMap
+
+    round(result(1L).aggressiveness) shouldBe 0.3717
+    round(result(1L).sensitivity) shouldBe 0.4265
+    round(result(2L).aggressiveness) shouldBe 0.1239
+    round(result(2L).sensitivity) shouldBe 0.1421
+    round(result(3L).aggressiveness) shouldBe 0.3655
+    round(result(3L).sensitivity) shouldBe 0.4193
+    round(result(4L).aggressiveness) shouldBe 0.1389
+    round(result(4L).sensitivity) shouldBe 0.1594
+    result(5L) shouldBe Pathogen(0.0, 0.0)
+    result(6L) shouldBe Pathogen(0.0, 0.0)
+    result(7L) shouldBe Pathogen(0.0, 0.0)
+    result(8L) shouldBe Pathogen(0.0, 0.0)
+  }
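+
+  /* A minimal sketch of an alternative assertion style (not in the original
+     suite): ScalaTest Matchers can compare doubles against an explicit
+     tolerance, which avoids the round(...) helper. Same graph and the same
+     expected values as the assertions above. */
+  "Sun" should "return vertices values within a 1e-4 tolerance" in {
+    val result = s.vertices.collect().toMap
+    result(1L).aggressiveness shouldBe (0.3717 +- 0.0001)
+    result(1L).sensitivity shouldBe (0.4265 +- 0.0001)
+  }
+}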