33 changes: 16 additions & 17 deletions pom.xml
@@ -82,9 +82,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<scala.version>2.11.8</scala.version>
-<scala.binary.version>2.11</scala.binary.version>
-<spark.version>2.1.0</spark.version>
+<scala.version>2.12.10</scala.version>
+<scala.binary.version>2.12</scala.binary.version>
+<spark.version>3.1.2</spark.version>
<java.version>1.8</java.version>
</properties>

@@ -122,22 +122,22 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
-<version>2.9.3</version>
+<version>2.10.13</version>
</dependency>
<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-convert</artifactId>
-<version>1.8.2</version>
+<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_${scala.binary.version}</artifactId>
-<version>3.7.1</version>
+<version>3.9.4</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
-<version>2.2.6</version>
+<version>3.0.8</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -147,7 +147,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
-<version>2.5.1</version>
+<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@@ -169,15 +169,15 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
-<version>2.7</version>
+<version>2.22.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
-<version>1.6.7</version>
+<version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
@@ -188,7 +188,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
-<version>2.2.1</version>
+<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -201,7 +201,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
-<version>3.2.1</version>
+<version>4.5.3</version>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -214,10 +214,9 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
-<version>1.0</version>
+<version>2.0.2</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-<junitxml>.</junitxml>
<forkMode>once</forkMode>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
@@ -233,7 +232,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
-<version>2.3.2</version>
+<version>2.5.3</version>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<useReleaseProfile>false</useReleaseProfile>
@@ -244,14 +243,14 @@
<dependency>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-gitexe</artifactId>
-<version>1.8.1</version>
+<version>1.12.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
-<version>1.5</version>
+<version>3.0.1</version>
<executions>
<execution>
<id>sign-artifacts</id>
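The property changes above move the build from Scala 2.11 / Spark 2.1.0 to Scala 2.12 / Spark 3.1.2. As a quick sanity check that the upgraded toolchain is actually in effect, a minimal sketch like the following could be run (not part of this PR; the master URL and app name are arbitrary choices):

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("version-check").getOrCreate()
    println(s"Scala: ${scala.util.Properties.versionString}") // expect "version 2.12.10"
    println(s"Spark: ${spark.version}")                       // expect "3.1.2"
    spark.stop()
  }
}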
51 changes: 51 additions & 0 deletions rddofeventscsv.scala
@@ -0,0 +1,51 @@
import io.pathogen.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.Graph
import io.pathogen.spark.DateUtils.Frequency
import scala.io.Source
import org.apache.spark.sql.types._

/*
The CSV file contains:

12 events: 6 pairs of events involving the objects sun and moon and the animals rooster, dog, and owl.
Some of these events overlap, others do not.
Events 13, 14, and 15 are random events.
Once the file is read, the events are stored in an array of Event instances.
*/


var events = Array[Event]()
val eventsFile = Source.fromFile("src/test/resources/io/pathogen/spark/events_test.csv")
for (line <- eventsFile.getLines.drop(1)) {
  val cols = line.split(",").map(_.trim)
  val currentEvent = new Event(conceptId = cols(0).toLong, eventStart = cols(1).toLong, eventStop = cols(2).toLong, amplitude = cols(3).toLong)
  events = events :+ currentEvent
}
eventsFile.close()

// The following creates an RDD from the events array.
val spark: SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate()
val rdd: RDD[Event] = spark.sparkContext.parallelize(events)

// Object definitions
val config = new Config(samplingRate = Frequency.MINUTE, maxIterations = 75)
val roosterObject = new Rooster(config)
val sunObject = new Sun(config)

/* Rooster.scala takes as input the initial time-related events (an RDD of events)
and outputs the causal effects explained as a graph (Graph[Pathogen, Double]).
*/
val g = roosterObject.crow(rdd)

/* Sun.scala takes as input a Graph[Pathogen, Double] of observed (and normalized)
correlations and returns a graph whose vertices contain both sensitivity and aggressiveness scores.

Class Pathogen contains sensitivity and aggressiveness.
Aggressiveness measures how likely an event is to explain downstream effects (it causes other events).
Sensitivity measures how likely an event is to result from an upstream event (it is caused by other events).
*/

val s = sunObject.rise(g)
val result = s.vertices.collect()
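To make the script's output concrete, here is a minimal inspection sketch. The Pathogen field names aggressiveness and sensitivity are assumptions based on the comment above, not confirmed against the class definition:

// Hypothetical inspection of the (vertexId, Pathogen) pairs collected above.
// The field names are assumed from the description of Pathogen and may differ.
result.sortBy(_._1).foreach { case (id, pathogen) =>
  println(s"vertex $id: aggressiveness=${pathogen.aggressiveness}, sensitivity=${pathogen.sensitivity}")
}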
13 changes: 6 additions & 7 deletions src/main/scala/io/pathogen/spark/Rooster.scala
@@ -17,28 +17,27 @@
package io.pathogen.spark

import com.google.common.hash.Hashing
-import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.HashPartitioner
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}
import org.apache.spark.rdd.RDD

-class Rooster(config: Config) extends Serializable with LazyLogging {
+class Rooster(config: Config) extends Serializable {

/**
* @param events The initial time related events
* @return the causal effects explained as a graph
*/
def crow(events: RDD[Event]): Graph[Pathogen, Double] = {

logger.info(s"Observing correlation across ${events.count()} time related events")
println(s"Observing correlation across ${events.count()} time related events")
if (config.simulations == 0) {
-logger.warn("Correlation does not imply causation, proceed at your own risk")
+println("Correlation does not imply causation, proceed at your own risk")
}

val correlations = getEventCorrelation(events)
correlations.cache()
val correlationCount = correlations.count()
logger.info(s"Found $correlationCount possible correlations")
println(s"Found $correlationCount possible correlations")

val causalities = if (config.simulations > 1) {
getEventCausality(events, correlations, config.simulations)
@@ -109,13 +108,13 @@ class Rooster(config: Config) extends Serializable with LazyLogging {
val randomCorrelations = getEventCorrelation(randomEvents)
randomCorrelations.cache()
val rcc = randomCorrelations.count()
logger.info(s"Monte carlo $simulation/$simulations - $rcc correlations found")
println(s"Monte carlo $simulation/$simulations - $rcc correlations found")
randomCorrelations

}
}

logger.info("Normalizing causality score")
println("Normalizing causality score")
val noiseHash = events.sparkContext.union(simulationResults) map { n =>
(n.srcId + n.dstId, n.attr)
} reduceByKey (_ + _) partitionBy new HashPartitioner(events.partitions.length)
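The diff above replaces scala-logging's logger calls with println, which loses log levels. If the intent was only to drop the LazyLogging dependency (an assumption, not stated in the PR), one alternative sketch keeps leveled logging through SLF4J, which Spark already provides on its classpath:

import org.slf4j.{Logger, LoggerFactory}

// A serialization-safe logging mixin: @transient plus lazy means the logger
// is recreated on each executor instead of being serialized into closures.
trait SafeLogging extends Serializable {
  @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
}

// Usage sketch: class Rooster(config: Config) extends SafeLogging { ... }
// would let the existing logger.info(...) calls remain unchanged.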
17 changes: 8 additions & 9 deletions src/main/scala/io/pathogen/spark/Sun.scala
@@ -16,22 +16,21 @@

package io.pathogen.spark

-import com.typesafe.scalalogging.LazyLogging
import io.pathogen.spark.Sun.VertexData
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import scala.util.Try

-class Sun(config: Config) extends Serializable with LazyLogging {
+class Sun(config: Config) extends Serializable {

/**
* @param causalGraph The graph of observed (and normalized) correlations
* @return The Contagion graph where vertices contain both a sensitivity and aggressiveness scores
*/
def rise(causalGraph: Graph[Pathogen, Double]): Graph[Pathogen, Double] = {

val initGraph = initializeGraph(causalGraph)

initGraph.cache()

// ---------------
@@ -40,15 +39,15 @@ class Sun(config: Config) extends Serializable with LazyLogging {

val vertexSensitivity = propagateCausality(initGraph, config.tolerance, config.maxIterations, config.forgetfulness)
val totalSensitivity = vertexSensitivity.values.sum()
logger.info(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}")
println(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}")

// ---------------
// AGGRESSIVENESS
// ---------------

val vertexAggressiveness = propagateCausality(initGraph.reverse, config.tolerance, config.maxIterations, config.forgetfulness)
val totalAggressiveness = vertexAggressiveness.values.sum()
logger.info(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}")
println(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}")

// ---------------
// PATHOGEN
@@ -73,7 +72,7 @@

if(config.erratic > 0.0f) {

logger.info("Modelling Erratic behavior requires full ergodicity")
println("Modelling Erratic behavior requires full ergodicity")
val edges = causalGraph.edges.map { edge =>
((edge.srcId, edge.dstId), edge.attr)
}
@@ -126,7 +125,7 @@
graph.cache()
val vertices = graph.vertices.count()
val edges = graph.edges.count()
logger.info(s"Starting explaining causality on $vertices hosts and $edges connections")
println(s"Starting explaining causality on $vertices hosts and $edges connections")

// Execute Pregel to source to destination nodes
val propagated = graph.pregel(
@@ -147,9 +146,9 @@
val maxSteps = propagated.vertices.values.values.max()
if (maxSteps == maxIterations) {
val nonConverged = propagated.vertices.filter(_._2._2 == maxSteps).count()
logger.warn(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations")
println(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations")
} else {
logger.info(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps")
println(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps")
}

propagated.vertices.mapValues(_._1)
16 changes: 16 additions & 0 deletions src/test/resources/io/pathogen/spark/events_test.csv
@@ -0,0 +1,16 @@
Id,eventStart,eventStop,amplitude
3,1609483500,1609487100,1
1,1609484400,1609527600,1
3,1609570500,1609571700,1
1,1609571100,1609614600,1
4,1609624800,1609626600,1
1,1609657500,1609701000,1
4,1609709400,1609710600,1
1,1609743600,1609787700,1
5,1609797600,1609806600,1
2,1609790400,1609826400,1
5,1609889400,1609894800,1
2,1609878600,1609916400,1
6,1610002800,1610031600,1
7,1610053200,1610057700,1
8,1610121600,1610136000,1