diff --git a/pom.xml b/pom.xml
index 3de752d78..3c38224b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,8 +8,12 @@
3.0.0
+
+ transaction-event-processor
+ newMod
+
- org.sunbird
+ org.sunbird
knowledge-platform-jobs
1.0
pom
@@ -27,18 +31,12 @@
jobs-core
-
-
post-publish-processor
-
publish-pipeline
search-indexer
-
auto-creator-v2
content-auto-creator
- audit-history-indexer
- audit-event-generator
-
+ transaction-event-processor
qrcode-image-generator
dialcode-context-updater
cassandra-data-migration
diff --git a/transaction-event-processor/README.md b/transaction-event-processor/README.md
new file mode 100644
index 000000000..1521e3611
--- /dev/null
+++ b/transaction-event-processor/README.md
@@ -0,0 +1,65 @@
+# Audit Event Generator
+
+Audit Event Generator job generates an audit event for every transaction in Graph updation for content data modification
+
+## Getting Started
+
+These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a yarn or kubernetes.
+### Prerequisites
+
+1. Download flink-1.13.6-scala_2.12 from [apache-flink-downloads](https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz).
+2. Download [hadoop dependencies](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) (only for running on Yarn). Copy the hadoop dependency jar under lib folder of the flink download.
+3. export HADOOP_CLASSPATH=`/hadoop classpath` either in .bashrc or current execution shell.
+4. Docker installed.
+5. A running yarn cluster or a kubernetes cluster.
+
+### Build
+
+mvn clean install
+
+## Deployment
+
+### Yarn
+
+Flink requires memory to be allocated for both job-manager and task manager. -yjm parameter assigns job-manager memory and -ytm assigns task-manager memory.
+
+```
+./bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 1024m /audit-event-generator/target/audit-event-generator-0.0.1.jar
+```
+
+### Kubernetes
+
+```
+# Create a single node cluster
+k3d create --server-arg --no-deploy --server-arg traefik --name flink-cluster --image rancher/k3s:v1.0.0
+# Export the single node cluster into KUBECONFIG in the current shell or in ~/.bashrc.
+export KUBECONFIG="$(k3d get-kubeconfig --name='flink-cluster')"
+
+# Only for Mac OSX
+# /usr/local/bin/kubectl -> /Applications/Docker.app/Contents/Resources/bin/kubectl
+rm /usr/local/bin/kubectl
+brew link --overwrite kubernetes-cli
+
+# Create a configmap using the flink-configuration-configmap.yaml
+kubectl create -f knowledge-platform-job/kubernetes/flink-configuration-configmap.yaml
+
+# Create pods for jobmanager-service, job-manager and task-manager using the yaml files
+kubectl create -f knowledge-platform-job/kubernetes/jobmanager-service.yaml
+kubectl create -f knowledge-platform-job/kubernetes/jobmanager-deployment.yaml
+kubectl create -f knowledge-platform-job/kubernetes/taskmanager-deployment.yaml
+
+# Create a port-forwarding for accessing the job-manager UI on localhost:8081
+kubectl port-forward deployment/flink-jobmanager 8081:8081
+
+# Submit the job to the Kubernetes single node cluster flink-cluster
+./bin/flink run -m localhost:8081 /audit-event-generator/target/audit-event-generator-0.0.1.jar
+
+# Commands to delete the pods created in the cluster
+kubectl delete deployment/flink-jobmanager
+kubectl delete deployment/flink-taskmanager
+kubectl delete service/flink-jobmanager
+kubectl delete configmaps/flink-config
+
+# Command to stop the single-node cluster
+k3d stop --name="flink-cluster"
+```
diff --git a/transaction-event-processor/pom.xml b/transaction-event-processor/pom.xml
new file mode 100644
index 000000000..e5a489cd4
--- /dev/null
+++ b/transaction-event-processor/pom.xml
@@ -0,0 +1,269 @@
+
+
+
+ 4.0.0
+
+ org.sunbird
+ knowledge-platform-jobs
+ 1.0
+
+ transaction-event-processor
+ 1.0.0
+ jar
+ transaction-event-processor
+
+ Transaction Event Processor Flink Job
+
+
+
+ UTF-8
+ 1.4.0
+
+
+
+
+ org.apache.flink
+ flink-streaming-scala_${scala.version}
+ ${flink.version}
+
+
+
+
+ com.google.code.gson
+ gson
+ 2.10.1
+
+
+
+ org.sunbird
+ jobs-core
+ 1.0.0
+
+
+ joda-time
+ joda-time
+ 2.10.6
+
+
+ org.sunbird
+ jobs-core
+ 1.0.0
+ test-jar
+ test
+
+
+ org.apache.flink
+ flink-test-utils_${scala.version}
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-runtime_${scala.version}
+ ${flink.version}
+ test
+ tests
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.version}
+ ${flink.version}
+ test
+ tests
+
+
+ org.scalatest
+ scalatest_${scala.version}
+ 3.0.6
+ test
+
+
+ org.mockito
+ mockito-core
+ 3.3.3
+ test
+
+
+ org.sunbird
+ platform-common
+ 1.0-beta
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+
+
+ org.sunbird
+ platform-telemetry
+ 1.0-beta
+
+
+ com.squareup.okhttp3
+ mockwebserver
+ 4.9.1
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.1
+
+
+
+ package
+
+ shade
+
+
+
+
+ com.google.code.findbugs:jsr305
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+ org.sunbird.job.transaction.task.TransactionEventProcessorStreamTask
+
+
+
+ reference.conf
+
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.4.0
+
+
+ 11
+ ${scala.maj.version}
+ false
+
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+
+ maven-surefire-plugin
+ 2.22.2
+
+ true
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 1.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ audit-event-generator-testsuite.txt
+
+
+
+ test
+
+ test
+
+
+
+
+
+ org.scoverage
+ scoverage-maven-plugin
+ ${scoverage.plugin.version}
+
+ ${scala.version}
+ true
+ true
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+
+
+ prepare-agent
+
+
+
+
+ report
+ test
+
+ report
+
+
+
+
+
+
+
+
diff --git a/transaction-event-processor/src/main/resources/log4j.properties b/transaction-event-processor/src/main/resources/log4j.properties
new file mode 100644
index 000000000..33e8e93ef
--- /dev/null
+++ b/transaction-event-processor/src/main/resources/log4j.properties
@@ -0,0 +1,14 @@
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.file=audit-event-generator.log
+log4j.appender.file.append=true
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.MaxFileSize=256KB
+log4j.appender.file.MaxBackupIndex=4
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
+
+
+
diff --git a/transaction-event-processor/src/main/resources/transaction-event-processor.conf b/transaction-event-processor/src/main/resources/transaction-event-processor.conf
new file mode 100644
index 000000000..9275a65b3
--- /dev/null
+++ b/transaction-event-processor/src/main/resources/transaction-event-processor.conf
@@ -0,0 +1,25 @@
+include "base-config.conf"
+
+kafka {
+ input.topic = "sunbirddev.learning.graph.events"
+ output.topic = "sunbirddev.telemetry.raw"
+ groupId = "sunbirddev-transaction-event-processor-group"
+}
+
+task {
+ consumer.parallelism = 1
+ parallelism = 1
+ producer.parallelism = 1
+ window.time = 60
+}
+
+schema {
+ basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local"
+}
+
+channel.default = "org.sunbird"
+
+job {
+ audit-event-generator = true
+ audit-history-indexer = true
+}
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala
new file mode 100644
index 000000000..e6dbfa930
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala
@@ -0,0 +1,7 @@
+package org.sunbird.job.transaction.domain
+
+import java.io.Serializable
+import java.util.Date
+
+@SerialVersionUID(-5779950964487302125L)
+case class AuditHistoryRecord(var objectId: String, objectType: String, label: String, graphId: String, var userId: String, requestId: String, logRecord: String, operation: String, createdOn: Date) extends Serializable
\ No newline at end of file
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala
new file mode 100644
index 000000000..c73f5d21a
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala
@@ -0,0 +1,61 @@
+
+package org.sunbird.job.transaction.domain
+
+import org.apache.commons.lang3.StringUtils
+import org.sunbird.job.domain.reader.JobRequest
+
+import java.text.{DateFormat, SimpleDateFormat}
+import java.util
+import java.util.Date
+
+class Event(eventMap: java.util.Map[String, Any], partition: Int, offset: Long) extends JobRequest(eventMap, partition, offset) {
+
+ private val df:DateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+ private val IMAGE_SUFFIX = ".img"
+
+ private val jobName = "TransactionEventProcessor"
+
+ def id: String = readOrDefault("nodeUniqueId", "")
+
+ def operationType: String = readOrDefault("operationType", null)
+
+ def nodeUniqueId: String = readOrDefault("nodeUniqueId", "")
+
+ def createdOn: String = readOrDefault("createdOn", "")
+
+ def channelId(channel: String): String = readOrDefault("channel", channel)
+
+ def objectId: String = if (null != nodeUniqueId) nodeUniqueId.replaceAll(IMAGE_SUFFIX, "") else nodeUniqueId
+
+ def objectType: String = readOrDefault[String]("objectType", null)
+
+ def userId: String = readOrDefault[String]("userId", "")
+
+ def transactionData: Map[String, AnyRef] = {
+ readOrDefault("transactionData", new util.HashMap[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+ }
+
+ def nodeType: String = readOrDefault("nodeType", "")
+
+ def ets: Long = readOrDefault("ets", 0L)
+
+ def label: String = readOrDefault("label", "")
+
+ def graphId: String = readOrDefault("graphId", "")
+
+ def requestId: String = readOrDefault("requestId", "")
+
+ def syncMessage: String = readOrDefault("syncMessage", null)
+
+ def createdOnDate: Date = if (createdOn.isEmpty) new Date else df.parse(createdOn)
+
+ def audit: Boolean = readOrDefault("audit", true)
+
+ def isValid: Boolean = {
+ StringUtils.isNotBlank(objectType) ||
+ operationType != null && null == syncMessage && audit
+ }
+
+
+}
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala
new file mode 100644
index 000000000..a096260e8
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala
@@ -0,0 +1,49 @@
+package org.sunbird.job.transaction.functions
+
+import java.util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.slf4j.LoggerFactory
+import org.sunbird.job.transaction.domain.Event
+import org.sunbird.job.transaction.service.TransactionEventProcessorService
+import org.sunbird.job.transaction.task.TransactionEventProcessorConfig
+import org.sunbird.job.exception.InvalidEventException
+import org.sunbird.job.{BaseProcessFunction, Metrics}
+
+class AuditEventGenerator(config: TransactionEventProcessorConfig)
+ (implicit mapTypeInfo: TypeInformation[util.Map[String, AnyRef]],
+ stringTypeInfo: TypeInformation[String])
+ extends BaseProcessFunction[Event, String](config) with TransactionEventProcessorService{
+
+ private[this] lazy val logger = LoggerFactory.getLogger(classOf[AuditEventGenerator])
+
+ override def metricsList(): List[String] = {
+ List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.skippedEventCount, config.emptySchemaEventCount, config.emptyPropsEventCount)
+ }
+
+ override def open(parameters: Configuration): Unit = {
+ super.open(parameters)
+ }
+
+ override def close(): Unit = {
+ super.close()
+ }
+
+ @throws(classOf[InvalidEventException])
+ override def processElement(event: Event,
+ context: ProcessFunction[Event, String]#Context,
+ metrics: Metrics): Unit = {
+ try {
+ metrics.incCounter(config.totalEventsCount)
+ if(event.isValid) {
+ logger.info("valid event: "+event.nodeUniqueId)
+ processEvent(event, context, metrics)(config)
+ } else metrics.incCounter(config.skippedEventCount)
+ } catch {
+ case ex: Exception =>
+ metrics.incCounter(config.failedEventCount)
+ throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition,"offset" -> event.offset),ex)
+ }
+ }
+}
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala
new file mode 100644
index 000000000..fe6fe2ae0
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala
@@ -0,0 +1,44 @@
+package org.sunbird.job.transaction.functions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.sunbird.job.transaction.domain.Event
+import org.sunbird.job.transaction.service.TransactionEventProcessorService
+import org.sunbird.job.transaction.task.TransactionEventProcessorConfig
+import org.sunbird.job.util.ElasticSearchUtil
+import org.sunbird.job.{BaseProcessKeyedFunction, Metrics}
+
+import java.util
+
+class AuditHistoryIndexer(config: TransactionEventProcessorConfig, var esUtil: ElasticSearchUtil)
+ (implicit mapTypeInfo: TypeInformation[util.Map[String, Any]],
+ stringTypeInfo: TypeInformation[String])
+ extends BaseProcessKeyedFunction[String, Event, String](config) with TransactionEventProcessorService{
+
+
+ override def metricsList(): List[String] = {
+ List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.esFailedEventCount, config.skippedEventCount)
+ }
+
+ override def open(parameters: Configuration): Unit = {
+ super.open(parameters)
+ if (esUtil == null) {
+ esUtil = new ElasticSearchUtil(config.esConnectionInfo, config.auditHistoryIndex, config.auditHistoryIndexType)
+ }
+ }
+
+ override def close(): Unit = {
+ esUtil.close()
+ super.close()
+ }
+
+ override def processElement(event: Event,
+ context: KeyedProcessFunction[String, Event, String]#Context,
+ metrics: Metrics): Unit = {
+ metrics.incCounter(config.totalEventsCount)
+ if(event.isValid) {
+ processEvent(event, metrics)(esUtil, config)
+ } else metrics.incCounter(config.skippedEventCount)
+ }
+}
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala
new file mode 100644
index 000000000..7b4f043c5
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala
@@ -0,0 +1,240 @@
+package org.sunbird.job.transaction.service
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.commons.lang3.StringUtils
+import org.slf4j.LoggerFactory
+import org.sunbird.job.Metrics
+import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil}
+import org.sunbird.telemetry.TelemetryGenerator
+import org.sunbird.telemetry.TelemetryParams
+import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event}
+import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition}
+import com.google.gson.Gson
+import org.sunbird.job.exception.InvalidEventException
+import org.sunbird.job.transaction.task.TransactionEventProcessorConfig
+
+import java.io.IOException
+import java.util
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date, TimeZone}
+
+trait TransactionEventProcessorService {
+ private[this] lazy val logger = LoggerFactory.getLogger(classOf[TransactionEventProcessorService])
+ private val OBJECT_TYPE_IMAGE_SUFFIX = "Image"
+ private val SKIP_AUDIT = """{"object": {"type":null}}"""
+ private lazy val definitionCache = new DefinitionCache
+ private lazy val gson = new Gson
+
+ private val systemPropsList = List("IL_SYS_NODE_TYPE", "IL_FUNC_OBJECT_TYPE", "IL_UNIQUE_ID", "IL_TAG_NAME", "IL_ATTRIBUTE_NAME", "IL_INDEXABLE_METADATA_KEY", "IL_NON_INDEXABLE_METADATA_KEY",
+ "IL_IN_RELATIONS_KEY", "IL_OUT_RELATIONS_KEY", "IL_REQUIRED_PROPERTIES", "IL_SYSTEM_TAGS_KEY", "IL_SEQUENCE_INDEX", "SYS_INTERNAL_LAST_UPDATED_ON", "lastUpdatedOn", "versionKey", "lastStatusChangedOn")
+
+ private def getContext(channelId: String, env: String): Map[String, String] = {
+ val context = Map(
+ TelemetryParams.ACTOR.name -> "org.ekstep.learning.platform",
+ TelemetryParams.CHANNEL.name -> channelId,
+ TelemetryParams.ENV.name -> env
+ )
+ context
+ }
+
+ @throws(classOf[InvalidEventException])
+ def processEvent(message: Event, context: ProcessFunction[Event, String]#Context, metrics: Metrics)(implicit config: TransactionEventProcessorConfig): Unit = {
+ logger.info("AUDIT Event::" + JSONUtil.serialize(message))
+ logger.info("Input Message Received for : [" + message.nodeUniqueId + "], Txn Event createdOn:" + message.createdOn + ", Operation Type:" + message.operationType)
+ try {
+ val (auditEventStr, objectType) = getAuditMessage(message)(config, metrics)
+ if (StringUtils.isNotBlank(objectType)) {
+ context.output(config.auditOutputTag, auditEventStr)
+ logger.info("Telemetry Audit Message Successfully Sent for : " + message.objectId + " :: event ::" + auditEventStr)
+ metrics.incCounter(config.successEventCount)
+ }
+ else {
+ logger.info("Skipped event as the objectype is not available, event =" + auditEventStr)
+ metrics.incCounter(config.emptyPropsEventCount)
+ }
+ } catch {
+ case e: Exception =>
+ logger.error("Failed to process message :: " + JSONUtil.serialize(message), e)
+ throw e
+ }
+ }
+
+ def getDefinition(objectType: String)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): ObjectDefinition = {
+ try {
+ definitionCache.getDefinition(objectType, config.configVersion, config.basePath)
+ } catch {
+ case ex: Exception => {
+ metrics.incCounter(config.emptySchemaEventCount)
+ new ObjectDefinition(objectType, config.configVersion, Map[String, AnyRef](), Map[String, AnyRef]())
+ }
+ }
+ }
+
+
+ def getAuditMessage(message: Event)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): (String, String) = {
+ var auditMap: String = null
+ var objectType = message.objectType
+ val env = if (null != objectType) objectType.toLowerCase.replace("image", "") else "system"
+
+ val definitionNode: ObjectDefinition = getDefinition(objectType)
+
+ val propertyMap = message.transactionData("properties").asInstanceOf[Map[String, AnyRef]]
+ val statusMap = propertyMap.getOrElse("status", null).asInstanceOf[Map[String, AnyRef]]
+ val lastStatusChangedOn = propertyMap.getOrElse("lastStatusChangedOn", null).asInstanceOf[Map[String, AnyRef]]
+ val addedRelations = message.transactionData.getOrElse("addedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]]
+ val removedRelations = message.transactionData.getOrElse("removedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]]
+
+ var pkgVersion = ""
+ var prevStatus = ""
+ var currStatus = ""
+ var duration = ""
+ val pkgVerMap = propertyMap.getOrElse("pkgVersion", null).asInstanceOf[Map[String, AnyRef]]
+ if (null != pkgVerMap) pkgVersion = s"${pkgVerMap.get("nv")}"
+
+ if (null != statusMap) {
+ prevStatus = statusMap.getOrElse("ov", null).asInstanceOf[String]
+ currStatus = statusMap.getOrElse("nv", null).asInstanceOf[String]
+ // Compute Duration for Status Change
+ if (StringUtils.isNotBlank(currStatus) && StringUtils.isNotBlank(prevStatus) && null != lastStatusChangedOn) {
+ var ov = lastStatusChangedOn.getOrElse("ov", null).asInstanceOf[String]
+ val nv = lastStatusChangedOn.getOrElse("nv", null).asInstanceOf[String]
+ if (null == ov) ov = propertyMap.getOrElse("lastUpdatedOn", null).asInstanceOf[Map[String, AnyRef]].getOrElse("ov", null).asInstanceOf[String]
+ if (null != ov && null != nv) duration = String.valueOf(computeDuration(ov, nv))
+ }
+ }
+
+ var props: List[String] = propertyMap.keys.toList
+ props ++= getRelationProps(addedRelations, definitionNode)
+ props ++= getRelationProps(removedRelations, definitionNode)
+ val propsExceptSystemProps = props.filter(prop => !systemPropsList.contains(prop))
+ val cdata = getCData(addedRelations, removedRelations, propertyMap)
+
+ var context: Map[String, String] = getContext(message.channelId(config.defaultChannel), env)
+
+ objectType = if (null != objectType) objectType.replaceAll(OBJECT_TYPE_IMAGE_SUFFIX, "") else objectType
+ context ++= Map("objectId" -> message.objectId, "objectType" -> objectType)
+
+ if (StringUtils.isNotBlank(duration)) context ++= Map("duration" -> duration)
+ if (StringUtils.isNotBlank(pkgVersion)) context ++= Map("pkgVersion" -> pkgVersion)
+ if (StringUtils.isNotBlank(message.userId)) context ++= Map(TelemetryParams.ACTOR.name -> message.userId)
+
+ if (propsExceptSystemProps.nonEmpty) {
+ val cdataList = gson.fromJson(JSONUtil.serialize(cdata), classOf[java.util.List[java.util.Map[String, Object]]])
+
+ TelemetryGenerator.setComponent("audit-event-generator")
+ auditMap = TelemetryGenerator.audit(
+ JSONUtil.deserialize[util.Map[String, String]](JSONUtil.serialize(context)),
+ JSONUtil.deserialize[util.List[String]](JSONUtil.serialize(propsExceptSystemProps)),
+ currStatus,
+ prevStatus,
+ cdataList)
+ logger.info("Audit Message for Content Id [" + message.objectId + "] : " + auditMap);
+
+ (auditMap, message.objectType)
+ }
+ else {
+ logger.info("Skipping Audit log as props is null or empty")
+ (SKIP_AUDIT, "")
+ }
+ }
+
+ /**
+ * @param addedRelations
+ * @param removedRelations
+ * @param propertyMap
+ * @return
+ */
+ private def getCData(addedRelations: List[Map[String, AnyRef]], removedRelations: List[Map[String, AnyRef]], propertyMap: Map[String, AnyRef]): List[Map[String, AnyRef]] = {
+ var cdata = List[Map[String, AnyRef]]()
+ if (null != propertyMap && propertyMap.nonEmpty && propertyMap.contains("dialcodes")) {
+ val dialcodeMap = propertyMap("dialcodes").asInstanceOf[Map[String, AnyRef]]
+ val dialcodes = dialcodeMap("nv").asInstanceOf[List[String]]
+ if (null != dialcodes) {
+ cdata :+= Map[String, AnyRef]("id" -> dialcodes, "type" -> "DialCode")
+ }
+ }
+ if (null != addedRelations && addedRelations.nonEmpty) cdata ++= prepareCMap(addedRelations)
+ if (null != removedRelations && removedRelations.nonEmpty) cdata ++= prepareCMap(removedRelations)
+ cdata
+ }
+
+ /**
+ * @param relations
+ */
+ private def prepareCMap(relations: List[Map[String, AnyRef]]): List[Map[String, AnyRef]] = {
+ relations.map(relation => {
+ Map[String, AnyRef]("id" -> relation("id"), "type" -> relation("type"))
+ })
+ }
+
+ /**
+ *
+ * @param relations
+ */
+ private def getRelationProps(relations: List[Map[String, AnyRef]], objectDefinition: ObjectDefinition)(implicit config: TransactionEventProcessorConfig):List[String] = {
+ var relationProps = List[String]()
+ if (relations.nonEmpty) {
+ relations.foreach(rel => {
+ val direction = rel.getOrElse("dir", "").asInstanceOf[String]
+ val relationType = rel.getOrElse("rel", "").asInstanceOf[String]
+ val targetObjType = rel.getOrElse("type", "").asInstanceOf[String]
+ val relationProp = objectDefinition.relationLabel(targetObjType, direction, relationType)
+ if (relationProp.nonEmpty) {
+ relationProps :+= relationProp.get
+ }
+ })
+ }
+ relationProps
+ }
+
+ /**
+ * @param oldDate
+ * @param newDate
+ * @return
+ */
+ def computeDuration(oldDate: String, newDate: String): Long = {
+ val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+ val od = sdf.parse(oldDate)
+ val nd = sdf.parse(newDate)
+ val diff = nd.getTime - od.getTime
+ val diffSeconds = diff / 1000
+ diffSeconds
+ }
+
+ @throws(classOf[InvalidEventException])
+ def processEvent(event: Event, metrics: Metrics)(implicit esUtil: ElasticSearchUtil, config: TransactionEventProcessorConfig): Unit = {
+ if (event.isValid) {
+ val identifier = event.nodeUniqueId
+ logger.info("Audit learning event received : " + identifier)
+ try {
+ val record = getAuditHistory(event)
+ val document = JSONUtil.serialize(record)
+ val indexName = getIndexName(event.ets)
+ esUtil.addDocumentWithIndex(document, indexName)
+ logger.info("Audit record created for " + identifier)
+ metrics.incCounter(config.successEventCount)
+ } catch {
+ case ex: IOException =>
+ logger.error("Error while indexing message :: " + event.getJson + " :: " + ex.getMessage)
+ ex.printStackTrace()
+ metrics.incCounter(config.esFailedEventCount)
+ throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition, "offset" -> event.offset), ex)
+ case ex: Exception =>
+ logger.error("Error while processing message :: " + event.getJson + " :: ", ex)
+ metrics.incCounter(config.failedEventCount)
+ }
+ }
+ else logger.info("Learning event not qualified for audit")
+ }
+
+ private def getIndexName(ets: Long)(implicit config: TransactionEventProcessorConfig): String = {
+ val cal = Calendar.getInstance(TimeZone.getTimeZone(config.timeZone))
+ cal.setTime(new Date(ets))
+ config.auditHistoryIndex + "_" + cal.get(Calendar.YEAR) + "_" + cal.get(Calendar.WEEK_OF_YEAR)
+ }
+
+ def getAuditHistory(transactionDataMap: Event): AuditHistoryRecord = {
+ val nodeUniqueId = StringUtils.replace(transactionDataMap.nodeUniqueId, ".img", "")
+ AuditHistoryRecord(nodeUniqueId, transactionDataMap.objectType, transactionDataMap.label, transactionDataMap.graphId, transactionDataMap.userId, transactionDataMap.requestId, JSONUtil.serialize(transactionDataMap.transactionData), transactionDataMap.operationType, transactionDataMap.createdOnDate)
+ }
+
+}
\ No newline at end of file
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala
new file mode 100644
index 000000000..7af09f05e
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala
@@ -0,0 +1,56 @@
+package org.sunbird.job.transaction.task
+
+import java.util
+import com.typesafe.config.Config
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.sunbird.job.BaseJobConfig
+
+class TransactionEventProcessorConfig(override val config: Config) extends BaseJobConfig(config, "transaction-event-processor") {
+
+ private val serialVersionUID = 2905979434303791379L
+
+ implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
+ implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
+
+ // Kafka Topics Configuration
+ val kafkaInputTopic: String = config.getString("kafka.input.topic")
+ val kafkaOutputTopic: String = config.getString("kafka.output.topic")
+ override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism")
+ override val parallelism: Int = config.getInt("task.parallelism")
+ val kafkaProducerParallelism: Int = config.getInt("task.producer.parallelism")
+ val auditEventGenerator: Boolean = config.getBoolean("job.audit-event-generator")
+ val auditHistoryIndexer: Boolean = config.getBoolean("job.audit-history-indexer")
+
+ val auditOutputTag: OutputTag[String] = OutputTag[String]("audit-event-tag")
+
+ val defaultChannel: String =config.getString("channel.default")
+
+ // Metric List
+ val totalEventsCount = "total-events-count"
+ val successEventCount = "success-events-count"
+ val failedEventCount = "failed-events-count"
+ val skippedEventCount = "skipped-events-count"
+ val emptySchemaEventCount = "empty-schema-events-count"
+ val emptyPropsEventCount = "empty-props-events-count"
+ val esFailedEventCount = "elasticsearch-error-events-count"
+
+ // Consumers
+ val transactionEventConsumer = "transaction-event-processor-consumer"
+ val auditEventGeneratorFunction = "audit-event-generator-function"
+ val auditHistoryIndexerFunction = "audit-history-indexer-function"
+ val transactionEventProducer = "transaction-event-processor-producer"
+
+
+ val basePath = config.getString("schema.basePath")
+ val configVersion = "1.0"
+
+ // ES Configs
+ val esConnectionInfo = config.getString("es.basePath")
+
+ val timeZone = if (config.hasPath("timezone")) config.getString("timezone") else "IST"
+ val auditHistoryIndex = "kp_audit_log"
+ val operationCreate = "CREATE"
+ val auditHistoryIndexType = "ah"
+}
diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala
new file mode 100644
index 000000000..8bc584c77
--- /dev/null
+++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala
@@ -0,0 +1,72 @@
+package org.sunbird.job.transaction.task
+
+import java.io.File
+import java.util
+import com.typesafe.config.ConfigFactory
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.sunbird.job.connector.FlinkKafkaConnector
+import org.sunbird.job.transaction.domain.Event
+import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer}
+import org.sunbird.job.util.{ElasticSearchUtil, FlinkUtil, HttpUtil}
+
+
+class TransactionEventProcessorStreamTask(config: TransactionEventProcessorConfig, kafkaConnector: FlinkKafkaConnector, esUtil: ElasticSearchUtil) {
+ def process(): Unit = {
+ implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
+ // implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
+ implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event])
+ implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
+ implicit val mapTypeInfoEs: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]])
+ implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
+
+ val inputStream = env.addSource(kafkaConnector.kafkaJobRequestSource[Event](config.kafkaInputTopic)).name(config.transactionEventConsumer)
+ .uid(config.transactionEventConsumer).setParallelism(config.kafkaConsumerParallelism)
+
+ if(config.auditEventGenerator) {
+ val auditEventGeneratorStreamTask = inputStream.rebalance
+ .process(new AuditEventGenerator(config))
+ .name(config.auditEventGeneratorFunction)
+ .uid(config.auditEventGeneratorFunction)
+ .setParallelism(config.parallelism)
+
+ auditEventGeneratorStreamTask.getSideOutput(config.auditOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaOutputTopic))
+ .name(config.transactionEventProducer).uid(config.transactionEventProducer).setParallelism(config.kafkaProducerParallelism)
+ }
+
+ if(config.auditHistoryIndexer){
+ inputStream.rebalance
+ .keyBy(new TransactionEventKeySelector)
+ .process(new AuditHistoryIndexer(config, esUtil))
+ .name(config.auditHistoryIndexerFunction)
+ .uid(config.auditHistoryIndexerFunction)
+ .setParallelism(config.parallelism)
+ }
+ env.execute(config.jobName)
+ }
+}
+
+// $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster
+object TransactionEventProcessorStreamTask {
+
+ def main(args: Array[String]): Unit = {
+ val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
+ val config = configFilePath.map {
+ path => ConfigFactory.parseFile(new File(path)).resolve()
+ }.getOrElse(ConfigFactory.load("transaction-event-processor.conf").withFallback(ConfigFactory.systemEnvironment()))
+ val transactionEventProcessorConfig = new TransactionEventProcessorConfig(config)
+ val kafkaUtil = new FlinkKafkaConnector(transactionEventProcessorConfig)
+ val esUtil:ElasticSearchUtil = null
+ val task = new TransactionEventProcessorStreamTask(transactionEventProcessorConfig, kafkaUtil, esUtil)
+ task.process()
+ }
+}
+
+// $COVERAGE-ON$
+
+class TransactionEventKeySelector extends KeySelector[Event, String] {
+ override def getKey(in: Event): String = in.id.replace(".img", "")
+}
diff --git a/transaction-event-processor/src/test/resources/logback-test.xml b/transaction-event-processor/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e81294323
--- /dev/null
+++ b/transaction-event-processor/src/test/resources/logback-test.xml
@@ -0,0 +1,16 @@
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/transaction-event-processor/src/test/resources/test.conf b/transaction-event-processor/src/test/resources/test.conf
new file mode 100644
index 000000000..26207d65d
--- /dev/null
+++ b/transaction-event-processor/src/test/resources/test.conf
@@ -0,0 +1,25 @@
+include "base-config.conf"
+
+kafka {
+ input.topic = "sunbirddev.learning.graph.events"
+ output.topic = "sunbirddev.telemetry.raw"
+ groupId = "sunbirddev-transaction-event-processor-group"
+}
+
+task {
+ consumer.parallelism = 1
+ parallelism = 1
+ producer.parallelism = 1
+ window.time = 60
+}
+
+schema {
+ basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local"
+}
+
+channel.default = "org.sunbird"
+
+job {
+ audit-event-generator = true
+ audit-history-indexer = false
+}
diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala
new file mode 100644
index 000000000..eb9da761c
--- /dev/null
+++ b/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala
@@ -0,0 +1,64 @@
+package org.sunbird.job.fixture
+
+object EventFixture {
+
+ val EVENT_1: String =
+ """
+ |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"}
+ |""".stripMargin
+
+ val EVENT_2: String =
+ """
+ |{"ets":1552464504681,"channel":"in.ekstep","transactionData":{"properties":{"lastStatusChangedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:38:24.358+0530"},"lastSubmittedOn":{"ov":null,"nv":"2019-03-13T13:38:21.901+0530"},"lastUpdatedOn":{"ov":"2019-03-13T13:36:20.093+0530","nv":"2019-03-13T13:38:24.399+0530"},"status":{"ov":"Draft","nv":"Review"},"versionKey":{"ov":"1552464380093","nv":"1552464504399"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:38:24.680+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_3: String =
+ """
+ |{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_4: String =
+ """
+ |{"ets":1552645516180,"channel":"in.ekstep","transactionData":{"properties":{"ownershipType":{"ov":null,"nv":["createdBy"]},"code":{"ov":null,"nv":"test.res.1"},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"lastUpdatedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentEncoding":{"ov":null,"nv":"identity"},"dialcodeRequired":{"ov":null,"nv":"No"},"contentType":{"ov":null,"nv":"Resource"},"lastStatusChangedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"visibility":{"ov":null,"nv":"Default"},"os":{"ov":null,"nv":["All"]},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1552645516071"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Resource Content 1"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11271927206783385611"},"status":{"ov":null,"nv":"Draft"}}},"mid":"9ea9ae7a-9cc1-493d-aac3-3c66cd9ff01b","label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-15T15:55:16.178+0530","objectType":"Content","nodeUniqueId":"do_11271927206783385611","requestId":null,"operationType":"CREATE","nodeGraphId":590921,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_5: String =
+ """
+ |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":null,"partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_6: String =
+ """
+ |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"}
+ |""".stripMargin
+
+ val EVENT_7: String =
+ """
+ |{"ets":1614776853781,"channel":"b00bc992ef25f1a9a8d63291e20efc8d","transactionData":{"properties":{"dialcodes":{"ov":null,"nv":["K1W6L6"]}}},"mid":"5b5633a2-3c18-49a6-8822-6d7b85338104","label":"Test Again","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-03-03T13:07:33.781+0000","objectType":"Collection","nodeUniqueId":"do_1132282511204024321262","requestId":null,"operationType":"UPDATE","nodeGraphId":510086,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_8: String =
+ """
+ |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":"RandomSchemaType","partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_9: String =
+ """
+ |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"}
+ |""".stripMargin
+
+ val EVENT_10: String =
+ """
+ |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"}
+ |""".stripMargin
+
+ val EVENT_11: String =
+ """
+ |{"ets":1615191835547,"channel":"01309282781705830427","transactionData":{"removedTags":[],"addedRelations":[],"removedRelations":[{"rel":"associatedTo","id":"do_113198273083662336127","label":"qq\n","dir":"OUT","type":"AssessmentItem","relMetadata":{}}],"addedTags":[],"properties":{}},"mid":"98145983-63dc-4d55-866c-248d49306ad8","label":"ECML_CHANGES","nodeType":"DATA_NODE","userId":"5a587cc1-e018-4859-a0a8-e842650b9d64","createdOn":"2021-03-08T08:23:55.547+0000","objectType":"Content","nodeUniqueId":"do_1132316371218268161118","requestId":null,"operationType":"UPDATE","nodeGraphId":510477,"graphId":"domain"}
+ |""".stripMargin
+
+ val EVENT_12: String =
+ """
+ |{"ets":1500888709490,"requestId":null,"transactionData":{"properties":{"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"morphology":{"ov":null,"nv":false},"consumerId":{"ov":null,"nv":"a6654129-b58d-4dd8-9cf2-f8f3c2f458bc"},"channel":{"ov":null,"nv":"in.ekstep"},"lemma":{"ov":null,"nv":"ವಿಶ್ಲೇಷಣೆ"},"createdOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"versionKey":{"ov":null,"nv":"1500888738130"},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Word"},"ekstepWordnet":{"ov":null,"nv":false},"lastUpdatedOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"isPhrase":{"ov":null,"nv":false},"IL_UNIQUE_ID":{"ov":null,"nv":"ka_11229528054276096015"},"status":{"ov":null,"nv":"Draft"}}},"nodeGraphId":433342,"label":"ವಿಶ್ಲೇಷಣೆ","graphId":"ka","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-07-24T09:31:49.490+0000","objectType":"Word"}
+ |""".stripMargin
+}
\ No newline at end of file
diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala
new file mode 100644
index 000000000..fb6a63a0a
--- /dev/null
+++ b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala
@@ -0,0 +1,207 @@
+package org.sunbird.job.spec
+
+import java.util
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import okhttp3.mockwebserver.{MockResponse, MockWebServer}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.test.util.MiniClusterWithClientResource
+import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil}
+import org.mockito.Mockito
+import org.mockito.Mockito.when
+import org.sunbird.job.connector.FlinkKafkaConnector
+import org.sunbird.job.transaction.domain.Event
+import org.sunbird.job.fixture.EventFixture
+import org.sunbird.job.transaction.task.{TransactionEventProcessorConfig, TransactionEventProcessorStreamTask}
+import org.sunbird.spec.{BaseMetricsReporter, BaseTestSpec}
+
+class TransactionEventProcessorTaskTestSpec extends BaseTestSpec {
+ implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]])
+
+ val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(testConfiguration())
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build)
+ val mockKafkaUtil: FlinkKafkaConnector = mock[FlinkKafkaConnector](Mockito.withSettings().serializable())
+ val config: Config = ConfigFactory.load("test.conf")
+ val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config)
+ val esUtil:ElasticSearchUtil = null
+ val server = new MockWebServer()
+
+ var currentMilliSecond = 1605816926271L
+
+ override protected def beforeAll(): Unit = {
+ BaseMetricsReporter.gaugeMetrics.clear()
+ flinkCluster.before()
+ super.beforeAll()
+ }
+
+ override protected def afterAll(): Unit = {
+ flinkCluster.after()
+ super.afterAll()
+ }
+
+ "TransactionEventProcessorStreamTask" should "generate audit event" in {
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource)
+ when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink)
+ if (jobConfig.auditEventGenerator) {
+ new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process()
+
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0)
+
+ AuditEventSink.values.size() should be(1)
+ AuditEventSink.values.forEach(event => {
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](event)
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata") shouldNot be(null)
+ })
+ }
+ }
+
+ "TransactionEventProcessorStreamTask" should "not generate audit event" in {
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource)
+ when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink)
+
+ val setBoolean = config.withValue("job.audit-event-generator", ConfigValueFactory.fromAnyRef(false))
+ val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean)
+ if (newConfig.auditEventGenerator) {
+
+ new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process()
+
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(0)
+
+ }
+ }
+
+ "TransactionEventProcessorStreamTask" should "increase metric for unknown schema" in {
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new RandomObjectTypeAuditEventGeneratorMapSource)
+ when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink)
+ if (jobConfig.auditEventGenerator) {
+
+ new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process()
+
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptySchemaEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptyPropsEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0)
+ }
+ }
+
+
+ "TransactionEventProcessorStreamTask" should "not generate audit history indexer event" in {
+ server.start(9200)
+ server.enqueue(new MockResponse().setHeader(
+ "Content-Type", "application/json"
+ ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}"""))
+
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource)
+ if (jobConfig.auditHistoryIndexer) {
+ new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process()
+
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0)
+ }
+ }
+
+ "TransactionEventProcessorStreamTask" should "generate audit history indexer event" in {
+ server.enqueue(new MockResponse().setHeader(
+ "Content-Type", "application/json"
+ ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}"""))
+
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource)
+ val setBoolean = config.withValue("job.audit-history-indexer", ConfigValueFactory.fromAnyRef(true))
+ val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean)
+ if (newConfig.auditHistoryIndexer) {
+ new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process()
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(2)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.failedEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.esFailedEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(0)
+ server.close()
+ }
+ }
+
+ "TransactionEventProcessorStreamTask" should "throw exception and increase es error count" in {
+ when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource)
+
+ try {
+ new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process()
+ } catch {
+ case ex: JobExecutionException =>
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1)
+ BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0)
+ }
+ }
+}
+
+class AuditEventMapSource extends SourceFunction[Event] {
+
+ override def run(ctx: SourceContext[Event]) {
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10))
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5), 0, 11))
+}
+
+ override def cancel(): Unit = {}
+}
+
+class AuditHistoryMapSource extends SourceFunction[Event] {
+
+ override def run(ctx: SourceContext[Event]) {
+
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10))
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_12),0,11))
+ }
+
+ override def cancel(): Unit = {}
+}
+
+class EventMapSource extends SourceFunction[Event] {
+
+ override def run(ctx: SourceContext[Event]) {
+
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10))
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9),0,11))
+ }
+
+ override def cancel(): Unit = {}
+}
+class RandomObjectTypeAuditEventGeneratorMapSource extends SourceFunction[Event] {
+
+ override def run(ctx: SourceContext[Event]) {
+ ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_8), 0, 10))
+ }
+
+ override def cancel(): Unit = {}
+}
+
+class AuditEventSink extends SinkFunction[String] {
+
+ override def invoke(value: String): Unit = {
+ synchronized {
+ AuditEventSink.values.add(value)
+ }
+ }
+}
+
+object AuditEventSink {
+ val values: util.List[String] = new util.ArrayList()
+}
\ No newline at end of file
diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala
new file mode 100644
index 000000000..b70a23e7c
--- /dev/null
+++ b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala
@@ -0,0 +1,158 @@
+package org.sunbird.job.spec.service
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.mockito.Mockito
+import org.slf4j.LoggerFactory
+import org.sunbird.job.Metrics
+import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event}
+import org.sunbird.job.fixture.EventFixture
+import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer}
+import org.sunbird.job.transaction.task.TransactionEventProcessorConfig
+import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil}
+import org.sunbird.spec.BaseTestSpec
+
+import java.util
+
+class TransactionEventProcessorTestSpec extends BaseTestSpec {
+ implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]])
+ implicit val mpTypeInfo: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]])
+ implicit val strTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
+
+ val config: Config = ConfigFactory.load("test.conf")
+ lazy val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config)
+ lazy val mockMetrics = mock[Metrics](Mockito.withSettings().serializable())
+ lazy val auditEventGenerator: AuditEventGenerator = new AuditEventGenerator(jobConfig)
+ lazy val mockElasticUtil:ElasticSearchUtil = mock[ElasticSearchUtil](Mockito.withSettings().serializable())
+ lazy val auditHistoryIndexer: AuditHistoryIndexer = new AuditHistoryIndexer(jobConfig,mockElasticUtil)
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ }
+
+ "TransactionEventProcessorService" should "generate audit event" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata") shouldNot be(null)
+ }
+
+ "TransactionEventProcessorService" should "add duration of status change" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_2)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata") shouldNot be(null)
+ val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]]("duration").asInstanceOf[Int]
+ duration should be(761)
+ }
+
+ "TransactionEventProcessorService" should "add Duration as null" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_3)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata") shouldNot be(null)
+ val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null)
+ duration should be(null)
+ }
+
+ "TransactionEventProcessorService" should "generate audit for content creation" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_4)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata") shouldNot be(null)
+ val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null)
+ duration should be(null)
+ }
+
+ "TransactionEventProcessorService" should "skip audit for objectType is null" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+
+ eventStr should be("{\"object\": {\"type\":null}}")
+ }
+
+ "TransactionEventProcessorService" should "event for addedRelations" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_6)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+ eventMap("eid") should be("AUDIT")
+ eventMap("ver") should be("3.0")
+ eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("name")
+ eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("collections")
+ val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null)
+ duration should be(null)
+ }
+
+ "TransactionEventProcessorService" should "generate audit for update dialcode" in {
+ val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_7)
+
+ val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics)
+ val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr)
+
+ eventMap("eid") should be("AUDIT")
+ eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("dialcodes")
+ val cdata = eventMap("cdata").asInstanceOf[List[Map[String, AnyRef]]]
+ cdata.head("id").asInstanceOf[List[String]] should contain ("K1W6L6")
+ cdata.head("type") should be("DialCode")
+ }
+
+ "TransactionEventProcessorService" should "compute duration" in {
+ val ov = "2019-03-13T13:25:43.129+0530"
+ val nv = "2019-03-13T13:38:24.358+0530"
+ val duration = auditEventGenerator.computeDuration(ov, nv)
+ duration should be(761)
+ }
+
+ "TransactionEventProcessorService" should "generate es log" in {
+ val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10)
+
+ val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent);
+
+ auditHistoryRec.objectId should be(inputEvent.nodeUniqueId)
+ auditHistoryRec.objectType should be(inputEvent.objectType)
+ auditHistoryRec.logRecord should be("""{"properties":{"mediaType":{"nv":"content"},"name":{"nv":"Untitled Resource"},"createdOn":{"nv":"2018-02-13T16:01:18.947+0530"},"channel":{"nv":"in.ekstep"},"lastUpdatedOn":{"nv":"2018-02-13T16:01:18.947+0530"},"IL_FUNC_OBJECT_TYPE":{"nv":"Content"},"resourceType":{"nv":"Story"},"compatibilityLevel":{"nv":1.0},"audience":{"nv":["Learner"]},"os":{"nv":["All"]},"IL_SYS_NODE_TYPE":{"nv":"DATA_NODE"},"framework":{"nv":"NCF"},"versionKey":{"nv":"1518517878947"},"mimeType":{"nv":"application/pdf"},"code":{"nv":"test_code"},"contentType":{"nv":"Story"},"language":{"nv":["English"]},"status":{"nv":"Draft"},"keywords":{"nv":["colors","games"]},"idealScreenSize":{"nv":"normal"},"contentEncoding":{"nv":"identity"},"osId":{"nv":"org.ekstep.quiz.app"},"IL_UNIQUE_ID":{"nv":"do_11243969846440755213"},"contentDisposition":{"nv":"inline"},"visibility":{"nv":"Default"},"idealScreenDensity":{"nv":"hdpi"}}}""")
+ }
+
+ "TransactionEventProcessorService" should "generate with added relations" in {
+ val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_10), 0, 11)
+
+ val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent);
+
+ auditHistoryRec.objectId should be(inputEvent.nodeUniqueId)
+ auditHistoryRec.objectType should be(inputEvent.objectType)
+ auditHistoryRec.logRecord should be("""{"addedRelations":[{"label":"Test unit 11","rel":"hasSequenceMember","dir":"IN","id":"do_1123032073439723521148","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}}""")
+ }
+
+ "TransactionEventProcessorService" should "generate with removed relations" in {
+ val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_11), 0, 12)
+
+ val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent);
+
+ auditHistoryRec.objectId should be(inputEvent.nodeUniqueId)
+ auditHistoryRec.objectType should be(inputEvent.objectType)
+ auditHistoryRec.logRecord should be("""{"addedTags":[],"addedRelations":[],"properties":{},"removedRelations":[{"label":"qq\n","rel":"associatedTo","dir":"OUT","id":"do_113198273083662336127","relMetadata":{},"type":"AssessmentItem"}],"removedTags":[]}""")
+ }
+}