diff --git a/jobs-distribution/pom.xml b/jobs-distribution/pom.xml index 865c8041a..8167f011b 100644 --- a/jobs-distribution/pom.xml +++ b/jobs-distribution/pom.xml @@ -19,19 +19,6 @@ flink-streaming-scala_${scala.version} ${flink.version} jar - - - - - - - - - - - - - org.sunbird post-publish-processor @@ -44,12 +31,6 @@ 1.0.0 jar - - - - - - org.sunbird content-publish @@ -62,36 +43,12 @@ 1.0.0 jar - - - - - - - - - - - - - - - - - - org.sunbird asset-enrichment 1.0.0 jar - - org.sunbird - audit-history-indexer - 1.0.0 - jar - org.sunbird auto-creator-v2 @@ -104,24 +61,12 @@ 1.0.0 jar - - - - - - org.sunbird qrcode-image-generator 1.0.0 jar - - org.sunbird - audit-event-generator - 1.0.0 - jar - org.sunbird dialcode-context-updater @@ -152,6 +97,12 @@ 1.0.0 jar + + org.sunbird + transaction-event-generator + 1.0.0 + jar + diff --git a/pom.xml b/pom.xml index 3de752d78..f466c82aa 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ 3.0.0 - org.sunbird + org.sunbird knowledge-platform-jobs 1.0 pom @@ -27,18 +27,12 @@ jobs-core - - post-publish-processor - publish-pipeline search-indexer - auto-creator-v2 content-auto-creator - audit-history-indexer - audit-event-generator - + transaction-event-processor qrcode-image-generator dialcode-context-updater cassandra-data-migration diff --git a/transaction-event-processor/README.md b/transaction-event-processor/README.md new file mode 100644 index 000000000..025917332 --- /dev/null +++ b/transaction-event-processor/README.md @@ -0,0 +1,65 @@ +# Transaction Event Processor + +Transaction Event Processor job generates an audit event for every transaction in Graph updation for content data modification. The job utilizes the neo4j mutation data to create an index of transactions for audit purposes. For each neo4j transaction, the job will audit the old and new values of the updated object. + +## Getting Started + +These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a yarn or kubernetes. +### Prerequisites + +1. Download flink-1.13.6-scala_2.12 from [apache-flink-downloads](https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz). +2. Download [hadoop dependencies](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) (only for running on Yarn). Copy the hadoop dependency jar under lib folder of the flink download. +3. export HADOOP_CLASSPATH=`/hadoop classpath` either in .bashrc or current execution shell. +4. Docker installed. +5. A running yarn cluster or a kubernetes cluster. + +### Build + +mvn clean install + +## Deployment + +### Yarn + +Flink requires memory to be allocated for both job-manager and task manager. -yjm parameter assigns job-manager memory and -ytm assigns task-manager memory. + +``` +./bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 1024m /transaction-event-processor/target/transaction-event-processor-0.0.1.jar +``` + +### Kubernetes + +``` +# Create a single node cluster +k3d create --server-arg --no-deploy --server-arg traefik --name flink-cluster --image rancher/k3s:v1.0.0 +# Export the single node cluster into KUBECONFIG in the current shell or in ~/.bashrc. +export KUBECONFIG="$(k3d get-kubeconfig --name='flink-cluster')" + +# Only for Mac OSX +# /usr/local/bin/kubectl -> /Applications/Docker.app/Contents/Resources/bin/kubectl +rm /usr/local/bin/kubectl +brew link --overwrite kubernetes-cli + +# Create a configmap using the flink-configuration-configmap.yaml +kubectl create -f knowledge-platform-job/kubernetes/flink-configuration-configmap.yaml + +# Create pods for jobmanager-service, job-manager and task-manager using the yaml files +kubectl create -f knowledge-platform-job/kubernetes/jobmanager-service.yaml +kubectl create -f knowledge-platform-job/kubernetes/jobmanager-deployment.yaml +kubectl create -f knowledge-platform-job/kubernetes/taskmanager-deployment.yaml + +# Create a port-forwarding for accessing the job-manager UI on localhost:8081 +kubectl port-forward deployment/flink-jobmanager 8081:8081 + +# Submit the job to the Kubernetes single node cluster flink-cluster +./bin/flink run -m localhost:8081 /transaction-event-processor/target/transaction-event-processor-0.0.1.jar + +# Commands to delete the pods created in the cluster +kubectl delete deployment/flink-jobmanager +kubectl delete deployment/flink-taskmanager +kubectl delete service/flink-jobmanager +kubectl delete configmaps/flink-config + +# Command to stop the single-node cluster +k3d stop --name="flink-cluster" +``` diff --git a/transaction-event-processor/pom.xml b/transaction-event-processor/pom.xml new file mode 100644 index 000000000..e5a489cd4 --- /dev/null +++ b/transaction-event-processor/pom.xml @@ -0,0 +1,269 @@ + + + + 4.0.0 + + org.sunbird + knowledge-platform-jobs + 1.0 + + transaction-event-processor + 1.0.0 + jar + transaction-event-processor + + Transaction Event Processor Flink Job + + + + UTF-8 + 1.4.0 + + + + + org.apache.flink + flink-streaming-scala_${scala.version} + ${flink.version} + + + + + com.google.code.gson + gson + 2.10.1 + + + + org.sunbird + jobs-core + 1.0.0 + + + joda-time + joda-time + 2.10.6 + + + org.sunbird + jobs-core + 1.0.0 + test-jar + test + + + org.apache.flink + flink-test-utils_${scala.version} + ${flink.version} + test + + + org.apache.flink + flink-runtime_${scala.version} + ${flink.version} + test + tests + + + org.apache.flink + flink-streaming-java_${scala.version} + ${flink.version} + test + tests + + + org.scalatest + scalatest_${scala.version} + 3.0.6 + test + + + org.mockito + mockito-core + 3.3.3 + test + + + org.sunbird + platform-common + 1.0-beta + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-api + + + + + org.sunbird + platform-telemetry + 1.0-beta + + + com.squareup.okhttp3 + mockwebserver + 4.9.1 + test + + + + + src/main/scala + src/test/scala + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + com.google.code.findbugs:jsr305 + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.sunbird.job.transaction.task.TransactionEventProcessorStreamTask + + + + reference.conf + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 4.4.0 + + 11 + 11 + ${scala.maj.version} + false + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + maven-surefire-plugin + 2.22.2 + + true + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + audit-event-generator-testsuite.txt + + + + test + + test + + + + + + org.scoverage + scoverage-maven-plugin + ${scoverage.plugin.version} + + ${scala.version} + true + true + + + + org.jacoco + jacoco-maven-plugin + 0.8.5 + + + + prepare-agent + + + + + report + test + + report + + + + + + + + diff --git a/transaction-event-processor/src/main/resources/log4j.properties b/transaction-event-processor/src/main/resources/log4j.properties new file mode 100644 index 000000000..33e8e93ef --- /dev/null +++ b/transaction-event-processor/src/main/resources/log4j.properties @@ -0,0 +1,14 @@ +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.file=audit-event-generator.log +log4j.appender.file.append=true +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.MaxFileSize=256KB +log4j.appender.file.MaxBackupIndex=4 +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file + + + diff --git a/transaction-event-processor/src/main/resources/transaction-event-processor.conf b/transaction-event-processor/src/main/resources/transaction-event-processor.conf new file mode 100644 index 000000000..9275a65b3 --- /dev/null +++ b/transaction-event-processor/src/main/resources/transaction-event-processor.conf @@ -0,0 +1,25 @@ +include "base-config.conf" + +kafka { + input.topic = "sunbirddev.learning.graph.events" + output.topic = "sunbirddev.telemetry.raw" + groupId = "sunbirddev-transaction-event-processor-group" +} + +task { + consumer.parallelism = 1 + parallelism = 1 + producer.parallelism = 1 + window.time = 60 +} + +schema { + basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local" +} + +channel.default = "org.sunbird" + +job { + audit-event-generator = true + audit-history-indexer = true +} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala new file mode 100644 index 000000000..e6dbfa930 --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala @@ -0,0 +1,7 @@ +package org.sunbird.job.transaction.domain + +import java.io.Serializable +import java.util.Date + +@SerialVersionUID(-5779950964487302125L) +case class AuditHistoryRecord(var objectId: String, objectType: String, label: String, graphId: String, var userId: String, requestId: String, logRecord: String, operation: String, createdOn: Date) extends Serializable \ No newline at end of file diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala new file mode 100644 index 000000000..c73f5d21a --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala @@ -0,0 +1,61 @@ + +package org.sunbird.job.transaction.domain + +import org.apache.commons.lang3.StringUtils +import org.sunbird.job.domain.reader.JobRequest + +import java.text.{DateFormat, SimpleDateFormat} +import java.util +import java.util.Date + +class Event(eventMap: java.util.Map[String, Any], partition: Int, offset: Long) extends JobRequest(eventMap, partition, offset) { + + private val df:DateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + + private val IMAGE_SUFFIX = ".img" + + private val jobName = "TransactionEventProcessor" + + def id: String = readOrDefault("nodeUniqueId", "") + + def operationType: String = readOrDefault("operationType", null) + + def nodeUniqueId: String = readOrDefault("nodeUniqueId", "") + + def createdOn: String = readOrDefault("createdOn", "") + + def channelId(channel: String): String = readOrDefault("channel", channel) + + def objectId: String = if (null != nodeUniqueId) nodeUniqueId.replaceAll(IMAGE_SUFFIX, "") else nodeUniqueId + + def objectType: String = readOrDefault[String]("objectType", null) + + def userId: String = readOrDefault[String]("userId", "") + + def transactionData: Map[String, AnyRef] = { + readOrDefault("transactionData", new util.HashMap[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]] + } + + def nodeType: String = readOrDefault("nodeType", "") + + def ets: Long = readOrDefault("ets", 0L) + + def label: String = readOrDefault("label", "") + + def graphId: String = readOrDefault("graphId", "") + + def requestId: String = readOrDefault("requestId", "") + + def syncMessage: String = readOrDefault("syncMessage", null) + + def createdOnDate: Date = if (createdOn.isEmpty) new Date else df.parse(createdOn) + + def audit: Boolean = readOrDefault("audit", true) + + def isValid: Boolean = { + StringUtils.isNotBlank(objectType) || + operationType != null && null == syncMessage && audit + } + + +} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala new file mode 100644 index 000000000..a096260e8 --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala @@ -0,0 +1,49 @@ +package org.sunbird.job.transaction.functions + +import java.util +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.slf4j.LoggerFactory +import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.transaction.service.TransactionEventProcessorService +import org.sunbird.job.transaction.task.TransactionEventProcessorConfig +import org.sunbird.job.exception.InvalidEventException +import org.sunbird.job.{BaseProcessFunction, Metrics} + +class AuditEventGenerator(config: TransactionEventProcessorConfig) + (implicit mapTypeInfo: TypeInformation[util.Map[String, AnyRef]], + stringTypeInfo: TypeInformation[String]) + extends BaseProcessFunction[Event, String](config) with TransactionEventProcessorService{ + + private[this] lazy val logger = LoggerFactory.getLogger(classOf[AuditEventGenerator]) + + override def metricsList(): List[String] = { + List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.skippedEventCount, config.emptySchemaEventCount, config.emptyPropsEventCount) + } + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + } + + override def close(): Unit = { + super.close() + } + + @throws(classOf[InvalidEventException]) + override def processElement(event: Event, + context: ProcessFunction[Event, String]#Context, + metrics: Metrics): Unit = { + try { + metrics.incCounter(config.totalEventsCount) + if(event.isValid) { + logger.info("valid event: "+event.nodeUniqueId) + processEvent(event, context, metrics)(config) + } else metrics.incCounter(config.skippedEventCount) + } catch { + case ex: Exception => + metrics.incCounter(config.failedEventCount) + throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition,"offset" -> event.offset),ex) + } + } +} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala new file mode 100644 index 000000000..fe6fe2ae0 --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala @@ -0,0 +1,44 @@ +package org.sunbird.job.transaction.functions + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.transaction.service.TransactionEventProcessorService +import org.sunbird.job.transaction.task.TransactionEventProcessorConfig +import org.sunbird.job.util.ElasticSearchUtil +import org.sunbird.job.{BaseProcessKeyedFunction, Metrics} + +import java.util + +class AuditHistoryIndexer(config: TransactionEventProcessorConfig, var esUtil: ElasticSearchUtil) + (implicit mapTypeInfo: TypeInformation[util.Map[String, Any]], + stringTypeInfo: TypeInformation[String]) + extends BaseProcessKeyedFunction[String, Event, String](config) with TransactionEventProcessorService{ + + + override def metricsList(): List[String] = { + List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.esFailedEventCount, config.skippedEventCount) + } + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + if (esUtil == null) { + esUtil = new ElasticSearchUtil(config.esConnectionInfo, config.auditHistoryIndex, config.auditHistoryIndexType) + } + } + + override def close(): Unit = { + esUtil.close() + super.close() + } + + override def processElement(event: Event, + context: KeyedProcessFunction[String, Event, String]#Context, + metrics: Metrics): Unit = { + metrics.incCounter(config.totalEventsCount) + if(event.isValid) { + processEvent(event, metrics)(esUtil, config) + } else metrics.incCounter(config.skippedEventCount) + } +} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala new file mode 100644 index 000000000..7b4f043c5 --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala @@ -0,0 +1,240 @@ +package org.sunbird.job.transaction.service +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.commons.lang3.StringUtils +import org.slf4j.LoggerFactory +import org.sunbird.job.Metrics +import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} +import org.sunbird.telemetry.TelemetryGenerator +import org.sunbird.telemetry.TelemetryParams +import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event} +import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition} +import com.google.gson.Gson +import org.sunbird.job.exception.InvalidEventException +import org.sunbird.job.transaction.task.TransactionEventProcessorConfig + +import java.io.IOException +import java.util +import java.text.SimpleDateFormat +import java.util.{Calendar, Date, TimeZone} + +trait TransactionEventProcessorService { + private[this] lazy val logger = LoggerFactory.getLogger(classOf[TransactionEventProcessorService]) + private val OBJECT_TYPE_IMAGE_SUFFIX = "Image" + private val SKIP_AUDIT = """{"object": {"type":null}}""" + private lazy val definitionCache = new DefinitionCache + private lazy val gson = new Gson + + private val systemPropsList = List("IL_SYS_NODE_TYPE", "IL_FUNC_OBJECT_TYPE", "IL_UNIQUE_ID", "IL_TAG_NAME", "IL_ATTRIBUTE_NAME", "IL_INDEXABLE_METADATA_KEY", "IL_NON_INDEXABLE_METADATA_KEY", + "IL_IN_RELATIONS_KEY", "IL_OUT_RELATIONS_KEY", "IL_REQUIRED_PROPERTIES", "IL_SYSTEM_TAGS_KEY", "IL_SEQUENCE_INDEX", "SYS_INTERNAL_LAST_UPDATED_ON", "lastUpdatedOn", "versionKey", "lastStatusChangedOn") + + private def getContext(channelId: String, env: String): Map[String, String] = { + val context = Map( + TelemetryParams.ACTOR.name -> "org.ekstep.learning.platform", + TelemetryParams.CHANNEL.name -> channelId, + TelemetryParams.ENV.name -> env + ) + context + } + + @throws(classOf[InvalidEventException]) + def processEvent(message: Event, context: ProcessFunction[Event, String]#Context, metrics: Metrics)(implicit config: TransactionEventProcessorConfig): Unit = { + logger.info("AUDIT Event::" + JSONUtil.serialize(message)) + logger.info("Input Message Received for : [" + message.nodeUniqueId + "], Txn Event createdOn:" + message.createdOn + ", Operation Type:" + message.operationType) + try { + val (auditEventStr, objectType) = getAuditMessage(message)(config, metrics) + if (StringUtils.isNotBlank(objectType)) { + context.output(config.auditOutputTag, auditEventStr) + logger.info("Telemetry Audit Message Successfully Sent for : " + message.objectId + " :: event ::" + auditEventStr) + metrics.incCounter(config.successEventCount) + } + else { + logger.info("Skipped event as the objectype is not available, event =" + auditEventStr) + metrics.incCounter(config.emptyPropsEventCount) + } + } catch { + case e: Exception => + logger.error("Failed to process message :: " + JSONUtil.serialize(message), e) + throw e + } + } + + def getDefinition(objectType: String)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): ObjectDefinition = { + try { + definitionCache.getDefinition(objectType, config.configVersion, config.basePath) + } catch { + case ex: Exception => { + metrics.incCounter(config.emptySchemaEventCount) + new ObjectDefinition(objectType, config.configVersion, Map[String, AnyRef](), Map[String, AnyRef]()) + } + } + } + + + def getAuditMessage(message: Event)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): (String, String) = { + var auditMap: String = null + var objectType = message.objectType + val env = if (null != objectType) objectType.toLowerCase.replace("image", "") else "system" + + val definitionNode: ObjectDefinition = getDefinition(objectType) + + val propertyMap = message.transactionData("properties").asInstanceOf[Map[String, AnyRef]] + val statusMap = propertyMap.getOrElse("status", null).asInstanceOf[Map[String, AnyRef]] + val lastStatusChangedOn = propertyMap.getOrElse("lastStatusChangedOn", null).asInstanceOf[Map[String, AnyRef]] + val addedRelations = message.transactionData.getOrElse("addedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]] + val removedRelations = message.transactionData.getOrElse("removedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]] + + var pkgVersion = "" + var prevStatus = "" + var currStatus = "" + var duration = "" + val pkgVerMap = propertyMap.getOrElse("pkgVersion", null).asInstanceOf[Map[String, AnyRef]] + if (null != pkgVerMap) pkgVersion = s"${pkgVerMap.get("nv")}" + + if (null != statusMap) { + prevStatus = statusMap.getOrElse("ov", null).asInstanceOf[String] + currStatus = statusMap.getOrElse("nv", null).asInstanceOf[String] + // Compute Duration for Status Change + if (StringUtils.isNotBlank(currStatus) && StringUtils.isNotBlank(prevStatus) && null != lastStatusChangedOn) { + var ov = lastStatusChangedOn.getOrElse("ov", null).asInstanceOf[String] + val nv = lastStatusChangedOn.getOrElse("nv", null).asInstanceOf[String] + if (null == ov) ov = propertyMap.getOrElse("lastUpdatedOn", null).asInstanceOf[Map[String, AnyRef]].getOrElse("ov", null).asInstanceOf[String] + if (null != ov && null != nv) duration = String.valueOf(computeDuration(ov, nv)) + } + } + + var props: List[String] = propertyMap.keys.toList + props ++= getRelationProps(addedRelations, definitionNode) + props ++= getRelationProps(removedRelations, definitionNode) + val propsExceptSystemProps = props.filter(prop => !systemPropsList.contains(prop)) + val cdata = getCData(addedRelations, removedRelations, propertyMap) + + var context: Map[String, String] = getContext(message.channelId(config.defaultChannel), env) + + objectType = if (null != objectType) objectType.replaceAll(OBJECT_TYPE_IMAGE_SUFFIX, "") else objectType + context ++= Map("objectId" -> message.objectId, "objectType" -> objectType) + + if (StringUtils.isNotBlank(duration)) context ++= Map("duration" -> duration) + if (StringUtils.isNotBlank(pkgVersion)) context ++= Map("pkgVersion" -> pkgVersion) + if (StringUtils.isNotBlank(message.userId)) context ++= Map(TelemetryParams.ACTOR.name -> message.userId) + + if (propsExceptSystemProps.nonEmpty) { + val cdataList = gson.fromJson(JSONUtil.serialize(cdata), classOf[java.util.List[java.util.Map[String, Object]]]) + + TelemetryGenerator.setComponent("audit-event-generator") + auditMap = TelemetryGenerator.audit( + JSONUtil.deserialize[util.Map[String, String]](JSONUtil.serialize(context)), + JSONUtil.deserialize[util.List[String]](JSONUtil.serialize(propsExceptSystemProps)), + currStatus, + prevStatus, + cdataList) + logger.info("Audit Message for Content Id [" + message.objectId + "] : " + auditMap); + + (auditMap, message.objectType) + } + else { + logger.info("Skipping Audit log as props is null or empty") + (SKIP_AUDIT, "") + } + } + + /** + * @param addedRelations + * @param removedRelations + * @param propertyMap + * @return + */ + private def getCData(addedRelations: List[Map[String, AnyRef]], removedRelations: List[Map[String, AnyRef]], propertyMap: Map[String, AnyRef]): List[Map[String, AnyRef]] = { + var cdata = List[Map[String, AnyRef]]() + if (null != propertyMap && propertyMap.nonEmpty && propertyMap.contains("dialcodes")) { + val dialcodeMap = propertyMap("dialcodes").asInstanceOf[Map[String, AnyRef]] + val dialcodes = dialcodeMap("nv").asInstanceOf[List[String]] + if (null != dialcodes) { + cdata :+= Map[String, AnyRef]("id" -> dialcodes, "type" -> "DialCode") + } + } + if (null != addedRelations && addedRelations.nonEmpty) cdata ++= prepareCMap(addedRelations) + if (null != removedRelations && removedRelations.nonEmpty) cdata ++= prepareCMap(removedRelations) + cdata + } + + /** + * @param relations + */ + private def prepareCMap(relations: List[Map[String, AnyRef]]): List[Map[String, AnyRef]] = { + relations.map(relation => { + Map[String, AnyRef]("id" -> relation("id"), "type" -> relation("type")) + }) + } + + /** + * + * @param relations + */ + private def getRelationProps(relations: List[Map[String, AnyRef]], objectDefinition: ObjectDefinition)(implicit config: TransactionEventProcessorConfig):List[String] = { + var relationProps = List[String]() + if (relations.nonEmpty) { + relations.foreach(rel => { + val direction = rel.getOrElse("dir", "").asInstanceOf[String] + val relationType = rel.getOrElse("rel", "").asInstanceOf[String] + val targetObjType = rel.getOrElse("type", "").asInstanceOf[String] + val relationProp = objectDefinition.relationLabel(targetObjType, direction, relationType) + if (relationProp.nonEmpty) { + relationProps :+= relationProp.get + } + }) + } + relationProps + } + + /** + * @param oldDate + * @param newDate + * @return + */ + def computeDuration(oldDate: String, newDate: String): Long = { + val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + val od = sdf.parse(oldDate) + val nd = sdf.parse(newDate) + val diff = nd.getTime - od.getTime + val diffSeconds = diff / 1000 + diffSeconds + } + + @throws(classOf[InvalidEventException]) + def processEvent(event: Event, metrics: Metrics)(implicit esUtil: ElasticSearchUtil, config: TransactionEventProcessorConfig): Unit = { + if (event.isValid) { + val identifier = event.nodeUniqueId + logger.info("Audit learning event received : " + identifier) + try { + val record = getAuditHistory(event) + val document = JSONUtil.serialize(record) + val indexName = getIndexName(event.ets) + esUtil.addDocumentWithIndex(document, indexName) + logger.info("Audit record created for " + identifier) + metrics.incCounter(config.successEventCount) + } catch { + case ex: IOException => + logger.error("Error while indexing message :: " + event.getJson + " :: " + ex.getMessage) + ex.printStackTrace() + metrics.incCounter(config.esFailedEventCount) + throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition, "offset" -> event.offset), ex) + case ex: Exception => + logger.error("Error while processing message :: " + event.getJson + " :: ", ex) + metrics.incCounter(config.failedEventCount) + } + } + else logger.info("Learning event not qualified for audit") + } + + private def getIndexName(ets: Long)(implicit config: TransactionEventProcessorConfig): String = { + val cal = Calendar.getInstance(TimeZone.getTimeZone(config.timeZone)) + cal.setTime(new Date(ets)) + config.auditHistoryIndex + "_" + cal.get(Calendar.YEAR) + "_" + cal.get(Calendar.WEEK_OF_YEAR) + } + + def getAuditHistory(transactionDataMap: Event): AuditHistoryRecord = { + val nodeUniqueId = StringUtils.replace(transactionDataMap.nodeUniqueId, ".img", "") + AuditHistoryRecord(nodeUniqueId, transactionDataMap.objectType, transactionDataMap.label, transactionDataMap.graphId, transactionDataMap.userId, transactionDataMap.requestId, JSONUtil.serialize(transactionDataMap.transactionData), transactionDataMap.operationType, transactionDataMap.createdOnDate) + } + +} \ No newline at end of file diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala new file mode 100644 index 000000000..7af09f05e --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala @@ -0,0 +1,56 @@ +package org.sunbird.job.transaction.task + +import java.util +import com.typesafe.config.Config +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.streaming.api.scala.OutputTag +import org.sunbird.job.BaseJobConfig + +class TransactionEventProcessorConfig(override val config: Config) extends BaseJobConfig(config, "transaction-event-processor") { + + private val serialVersionUID = 2905979434303791379L + + implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) + implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) + + // Kafka Topics Configuration + val kafkaInputTopic: String = config.getString("kafka.input.topic") + val kafkaOutputTopic: String = config.getString("kafka.output.topic") + override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism") + override val parallelism: Int = config.getInt("task.parallelism") + val kafkaProducerParallelism: Int = config.getInt("task.producer.parallelism") + val auditEventGenerator: Boolean = config.getBoolean("job.audit-event-generator") + val auditHistoryIndexer: Boolean = config.getBoolean("job.audit-history-indexer") + + val auditOutputTag: OutputTag[String] = OutputTag[String]("audit-event-tag") + + val defaultChannel: String =config.getString("channel.default") + + // Metric List + val totalEventsCount = "total-events-count" + val successEventCount = "success-events-count" + val failedEventCount = "failed-events-count" + val skippedEventCount = "skipped-events-count" + val emptySchemaEventCount = "empty-schema-events-count" + val emptyPropsEventCount = "empty-props-events-count" + val esFailedEventCount = "elasticsearch-error-events-count" + + // Consumers + val transactionEventConsumer = "transaction-event-processor-consumer" + val auditEventGeneratorFunction = "audit-event-generator-function" + val auditHistoryIndexerFunction = "audit-history-indexer-function" + val transactionEventProducer = "transaction-event-processor-producer" + + + val basePath = config.getString("schema.basePath") + val configVersion = "1.0" + + // ES Configs + val esConnectionInfo = config.getString("es.basePath") + + val timeZone = if (config.hasPath("timezone")) config.getString("timezone") else "IST" + val auditHistoryIndex = "kp_audit_log" + val operationCreate = "CREATE" + val auditHistoryIndexType = "ah" +} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala new file mode 100644 index 000000000..8bc584c77 --- /dev/null +++ b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala @@ -0,0 +1,72 @@ +package org.sunbird.job.transaction.task + +import java.io.File +import java.util +import com.typesafe.config.ConfigFactory +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.sunbird.job.connector.FlinkKafkaConnector +import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer} +import org.sunbird.job.util.{ElasticSearchUtil, FlinkUtil, HttpUtil} + + +class TransactionEventProcessorStreamTask(config: TransactionEventProcessorConfig, kafkaConnector: FlinkKafkaConnector, esUtil: ElasticSearchUtil) { + def process(): Unit = { + implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) + // implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() + implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event]) + implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) + implicit val mapTypeInfoEs: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]]) + implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) + + val inputStream = env.addSource(kafkaConnector.kafkaJobRequestSource[Event](config.kafkaInputTopic)).name(config.transactionEventConsumer) + .uid(config.transactionEventConsumer).setParallelism(config.kafkaConsumerParallelism) + + if(config.auditEventGenerator) { + val auditEventGeneratorStreamTask = inputStream.rebalance + .process(new AuditEventGenerator(config)) + .name(config.auditEventGeneratorFunction) + .uid(config.auditEventGeneratorFunction) + .setParallelism(config.parallelism) + + auditEventGeneratorStreamTask.getSideOutput(config.auditOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaOutputTopic)) + .name(config.transactionEventProducer).uid(config.transactionEventProducer).setParallelism(config.kafkaProducerParallelism) + } + + if(config.auditHistoryIndexer){ + inputStream.rebalance + .keyBy(new TransactionEventKeySelector) + .process(new AuditHistoryIndexer(config, esUtil)) + .name(config.auditHistoryIndexerFunction) + .uid(config.auditHistoryIndexerFunction) + .setParallelism(config.parallelism) + } + env.execute(config.jobName) + } +} + +// $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster +object TransactionEventProcessorStreamTask { + + def main(args: Array[String]): Unit = { + val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path")) + val config = configFilePath.map { + path => ConfigFactory.parseFile(new File(path)).resolve() + }.getOrElse(ConfigFactory.load("transaction-event-processor.conf").withFallback(ConfigFactory.systemEnvironment())) + val transactionEventProcessorConfig = new TransactionEventProcessorConfig(config) + val kafkaUtil = new FlinkKafkaConnector(transactionEventProcessorConfig) + val esUtil:ElasticSearchUtil = null + val task = new TransactionEventProcessorStreamTask(transactionEventProcessorConfig, kafkaUtil, esUtil) + task.process() + } +} + +// $COVERAGE-ON$ + +class TransactionEventKeySelector extends KeySelector[Event, String] { + override def getKey(in: Event): String = in.id.replace(".img", "") +} diff --git a/transaction-event-processor/src/test/resources/logback-test.xml b/transaction-event-processor/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e81294323 --- /dev/null +++ b/transaction-event-processor/src/test/resources/logback-test.xml @@ -0,0 +1,16 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + \ No newline at end of file diff --git a/transaction-event-processor/src/test/resources/test.conf b/transaction-event-processor/src/test/resources/test.conf new file mode 100644 index 000000000..26207d65d --- /dev/null +++ b/transaction-event-processor/src/test/resources/test.conf @@ -0,0 +1,25 @@ +include "base-config.conf" + +kafka { + input.topic = "sunbirddev.learning.graph.events" + output.topic = "sunbirddev.telemetry.raw" + groupId = "sunbirddev-transaction-event-processor-group" +} + +task { + consumer.parallelism = 1 + parallelism = 1 + producer.parallelism = 1 + window.time = 60 +} + +schema { + basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local" +} + +channel.default = "org.sunbird" + +job { + audit-event-generator = true + audit-history-indexer = false +} diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala new file mode 100644 index 000000000..eb9da761c --- /dev/null +++ b/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala @@ -0,0 +1,64 @@ +package org.sunbird.job.fixture + +object EventFixture { + + val EVENT_1: String = + """ + |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"} + |""".stripMargin + + val EVENT_2: String = + """ + |{"ets":1552464504681,"channel":"in.ekstep","transactionData":{"properties":{"lastStatusChangedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:38:24.358+0530"},"lastSubmittedOn":{"ov":null,"nv":"2019-03-13T13:38:21.901+0530"},"lastUpdatedOn":{"ov":"2019-03-13T13:36:20.093+0530","nv":"2019-03-13T13:38:24.399+0530"},"status":{"ov":"Draft","nv":"Review"},"versionKey":{"ov":"1552464380093","nv":"1552464504399"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:38:24.680+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} + |""".stripMargin + + val EVENT_3: String = + """ + |{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} + |""".stripMargin + + val EVENT_4: String = + """ + |{"ets":1552645516180,"channel":"in.ekstep","transactionData":{"properties":{"ownershipType":{"ov":null,"nv":["createdBy"]},"code":{"ov":null,"nv":"test.res.1"},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"lastUpdatedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentEncoding":{"ov":null,"nv":"identity"},"dialcodeRequired":{"ov":null,"nv":"No"},"contentType":{"ov":null,"nv":"Resource"},"lastStatusChangedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"visibility":{"ov":null,"nv":"Default"},"os":{"ov":null,"nv":["All"]},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1552645516071"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Resource Content 1"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11271927206783385611"},"status":{"ov":null,"nv":"Draft"}}},"mid":"9ea9ae7a-9cc1-493d-aac3-3c66cd9ff01b","label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-15T15:55:16.178+0530","objectType":"Content","nodeUniqueId":"do_11271927206783385611","requestId":null,"operationType":"CREATE","nodeGraphId":590921,"graphId":"domain"} + |""".stripMargin + + val EVENT_5: String = + """ + |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":null,"partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"} + |""".stripMargin + + val EVENT_6: String = + """ + |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"} + |""".stripMargin + + val EVENT_7: String = + """ + |{"ets":1614776853781,"channel":"b00bc992ef25f1a9a8d63291e20efc8d","transactionData":{"properties":{"dialcodes":{"ov":null,"nv":["K1W6L6"]}}},"mid":"5b5633a2-3c18-49a6-8822-6d7b85338104","label":"Test Again","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-03-03T13:07:33.781+0000","objectType":"Collection","nodeUniqueId":"do_1132282511204024321262","requestId":null,"operationType":"UPDATE","nodeGraphId":510086,"graphId":"domain"} + |""".stripMargin + + val EVENT_8: String = + """ + |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":"RandomSchemaType","partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"} + |""".stripMargin + + val EVENT_9: String = + """ + |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"} + |""".stripMargin + + val EVENT_10: String = + """ + |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"} + |""".stripMargin + + val EVENT_11: String = + """ + |{"ets":1615191835547,"channel":"01309282781705830427","transactionData":{"removedTags":[],"addedRelations":[],"removedRelations":[{"rel":"associatedTo","id":"do_113198273083662336127","label":"qq\n","dir":"OUT","type":"AssessmentItem","relMetadata":{}}],"addedTags":[],"properties":{}},"mid":"98145983-63dc-4d55-866c-248d49306ad8","label":"ECML_CHANGES","nodeType":"DATA_NODE","userId":"5a587cc1-e018-4859-a0a8-e842650b9d64","createdOn":"2021-03-08T08:23:55.547+0000","objectType":"Content","nodeUniqueId":"do_1132316371218268161118","requestId":null,"operationType":"UPDATE","nodeGraphId":510477,"graphId":"domain"} + |""".stripMargin + + val EVENT_12: String = + """ + |{"ets":1500888709490,"requestId":null,"transactionData":{"properties":{"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"morphology":{"ov":null,"nv":false},"consumerId":{"ov":null,"nv":"a6654129-b58d-4dd8-9cf2-f8f3c2f458bc"},"channel":{"ov":null,"nv":"in.ekstep"},"lemma":{"ov":null,"nv":"ವಿಶ್ಲೇಷಣೆ"},"createdOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"versionKey":{"ov":null,"nv":"1500888738130"},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Word"},"ekstepWordnet":{"ov":null,"nv":false},"lastUpdatedOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"isPhrase":{"ov":null,"nv":false},"IL_UNIQUE_ID":{"ov":null,"nv":"ka_11229528054276096015"},"status":{"ov":null,"nv":"Draft"}}},"nodeGraphId":433342,"label":"ವಿಶ್ಲೇಷಣೆ","graphId":"ka","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-07-24T09:31:49.490+0000","objectType":"Word"} + |""".stripMargin +} \ No newline at end of file diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala new file mode 100644 index 000000000..fb6a63a0a --- /dev/null +++ b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala @@ -0,0 +1,207 @@ +package org.sunbird.job.spec + +import java.util +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import okhttp3.mockwebserver.{MockResponse, MockWebServer} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.runtime.client.JobExecutionException +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.test.util.MiniClusterWithClientResource +import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} +import org.mockito.Mockito +import org.mockito.Mockito.when +import org.sunbird.job.connector.FlinkKafkaConnector +import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.fixture.EventFixture +import org.sunbird.job.transaction.task.{TransactionEventProcessorConfig, TransactionEventProcessorStreamTask} +import org.sunbird.spec.{BaseMetricsReporter, BaseTestSpec} + +class TransactionEventProcessorTaskTestSpec extends BaseTestSpec { + implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) + + val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() + .setConfiguration(testConfiguration()) + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build) + val mockKafkaUtil: FlinkKafkaConnector = mock[FlinkKafkaConnector](Mockito.withSettings().serializable()) + val config: Config = ConfigFactory.load("test.conf") + val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config) + val esUtil:ElasticSearchUtil = null + val server = new MockWebServer() + + var currentMilliSecond = 1605816926271L + + override protected def beforeAll(): Unit = { + BaseMetricsReporter.gaugeMetrics.clear() + flinkCluster.before() + super.beforeAll() + } + + override protected def afterAll(): Unit = { + flinkCluster.after() + super.afterAll() + } + + "TransactionEventProcessorStreamTask" should "generate audit event" in { + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource) + when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) + if (jobConfig.auditEventGenerator) { + new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() + + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0) + + AuditEventSink.values.size() should be(1) + AuditEventSink.values.forEach(event => { + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](event) + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata") shouldNot be(null) + }) + } + } + + "TransactionEventProcessorStreamTask" should "not generate audit event" in { + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource) + when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) + + val setBoolean = config.withValue("job.audit-event-generator", ConfigValueFactory.fromAnyRef(false)) + val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean) + if (newConfig.auditEventGenerator) { + + new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process() + + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(0) + + } + } + + "TransactionEventProcessorStreamTask" should "increase metric for unknown schema" in { + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new RandomObjectTypeAuditEventGeneratorMapSource) + when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) + if (jobConfig.auditEventGenerator) { + + new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() + + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptySchemaEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptyPropsEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) + } + } + + + "TransactionEventProcessorStreamTask" should "not generate audit history indexer event" in { + server.start(9200) + server.enqueue(new MockResponse().setHeader( + "Content-Type", "application/json" + ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}""")) + + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) + if (jobConfig.auditHistoryIndexer) { + new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() + + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) + } + } + + "TransactionEventProcessorStreamTask" should "generate audit history indexer event" in { + server.enqueue(new MockResponse().setHeader( + "Content-Type", "application/json" + ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}""")) + + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) + val setBoolean = config.withValue("job.audit-history-indexer", ConfigValueFactory.fromAnyRef(true)) + val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean) + if (newConfig.auditHistoryIndexer) { + new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process() + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(2) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.failedEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.esFailedEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(0) + server.close() + } + } + + "TransactionEventProcessorStreamTask" should "throw exception and increase es error count" in { + when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) + + try { + new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() + } catch { + case ex: JobExecutionException => + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1) + BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) + } + } +} + +class AuditEventMapSource extends SourceFunction[Event] { + + override def run(ctx: SourceContext[Event]) { + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10)) + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5), 0, 11)) +} + + override def cancel(): Unit = {} +} + +class AuditHistoryMapSource extends SourceFunction[Event] { + + override def run(ctx: SourceContext[Event]) { + + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10)) + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_12),0,11)) + } + + override def cancel(): Unit = {} +} + +class EventMapSource extends SourceFunction[Event] { + + override def run(ctx: SourceContext[Event]) { + + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10)) + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9),0,11)) + } + + override def cancel(): Unit = {} +} +class RandomObjectTypeAuditEventGeneratorMapSource extends SourceFunction[Event] { + + override def run(ctx: SourceContext[Event]) { + ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_8), 0, 10)) + } + + override def cancel(): Unit = {} +} + +class AuditEventSink extends SinkFunction[String] { + + override def invoke(value: String): Unit = { + synchronized { + AuditEventSink.values.add(value) + } + } +} + +object AuditEventSink { + val values: util.List[String] = new util.ArrayList() +} \ No newline at end of file diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala new file mode 100644 index 000000000..b70a23e7c --- /dev/null +++ b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala @@ -0,0 +1,158 @@ +package org.sunbird.job.spec.service + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.mockito.Mockito +import org.slf4j.LoggerFactory +import org.sunbird.job.Metrics +import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event} +import org.sunbird.job.fixture.EventFixture +import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer} +import org.sunbird.job.transaction.task.TransactionEventProcessorConfig +import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} +import org.sunbird.spec.BaseTestSpec + +import java.util + +class TransactionEventProcessorTestSpec extends BaseTestSpec { + implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) + implicit val mpTypeInfo: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]]) + implicit val strTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) + + val config: Config = ConfigFactory.load("test.conf") + lazy val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config) + lazy val mockMetrics = mock[Metrics](Mockito.withSettings().serializable()) + lazy val auditEventGenerator: AuditEventGenerator = new AuditEventGenerator(jobConfig) + lazy val mockElasticUtil:ElasticSearchUtil = mock[ElasticSearchUtil](Mockito.withSettings().serializable()) + lazy val auditHistoryIndexer: AuditHistoryIndexer = new AuditHistoryIndexer(jobConfig,mockElasticUtil) + + override protected def beforeAll(): Unit = { + super.beforeAll() + } + + override protected def afterAll(): Unit = { + super.afterAll() + } + + "TransactionEventProcessorService" should "generate audit event" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata") shouldNot be(null) + } + + "TransactionEventProcessorService" should "add duration of status change" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_2) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata") shouldNot be(null) + val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]]("duration").asInstanceOf[Int] + duration should be(761) + } + + "TransactionEventProcessorService" should "add Duration as null" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_3) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata") shouldNot be(null) + val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) + duration should be(null) + } + + "TransactionEventProcessorService" should "generate audit for content creation" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_4) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata") shouldNot be(null) + val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) + duration should be(null) + } + + "TransactionEventProcessorService" should "skip audit for objectType is null" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + + eventStr should be("{\"object\": {\"type\":null}}") + } + + "TransactionEventProcessorService" should "event for addedRelations" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_6) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + eventMap("eid") should be("AUDIT") + eventMap("ver") should be("3.0") + eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("name") + eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("collections") + val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) + duration should be(null) + } + + "TransactionEventProcessorService" should "generate audit for update dialcode" in { + val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_7) + + val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) + val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) + + eventMap("eid") should be("AUDIT") + eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("dialcodes") + val cdata = eventMap("cdata").asInstanceOf[List[Map[String, AnyRef]]] + cdata.head("id").asInstanceOf[List[String]] should contain ("K1W6L6") + cdata.head("type") should be("DialCode") + } + + "TransactionEventProcessorService" should "compute duration" in { + val ov = "2019-03-13T13:25:43.129+0530" + val nv = "2019-03-13T13:38:24.358+0530" + val duration = auditEventGenerator.computeDuration(ov, nv) + duration should be(761) + } + + "TransactionEventProcessorService" should "generate es log" in { + val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10) + + val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); + + auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) + auditHistoryRec.objectType should be(inputEvent.objectType) + auditHistoryRec.logRecord should be("""{"properties":{"mediaType":{"nv":"content"},"name":{"nv":"Untitled Resource"},"createdOn":{"nv":"2018-02-13T16:01:18.947+0530"},"channel":{"nv":"in.ekstep"},"lastUpdatedOn":{"nv":"2018-02-13T16:01:18.947+0530"},"IL_FUNC_OBJECT_TYPE":{"nv":"Content"},"resourceType":{"nv":"Story"},"compatibilityLevel":{"nv":1.0},"audience":{"nv":["Learner"]},"os":{"nv":["All"]},"IL_SYS_NODE_TYPE":{"nv":"DATA_NODE"},"framework":{"nv":"NCF"},"versionKey":{"nv":"1518517878947"},"mimeType":{"nv":"application/pdf"},"code":{"nv":"test_code"},"contentType":{"nv":"Story"},"language":{"nv":["English"]},"status":{"nv":"Draft"},"keywords":{"nv":["colors","games"]},"idealScreenSize":{"nv":"normal"},"contentEncoding":{"nv":"identity"},"osId":{"nv":"org.ekstep.quiz.app"},"IL_UNIQUE_ID":{"nv":"do_11243969846440755213"},"contentDisposition":{"nv":"inline"},"visibility":{"nv":"Default"},"idealScreenDensity":{"nv":"hdpi"}}}""") + } + + "TransactionEventProcessorService" should "generate with added relations" in { + val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_10), 0, 11) + + val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); + + auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) + auditHistoryRec.objectType should be(inputEvent.objectType) + auditHistoryRec.logRecord should be("""{"addedRelations":[{"label":"Test unit 11","rel":"hasSequenceMember","dir":"IN","id":"do_1123032073439723521148","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}}""") + } + + "TransactionEventProcessorService" should "generate with removed relations" in { + val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_11), 0, 12) + + val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); + + auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) + auditHistoryRec.objectType should be(inputEvent.objectType) + auditHistoryRec.logRecord should be("""{"addedTags":[],"addedRelations":[],"properties":{},"removedRelations":[{"label":"qq\n","rel":"associatedTo","dir":"OUT","id":"do_113198273083662336127","relMetadata":{},"type":"AssessmentItem"}],"removedTags":[]}""") + } +}