From f5d8f130d3ea103660d5d1501e4d380866f8e76b Mon Sep 17 00:00:00 2001 From: SurabhiAngadi Date: Mon, 4 Sep 2023 15:10:37 +0530 Subject: [PATCH] Revert "Code Refactoring" This reverts commit bfec31cc8ee750cc010aa2b4110f92f8c47fce71. --- audit-event-generator/pom.xml | 31 +- .../src/main/resources/log4j.properties | 5 +- .../domain/Event.scala | 2 +- .../functions/AuditEventGenerator.scala | 8 +- .../service/AuditEventGeneratorService.scala | 6 +- .../task/AuditEventGeneratorConfig.scala | 2 +- .../task/AuditEventGeneratorStreamTask.scala | 7 +- .../sunbird/job/fixture/EventFixture.scala | 2 +- .../AuditEventGeneratorTaskTestSpec.scala | 4 +- .../AuditEventGeneratorServiceTestSpec.scala | 6 +- audit-history-indexer/pom.xml | 2 +- .../task/AuditHistoryIndexerConfig.scala | 2 +- .../task/AuditHistoryIndexerStreamTask.scala | 1 - pom.xml | 6 +- .../helpers/spec/ContentPublisherSpec.scala | 4 +- .../spec/LiveContentPublisherSpec.scala | 4 +- publish-pipeline/pom.xml | 6 - .../QRCodeImageGeneratorFunction.scala | 13 +- .../task/QRCodeImageGeneratorConfig.scala | 5 +- .../util/QRCodeImageGeneratorUtil.scala | 13 +- transaction-event-processor/README.md | 65 ----- transaction-event-processor/pom.xml | 269 ------------------ .../src/main/resources/log4j.properties | 14 - .../transaction-event-processor.conf | 25 -- .../job/transaction/domain/CaseClasses.scala | 7 - .../job/transaction/domain/Event.scala | 61 ---- .../functions/AuditEventGenerator.scala | 49 ---- .../functions/AuditHistoryIndexer.scala | 44 --- .../TransactionEventProcessorService.scala | 240 ---------------- .../TransactionEventProcessorConfig.scala | 56 ---- .../TransactionEventProcessorStreamTask.scala | 72 ----- .../src/test/resources/logback-test.xml | 16 -- .../src/test/resources/test.conf | 25 -- .../sunbird/job/fixture/EventFixture.scala | 64 ----- ...ransactionEventProcessorTaskTestSpec.scala | 207 -------------- ...sactionEventProcessorServiceTestSpec.scala | 158 ---------- 36 files changed, 38 insertions(+), 1463 deletions(-) rename audit-event-generator/src/main/scala/org/sunbird/job/{transaction => auditevent}/domain/Event.scala (96%) rename audit-event-generator/src/main/scala/org/sunbird/job/{transaction => auditevent}/functions/AuditEventGenerator.scala (89%) rename audit-event-generator/src/main/scala/org/sunbird/job/{transaction => auditevent}/service/AuditEventGeneratorService.scala (98%) rename audit-event-generator/src/main/scala/org/sunbird/job/{transaction => auditevent}/task/AuditEventGeneratorConfig.scala (97%) rename audit-event-generator/src/main/scala/org/sunbird/job/{transaction => auditevent}/task/AuditEventGeneratorStreamTask.scala (91%) delete mode 100644 transaction-event-processor/README.md delete mode 100644 transaction-event-processor/pom.xml delete mode 100644 transaction-event-processor/src/main/resources/log4j.properties delete mode 100644 transaction-event-processor/src/main/resources/transaction-event-processor.conf delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala delete mode 100644 transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala delete mode 100644 transaction-event-processor/src/test/resources/logback-test.xml delete mode 100644 transaction-event-processor/src/test/resources/test.conf delete mode 100644 transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala delete mode 100644 transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala delete mode 100644 transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala diff --git a/audit-event-generator/pom.xml b/audit-event-generator/pom.xml index 55cd7a284..be8ba4e72 100644 --- a/audit-event-generator/pom.xml +++ b/audit-event-generator/pom.xml @@ -27,15 +27,8 @@ org.apache.flink flink-streaming-scala_${scala.version} ${flink.version} - + provided - - - com.google.code.gson - gson - 2.10.1 - - org.sunbird jobs-core @@ -159,7 +152,7 @@ - org.sunbird.job.transaction.task.AuditEventGeneratorStreamTask + org.sunbird.job.auditevent.task.AuditEventGeneratorStreamTask true - - org.jacoco - jacoco-maven-plugin - 0.8.5 - - - - prepare-agent - - - - - report - test - - report - - - - diff --git a/audit-event-generator/src/main/resources/log4j.properties b/audit-event-generator/src/main/resources/log4j.properties index 33e8e93ef..775b16009 100644 --- a/audit-event-generator/src/main/resources/log4j.properties +++ b/audit-event-generator/src/main/resources/log4j.properties @@ -8,7 +8,4 @@ log4j.appender.file.MaxBackupIndex=4 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Suppress the irrelevant (wrong) warnings from the Netty channel handler -log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file - - - +log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file \ No newline at end of file diff --git a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/domain/Event.scala b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/domain/Event.scala similarity index 96% rename from audit-event-generator/src/main/scala/org/sunbird/job/transaction/domain/Event.scala rename to audit-event-generator/src/main/scala/org/sunbird/job/auditevent/domain/Event.scala index 547bc5cd5..ebba0b8f4 100644 --- a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/domain/Event.scala +++ b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/domain/Event.scala @@ -1,4 +1,4 @@ -package org.sunbird.job.transaction.domain +package org.sunbird.job.auditevent.domain import org.apache.commons.lang3.StringUtils import org.sunbird.job.domain.reader.JobRequest diff --git a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/functions/AuditEventGenerator.scala similarity index 89% rename from audit-event-generator/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala rename to audit-event-generator/src/main/scala/org/sunbird/job/auditevent/functions/AuditEventGenerator.scala index 6242d5385..f6b591f68 100644 --- a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala +++ b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/functions/AuditEventGenerator.scala @@ -1,13 +1,13 @@ -package org.sunbird.job.transaction.functions +package org.sunbird.job.auditevent.functions import java.util import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.slf4j.LoggerFactory -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.transaction.service.AuditEventGeneratorService -import org.sunbird.job.transaction.task.AuditEventGeneratorConfig +import org.sunbird.job.auditevent.domain.Event +import org.sunbird.job.auditevent.service.AuditEventGeneratorService +import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig import org.sunbird.job.exception.InvalidEventException import org.sunbird.job.{BaseProcessFunction, Metrics} diff --git a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/service/AuditEventGeneratorService.scala b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/service/AuditEventGeneratorService.scala similarity index 98% rename from audit-event-generator/src/main/scala/org/sunbird/job/transaction/service/AuditEventGeneratorService.scala rename to audit-event-generator/src/main/scala/org/sunbird/job/auditevent/service/AuditEventGeneratorService.scala index 0b2039a70..aab6acaa7 100644 --- a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/service/AuditEventGeneratorService.scala +++ b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/service/AuditEventGeneratorService.scala @@ -1,14 +1,14 @@ -package org.sunbird.job.transaction.service +package org.sunbird.job.auditevent.service import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.commons.lang3.StringUtils import org.slf4j.LoggerFactory import org.sunbird.job.Metrics -import org.sunbird.job.transaction.task.AuditEventGeneratorConfig +import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig import org.sunbird.job.util.JSONUtil import org.sunbird.telemetry.TelemetryGenerator import org.sunbird.telemetry.TelemetryParams -import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.auditevent.domain.Event import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition} import com.google.gson.Gson import org.sunbird.job.exception.InvalidEventException diff --git a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorConfig.scala b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorConfig.scala similarity index 97% rename from audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorConfig.scala rename to audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorConfig.scala index 01ae5849f..2ca0ac023 100644 --- a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorConfig.scala +++ b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorConfig.scala @@ -1,4 +1,4 @@ -package org.sunbird.job.transaction.task +package org.sunbird.job.auditevent.task import java.util import com.typesafe.config.Config diff --git a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorStreamTask.scala b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorStreamTask.scala similarity index 91% rename from audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorStreamTask.scala rename to audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorStreamTask.scala index 7684c6880..0898de6b3 100644 --- a/audit-event-generator/src/main/scala/org/sunbird/job/transaction/task/AuditEventGeneratorStreamTask.scala +++ b/audit-event-generator/src/main/scala/org/sunbird/job/auditevent/task/AuditEventGeneratorStreamTask.scala @@ -1,4 +1,4 @@ -package org.sunbird.job.transaction.task +package org.sunbird.job.auditevent.task import java.io.File import java.util @@ -9,15 +9,14 @@ import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.sunbird.job.connector.FlinkKafkaConnector -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.transaction.functions.AuditEventGenerator +import org.sunbird.job.auditevent.domain.Event +import org.sunbird.job.auditevent.functions.AuditEventGenerator import org.sunbird.job.util.{FlinkUtil, HttpUtil} class AuditEventGeneratorStreamTask(config: AuditEventGeneratorConfig, kafkaConnector: FlinkKafkaConnector) { def process(): Unit = { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) -// implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event]) implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) diff --git a/audit-event-generator/src/test/scala/org/sunbird/job/fixture/EventFixture.scala b/audit-event-generator/src/test/scala/org/sunbird/job/fixture/EventFixture.scala index 0385a90d9..08797b294 100644 --- a/audit-event-generator/src/test/scala/org/sunbird/job/fixture/EventFixture.scala +++ b/audit-event-generator/src/test/scala/org/sunbird/job/fixture/EventFixture.scala @@ -14,7 +14,7 @@ object EventFixture { val EVENT_3: String = """ - |{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://sunbirddevbbpublic.blob.core.windows.net/sunbird-content-staging-knowlg/content/assets/do_21385816669265920016/pdf_233.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} + |{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} |""".stripMargin val EVENT_4: String = diff --git a/audit-event-generator/src/test/scala/org/sunbird/job/spec/AuditEventGeneratorTaskTestSpec.scala b/audit-event-generator/src/test/scala/org/sunbird/job/spec/AuditEventGeneratorTaskTestSpec.scala index 46cd46469..66e533da5 100644 --- a/audit-event-generator/src/test/scala/org/sunbird/job/spec/AuditEventGeneratorTaskTestSpec.scala +++ b/audit-event-generator/src/test/scala/org/sunbird/job/spec/AuditEventGeneratorTaskTestSpec.scala @@ -13,9 +13,9 @@ import org.sunbird.job.util.JSONUtil import org.mockito.Mockito import org.mockito.Mockito.when import org.sunbird.job.connector.FlinkKafkaConnector -import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.auditevent.domain.Event import org.sunbird.job.fixture.EventFixture -import org.sunbird.job.transaction.task.{AuditEventGeneratorConfig, AuditEventGeneratorStreamTask} +import org.sunbird.job.auditevent.task.{AuditEventGeneratorConfig, AuditEventGeneratorStreamTask} import org.sunbird.spec.{BaseMetricsReporter, BaseTestSpec} class AuditEventGeneratorTaskTestSpec extends BaseTestSpec { diff --git a/audit-event-generator/src/test/scala/org/sunbird/job/spec/service/AuditEventGeneratorServiceTestSpec.scala b/audit-event-generator/src/test/scala/org/sunbird/job/spec/service/AuditEventGeneratorServiceTestSpec.scala index e7b30987b..8eb00e3bf 100644 --- a/audit-event-generator/src/test/scala/org/sunbird/job/spec/service/AuditEventGeneratorServiceTestSpec.scala +++ b/audit-event-generator/src/test/scala/org/sunbird/job/spec/service/AuditEventGeneratorServiceTestSpec.scala @@ -5,10 +5,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.mockito.Mockito import org.sunbird.job.Metrics -import org.sunbird.job.transaction.domain.Event +import org.sunbird.job.auditevent.domain.Event import org.sunbird.job.fixture.EventFixture -import org.sunbird.job.transaction.functions.AuditEventGenerator -import org.sunbird.job.transaction.task.AuditEventGeneratorConfig +import org.sunbird.job.auditevent.functions.AuditEventGenerator +import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig import org.sunbird.job.util.JSONUtil import org.sunbird.spec.BaseTestSpec diff --git a/audit-history-indexer/pom.xml b/audit-history-indexer/pom.xml index e4d04001a..45e056768 100644 --- a/audit-history-indexer/pom.xml +++ b/audit-history-indexer/pom.xml @@ -27,7 +27,7 @@ org.apache.flink flink-streaming-scala_${scala.version} ${flink.version} - + provided org.sunbird diff --git a/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerConfig.scala b/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerConfig.scala index e6d93042a..49a9480e7 100644 --- a/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerConfig.scala +++ b/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerConfig.scala @@ -1,10 +1,10 @@ package org.sunbird.job.audithistory.task +import com.typesafe.config.Config import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.sunbird.job.BaseJobConfig -import com.typesafe.config.Config import java.util class AuditHistoryIndexerConfig(override val config: Config) extends BaseJobConfig(config, "audit-history-indexer") { diff --git a/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerStreamTask.scala b/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerStreamTask.scala index 87d55577f..8833666b0 100644 --- a/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerStreamTask.scala +++ b/audit-history-indexer/src/main/scala/org/sunbird/job/audithistory/task/AuditHistoryIndexerStreamTask.scala @@ -18,7 +18,6 @@ import java.util class AuditHistoryIndexerStreamTask(config: AuditHistoryIndexerConfig, kafkaConnector: FlinkKafkaConnector, esUtil: ElasticSearchUtil) { def process(): Unit = { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) -// implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event]) implicit val mapTypeInfo: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]]) implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) diff --git a/pom.xml b/pom.xml index fbe22157a..3de752d78 100644 --- a/pom.xml +++ b/pom.xml @@ -8,12 +8,8 @@ 3.0.0 - - transaction-event-processor - newMod - - org.sunbird + org.sunbird knowledge-platform-jobs 1.0 pom diff --git a/publish-pipeline/content-publish/src/test/scala/org/sunbird/job/publish/helpers/spec/ContentPublisherSpec.scala b/publish-pipeline/content-publish/src/test/scala/org/sunbird/job/publish/helpers/spec/ContentPublisherSpec.scala index 1dd5e99ca..eb77995e4 100644 --- a/publish-pipeline/content-publish/src/test/scala/org/sunbird/job/publish/helpers/spec/ContentPublisherSpec.scala +++ b/publish-pipeline/content-publish/src/test/scala/org/sunbird/job/publish/helpers/spec/ContentPublisherSpec.scala @@ -184,13 +184,13 @@ class ContentPublisherSpec extends FlatSpec with BeforeAndAfterAll with Matchers } "validateMetadata with mimeType application/msword and .pptx " should " not return exception messages if content is having valid artifactUrl" in { - val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None) + val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None) val result: List[String] = new TestContentPublisher().validateMetadata(data, data.identifier, jobConfig) result.size should be(0) } "validateMetadata with mimeType application/msword and .docx " should " not return exception messages if content is having valid artifactUrl" in { - val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None) + val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None) val result: List[String] = new TestContentPublisher().validateMetadata(data, data.identifier, jobConfig) result.size should be(0) } diff --git a/publish-pipeline/live-node-publisher/src/test/scala/org/sunbird/job/livenodepublisher/publish/helpers/spec/LiveContentPublisherSpec.scala b/publish-pipeline/live-node-publisher/src/test/scala/org/sunbird/job/livenodepublisher/publish/helpers/spec/LiveContentPublisherSpec.scala index 1b61077eb..a75132b84 100644 --- a/publish-pipeline/live-node-publisher/src/test/scala/org/sunbird/job/livenodepublisher/publish/helpers/spec/LiveContentPublisherSpec.scala +++ b/publish-pipeline/live-node-publisher/src/test/scala/org/sunbird/job/livenodepublisher/publish/helpers/spec/LiveContentPublisherSpec.scala @@ -184,13 +184,13 @@ class LiveContentPublisherSpec extends FlatSpec with BeforeAndAfterAll with Matc } "validateMetadata with mimeType application/msword and .pptx " should " not return exception messages if content is having valid artifactUrl" in { - val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None) + val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None) val result: List[String] = new TestLiveContentPublisher().validateMetadata(data, data.identifier, jobConfig) result.size should be(0) } "validateMetadata with mimeType application/msword and .docx " should " not return exception messages if content is having valid artifactUrl" in { - val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None) + val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None) val result: List[String] = new TestLiveContentPublisher().validateMetadata(data, data.identifier, jobConfig) result.size should be(0) } diff --git a/publish-pipeline/pom.xml b/publish-pipeline/pom.xml index 0546c73bb..8a0824a30 100644 --- a/publish-pipeline/pom.xml +++ b/publish-pipeline/pom.xml @@ -19,12 +19,6 @@ content-publish live-node-publisher - - org.apache.flink - flink-clients_${scala.version} - ${flink.version} - - diff --git a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/functions/QRCodeImageGeneratorFunction.scala b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/functions/QRCodeImageGeneratorFunction.scala index 348ab95f2..66e346823 100644 --- a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/functions/QRCodeImageGeneratorFunction.scala +++ b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/functions/QRCodeImageGeneratorFunction.scala @@ -104,16 +104,11 @@ class QRCodeImageGeneratorFunction(config: QRCodeImageGeneratorConfig, val zipDownloadUrl = cloudStorageUtil.uploadFile(event.storagePath.replace("/", ""), zipFile, Some(false), container = event.storageContainer) logger.info("QRCodeImageGeneratorService:processMessage: zipDownloadUrl - " + zipDownloadUrl(1)) logger.info("QRCodeImageGeneratorService:processMessage: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost) + var newDownloadUrl = zipDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost) + logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl - " + newDownloadUrl) metrics.incCounter(config.cloudDbHitCount) - if(config.cloudStorageEndpoint.nonEmpty){ - var newDownloadUrl = zipDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost) - logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl - " + newDownloadUrl) - logger.info("QRCodeImageGeneratorService:processMessage: event - " + event) - qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, newDownloadUrl, "processid", event.processId, metrics) - } else { - qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, zipDownloadUrl(1), "processid", event.processId, metrics) - } - + logger.info("QRCodeImageGeneratorService:processMessage: event - " + event) + qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, newDownloadUrl, "processid", event.processId, metrics) } else { logger.info("QRCodeImageGeneratorService:processMessage: Skipping zip creation due to missing processId.") diff --git a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/task/QRCodeImageGeneratorConfig.scala b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/task/QRCodeImageGeneratorConfig.scala index 3fe4f5b10..2572a9da4 100644 --- a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/task/QRCodeImageGeneratorConfig.scala +++ b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/task/QRCodeImageGeneratorConfig.scala @@ -1,7 +1,6 @@ package org.sunbird.job.qrimagegenerator.task import com.typesafe.config.Config -import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.scala.OutputTag @@ -59,7 +58,7 @@ class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobCon val cassandraDialCodeImageTable: String = config.getString("lms-cassandra.table.image") val cassandraDialCodeBatchTable: String = config.getString("lms-cassandra.table.batch") - val cloudStorageEndpoint: String = if (config.hasPath("cloud_storage_endpoint")) config.getString("cloud_storage_endpoint") else "" - val cloudStorageProxyHost: String = if (config.hasPath("cloud_storage_proxy_host")) config.getString("cloud_storage_endpoint") else "" + val cloudStorageEndpoint: String = config.getString("cloud_storage_endpoint") + val cloudStorageProxyHost: String = config.getString("cloud_storage_proxy_host") val indexImageURL: Boolean = if (config.hasPath("qr.image.indexImageUrl")) config.getBoolean("qr.image.indexImageUrl") else true } diff --git a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/util/QRCodeImageGeneratorUtil.scala b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/util/QRCodeImageGeneratorUtil.scala index 22aa842db..4b0d9b41e 100644 --- a/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/util/QRCodeImageGeneratorUtil.scala +++ b/qrcode-image-generator/src/main/scala/org/sunbird/job/qrimagegenerator/util/QRCodeImageGeneratorUtil.scala @@ -60,15 +60,10 @@ class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil logger.info("QRCodeImageGeneratorUtil:createQRImages: path after - " + path.replace("/", "")) val imageDownloadUrl = cloudStorageUtil.uploadFile(path.replace("/", ""), finalImageFile, Some(false), container = container) logger.info("QRCodeImageGeneratorUtil:createQRImages: imageDownloadUrl - " + imageDownloadUrl(1)) - if(config.cloudStorageEndpoint.nonEmpty){ - logger.info("QRCodeImageGeneratorUtil:createQRImages: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost) - var newDownloadUrl = imageDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost) - logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl before - " + newDownloadUrl) - updateCassandra(config.cassandraDialCodeImageTable, 2, newDownloadUrl, "filename", fileName, metrics) - } else { - updateCassandra(config.cassandraDialCodeImageTable, 2, imageDownloadUrl(1), "filename", fileName, metrics) - } - + logger.info("QRCodeImageGeneratorUtil:createQRImages: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost) + var newDownloadUrl = imageDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost) + logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl before - " + newDownloadUrl) + updateCassandra(config.cassandraDialCodeImageTable, 2, newDownloadUrl, "filename", fileName, metrics) } catch { case e: Exception => metrics.incCounter(config.dbFailureEventCount) diff --git a/transaction-event-processor/README.md b/transaction-event-processor/README.md deleted file mode 100644 index 1521e3611..000000000 --- a/transaction-event-processor/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# Audit Event Generator - -Audit Event Generator job generates an audit event for every transaction in Graph updation for content data modification - -## Getting Started - -These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a yarn or kubernetes. -### Prerequisites - -1. Download flink-1.13.6-scala_2.12 from [apache-flink-downloads](https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz). -2. Download [hadoop dependencies](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) (only for running on Yarn). Copy the hadoop dependency jar under lib folder of the flink download. -3. export HADOOP_CLASSPATH=`/hadoop classpath` either in .bashrc or current execution shell. -4. Docker installed. -5. A running yarn cluster or a kubernetes cluster. - -### Build - -mvn clean install - -## Deployment - -### Yarn - -Flink requires memory to be allocated for both job-manager and task manager. -yjm parameter assigns job-manager memory and -ytm assigns task-manager memory. - -``` -./bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 1024m /audit-event-generator/target/audit-event-generator-0.0.1.jar -``` - -### Kubernetes - -``` -# Create a single node cluster -k3d create --server-arg --no-deploy --server-arg traefik --name flink-cluster --image rancher/k3s:v1.0.0 -# Export the single node cluster into KUBECONFIG in the current shell or in ~/.bashrc. -export KUBECONFIG="$(k3d get-kubeconfig --name='flink-cluster')" - -# Only for Mac OSX -# /usr/local/bin/kubectl -> /Applications/Docker.app/Contents/Resources/bin/kubectl -rm /usr/local/bin/kubectl -brew link --overwrite kubernetes-cli - -# Create a configmap using the flink-configuration-configmap.yaml -kubectl create -f knowledge-platform-job/kubernetes/flink-configuration-configmap.yaml - -# Create pods for jobmanager-service, job-manager and task-manager using the yaml files -kubectl create -f knowledge-platform-job/kubernetes/jobmanager-service.yaml -kubectl create -f knowledge-platform-job/kubernetes/jobmanager-deployment.yaml -kubectl create -f knowledge-platform-job/kubernetes/taskmanager-deployment.yaml - -# Create a port-forwarding for accessing the job-manager UI on localhost:8081 -kubectl port-forward deployment/flink-jobmanager 8081:8081 - -# Submit the job to the Kubernetes single node cluster flink-cluster -./bin/flink run -m localhost:8081 /audit-event-generator/target/audit-event-generator-0.0.1.jar - -# Commands to delete the pods created in the cluster -kubectl delete deployment/flink-jobmanager -kubectl delete deployment/flink-taskmanager -kubectl delete service/flink-jobmanager -kubectl delete configmaps/flink-config - -# Command to stop the single-node cluster -k3d stop --name="flink-cluster" -``` diff --git a/transaction-event-processor/pom.xml b/transaction-event-processor/pom.xml deleted file mode 100644 index e5a489cd4..000000000 --- a/transaction-event-processor/pom.xml +++ /dev/null @@ -1,269 +0,0 @@ - - - - 4.0.0 - - org.sunbird - knowledge-platform-jobs - 1.0 - - transaction-event-processor - 1.0.0 - jar - transaction-event-processor - - Transaction Event Processor Flink Job - - - - UTF-8 - 1.4.0 - - - - - org.apache.flink - flink-streaming-scala_${scala.version} - ${flink.version} - - - - - com.google.code.gson - gson - 2.10.1 - - - - org.sunbird - jobs-core - 1.0.0 - - - joda-time - joda-time - 2.10.6 - - - org.sunbird - jobs-core - 1.0.0 - test-jar - test - - - org.apache.flink - flink-test-utils_${scala.version} - ${flink.version} - test - - - org.apache.flink - flink-runtime_${scala.version} - ${flink.version} - test - tests - - - org.apache.flink - flink-streaming-java_${scala.version} - ${flink.version} - test - tests - - - org.scalatest - scalatest_${scala.version} - 3.0.6 - test - - - org.mockito - mockito-core - 3.3.3 - test - - - org.sunbird - platform-common - 1.0-beta - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-core - - - org.apache.logging.log4j - log4j-core - - - org.apache.logging.log4j - log4j-api - - - - - org.sunbird - platform-telemetry - 1.0-beta - - - com.squareup.okhttp3 - mockwebserver - 4.9.1 - test - - - - - src/main/scala - src/test/scala - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - 11 - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - - package - - shade - - - - - com.google.code.findbugs:jsr305 - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - org.sunbird.job.transaction.task.TransactionEventProcessorStreamTask - - - - reference.conf - - - - - - - - - net.alchim31.maven - scala-maven-plugin - 4.4.0 - - 11 - 11 - ${scala.maj.version} - false - - - - scala-compile-first - process-resources - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - - maven-surefire-plugin - 2.22.2 - - true - - - - - org.scalatest - scalatest-maven-plugin - 1.0 - - ${project.build.directory}/surefire-reports - . - audit-event-generator-testsuite.txt - - - - test - - test - - - - - - org.scoverage - scoverage-maven-plugin - ${scoverage.plugin.version} - - ${scala.version} - true - true - - - - org.jacoco - jacoco-maven-plugin - 0.8.5 - - - - prepare-agent - - - - - report - test - - report - - - - - - - - diff --git a/transaction-event-processor/src/main/resources/log4j.properties b/transaction-event-processor/src/main/resources/log4j.properties deleted file mode 100644 index 33e8e93ef..000000000 --- a/transaction-event-processor/src/main/resources/log4j.properties +++ /dev/null @@ -1,14 +0,0 @@ -# log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file=org.apache.log4j.RollingFileAppender -log4j.appender.file.file=audit-event-generator.log -log4j.appender.file.append=true -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.MaxFileSize=256KB -log4j.appender.file.MaxBackupIndex=4 -log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n - -# Suppress the irrelevant (wrong) warnings from the Netty channel handler -log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file - - - diff --git a/transaction-event-processor/src/main/resources/transaction-event-processor.conf b/transaction-event-processor/src/main/resources/transaction-event-processor.conf deleted file mode 100644 index 9275a65b3..000000000 --- a/transaction-event-processor/src/main/resources/transaction-event-processor.conf +++ /dev/null @@ -1,25 +0,0 @@ -include "base-config.conf" - -kafka { - input.topic = "sunbirddev.learning.graph.events" - output.topic = "sunbirddev.telemetry.raw" - groupId = "sunbirddev-transaction-event-processor-group" -} - -task { - consumer.parallelism = 1 - parallelism = 1 - producer.parallelism = 1 - window.time = 60 -} - -schema { - basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local" -} - -channel.default = "org.sunbird" - -job { - audit-event-generator = true - audit-history-indexer = true -} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala deleted file mode 100644 index e6dbfa930..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/CaseClasses.scala +++ /dev/null @@ -1,7 +0,0 @@ -package org.sunbird.job.transaction.domain - -import java.io.Serializable -import java.util.Date - -@SerialVersionUID(-5779950964487302125L) -case class AuditHistoryRecord(var objectId: String, objectType: String, label: String, graphId: String, var userId: String, requestId: String, logRecord: String, operation: String, createdOn: Date) extends Serializable \ No newline at end of file diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala deleted file mode 100644 index c73f5d21a..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/domain/Event.scala +++ /dev/null @@ -1,61 +0,0 @@ - -package org.sunbird.job.transaction.domain - -import org.apache.commons.lang3.StringUtils -import org.sunbird.job.domain.reader.JobRequest - -import java.text.{DateFormat, SimpleDateFormat} -import java.util -import java.util.Date - -class Event(eventMap: java.util.Map[String, Any], partition: Int, offset: Long) extends JobRequest(eventMap, partition, offset) { - - private val df:DateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); - - private val IMAGE_SUFFIX = ".img" - - private val jobName = "TransactionEventProcessor" - - def id: String = readOrDefault("nodeUniqueId", "") - - def operationType: String = readOrDefault("operationType", null) - - def nodeUniqueId: String = readOrDefault("nodeUniqueId", "") - - def createdOn: String = readOrDefault("createdOn", "") - - def channelId(channel: String): String = readOrDefault("channel", channel) - - def objectId: String = if (null != nodeUniqueId) nodeUniqueId.replaceAll(IMAGE_SUFFIX, "") else nodeUniqueId - - def objectType: String = readOrDefault[String]("objectType", null) - - def userId: String = readOrDefault[String]("userId", "") - - def transactionData: Map[String, AnyRef] = { - readOrDefault("transactionData", new util.HashMap[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]] - } - - def nodeType: String = readOrDefault("nodeType", "") - - def ets: Long = readOrDefault("ets", 0L) - - def label: String = readOrDefault("label", "") - - def graphId: String = readOrDefault("graphId", "") - - def requestId: String = readOrDefault("requestId", "") - - def syncMessage: String = readOrDefault("syncMessage", null) - - def createdOnDate: Date = if (createdOn.isEmpty) new Date else df.parse(createdOn) - - def audit: Boolean = readOrDefault("audit", true) - - def isValid: Boolean = { - StringUtils.isNotBlank(objectType) || - operationType != null && null == syncMessage && audit - } - - -} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala deleted file mode 100644 index a096260e8..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditEventGenerator.scala +++ /dev/null @@ -1,49 +0,0 @@ -package org.sunbird.job.transaction.functions - -import java.util -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.slf4j.LoggerFactory -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.transaction.service.TransactionEventProcessorService -import org.sunbird.job.transaction.task.TransactionEventProcessorConfig -import org.sunbird.job.exception.InvalidEventException -import org.sunbird.job.{BaseProcessFunction, Metrics} - -class AuditEventGenerator(config: TransactionEventProcessorConfig) - (implicit mapTypeInfo: TypeInformation[util.Map[String, AnyRef]], - stringTypeInfo: TypeInformation[String]) - extends BaseProcessFunction[Event, String](config) with TransactionEventProcessorService{ - - private[this] lazy val logger = LoggerFactory.getLogger(classOf[AuditEventGenerator]) - - override def metricsList(): List[String] = { - List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.skippedEventCount, config.emptySchemaEventCount, config.emptyPropsEventCount) - } - - override def open(parameters: Configuration): Unit = { - super.open(parameters) - } - - override def close(): Unit = { - super.close() - } - - @throws(classOf[InvalidEventException]) - override def processElement(event: Event, - context: ProcessFunction[Event, String]#Context, - metrics: Metrics): Unit = { - try { - metrics.incCounter(config.totalEventsCount) - if(event.isValid) { - logger.info("valid event: "+event.nodeUniqueId) - processEvent(event, context, metrics)(config) - } else metrics.incCounter(config.skippedEventCount) - } catch { - case ex: Exception => - metrics.incCounter(config.failedEventCount) - throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition,"offset" -> event.offset),ex) - } - } -} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala deleted file mode 100644 index fe6fe2ae0..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/functions/AuditHistoryIndexer.scala +++ /dev/null @@ -1,44 +0,0 @@ -package org.sunbird.job.transaction.functions - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.KeyedProcessFunction -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.transaction.service.TransactionEventProcessorService -import org.sunbird.job.transaction.task.TransactionEventProcessorConfig -import org.sunbird.job.util.ElasticSearchUtil -import org.sunbird.job.{BaseProcessKeyedFunction, Metrics} - -import java.util - -class AuditHistoryIndexer(config: TransactionEventProcessorConfig, var esUtil: ElasticSearchUtil) - (implicit mapTypeInfo: TypeInformation[util.Map[String, Any]], - stringTypeInfo: TypeInformation[String]) - extends BaseProcessKeyedFunction[String, Event, String](config) with TransactionEventProcessorService{ - - - override def metricsList(): List[String] = { - List(config.totalEventsCount, config.successEventCount, config.failedEventCount, config.esFailedEventCount, config.skippedEventCount) - } - - override def open(parameters: Configuration): Unit = { - super.open(parameters) - if (esUtil == null) { - esUtil = new ElasticSearchUtil(config.esConnectionInfo, config.auditHistoryIndex, config.auditHistoryIndexType) - } - } - - override def close(): Unit = { - esUtil.close() - super.close() - } - - override def processElement(event: Event, - context: KeyedProcessFunction[String, Event, String]#Context, - metrics: Metrics): Unit = { - metrics.incCounter(config.totalEventsCount) - if(event.isValid) { - processEvent(event, metrics)(esUtil, config) - } else metrics.incCounter(config.skippedEventCount) - } -} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala deleted file mode 100644 index 7b4f043c5..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/service/TransactionEventProcessorService.scala +++ /dev/null @@ -1,240 +0,0 @@ -package org.sunbird.job.transaction.service -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.commons.lang3.StringUtils -import org.slf4j.LoggerFactory -import org.sunbird.job.Metrics -import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} -import org.sunbird.telemetry.TelemetryGenerator -import org.sunbird.telemetry.TelemetryParams -import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event} -import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition} -import com.google.gson.Gson -import org.sunbird.job.exception.InvalidEventException -import org.sunbird.job.transaction.task.TransactionEventProcessorConfig - -import java.io.IOException -import java.util -import java.text.SimpleDateFormat -import java.util.{Calendar, Date, TimeZone} - -trait TransactionEventProcessorService { - private[this] lazy val logger = LoggerFactory.getLogger(classOf[TransactionEventProcessorService]) - private val OBJECT_TYPE_IMAGE_SUFFIX = "Image" - private val SKIP_AUDIT = """{"object": {"type":null}}""" - private lazy val definitionCache = new DefinitionCache - private lazy val gson = new Gson - - private val systemPropsList = List("IL_SYS_NODE_TYPE", "IL_FUNC_OBJECT_TYPE", "IL_UNIQUE_ID", "IL_TAG_NAME", "IL_ATTRIBUTE_NAME", "IL_INDEXABLE_METADATA_KEY", "IL_NON_INDEXABLE_METADATA_KEY", - "IL_IN_RELATIONS_KEY", "IL_OUT_RELATIONS_KEY", "IL_REQUIRED_PROPERTIES", "IL_SYSTEM_TAGS_KEY", "IL_SEQUENCE_INDEX", "SYS_INTERNAL_LAST_UPDATED_ON", "lastUpdatedOn", "versionKey", "lastStatusChangedOn") - - private def getContext(channelId: String, env: String): Map[String, String] = { - val context = Map( - TelemetryParams.ACTOR.name -> "org.ekstep.learning.platform", - TelemetryParams.CHANNEL.name -> channelId, - TelemetryParams.ENV.name -> env - ) - context - } - - @throws(classOf[InvalidEventException]) - def processEvent(message: Event, context: ProcessFunction[Event, String]#Context, metrics: Metrics)(implicit config: TransactionEventProcessorConfig): Unit = { - logger.info("AUDIT Event::" + JSONUtil.serialize(message)) - logger.info("Input Message Received for : [" + message.nodeUniqueId + "], Txn Event createdOn:" + message.createdOn + ", Operation Type:" + message.operationType) - try { - val (auditEventStr, objectType) = getAuditMessage(message)(config, metrics) - if (StringUtils.isNotBlank(objectType)) { - context.output(config.auditOutputTag, auditEventStr) - logger.info("Telemetry Audit Message Successfully Sent for : " + message.objectId + " :: event ::" + auditEventStr) - metrics.incCounter(config.successEventCount) - } - else { - logger.info("Skipped event as the objectype is not available, event =" + auditEventStr) - metrics.incCounter(config.emptyPropsEventCount) - } - } catch { - case e: Exception => - logger.error("Failed to process message :: " + JSONUtil.serialize(message), e) - throw e - } - } - - def getDefinition(objectType: String)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): ObjectDefinition = { - try { - definitionCache.getDefinition(objectType, config.configVersion, config.basePath) - } catch { - case ex: Exception => { - metrics.incCounter(config.emptySchemaEventCount) - new ObjectDefinition(objectType, config.configVersion, Map[String, AnyRef](), Map[String, AnyRef]()) - } - } - } - - - def getAuditMessage(message: Event)(implicit config: TransactionEventProcessorConfig, metrics: Metrics): (String, String) = { - var auditMap: String = null - var objectType = message.objectType - val env = if (null != objectType) objectType.toLowerCase.replace("image", "") else "system" - - val definitionNode: ObjectDefinition = getDefinition(objectType) - - val propertyMap = message.transactionData("properties").asInstanceOf[Map[String, AnyRef]] - val statusMap = propertyMap.getOrElse("status", null).asInstanceOf[Map[String, AnyRef]] - val lastStatusChangedOn = propertyMap.getOrElse("lastStatusChangedOn", null).asInstanceOf[Map[String, AnyRef]] - val addedRelations = message.transactionData.getOrElse("addedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]] - val removedRelations = message.transactionData.getOrElse("removedRelations", List[Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]] - - var pkgVersion = "" - var prevStatus = "" - var currStatus = "" - var duration = "" - val pkgVerMap = propertyMap.getOrElse("pkgVersion", null).asInstanceOf[Map[String, AnyRef]] - if (null != pkgVerMap) pkgVersion = s"${pkgVerMap.get("nv")}" - - if (null != statusMap) { - prevStatus = statusMap.getOrElse("ov", null).asInstanceOf[String] - currStatus = statusMap.getOrElse("nv", null).asInstanceOf[String] - // Compute Duration for Status Change - if (StringUtils.isNotBlank(currStatus) && StringUtils.isNotBlank(prevStatus) && null != lastStatusChangedOn) { - var ov = lastStatusChangedOn.getOrElse("ov", null).asInstanceOf[String] - val nv = lastStatusChangedOn.getOrElse("nv", null).asInstanceOf[String] - if (null == ov) ov = propertyMap.getOrElse("lastUpdatedOn", null).asInstanceOf[Map[String, AnyRef]].getOrElse("ov", null).asInstanceOf[String] - if (null != ov && null != nv) duration = String.valueOf(computeDuration(ov, nv)) - } - } - - var props: List[String] = propertyMap.keys.toList - props ++= getRelationProps(addedRelations, definitionNode) - props ++= getRelationProps(removedRelations, definitionNode) - val propsExceptSystemProps = props.filter(prop => !systemPropsList.contains(prop)) - val cdata = getCData(addedRelations, removedRelations, propertyMap) - - var context: Map[String, String] = getContext(message.channelId(config.defaultChannel), env) - - objectType = if (null != objectType) objectType.replaceAll(OBJECT_TYPE_IMAGE_SUFFIX, "") else objectType - context ++= Map("objectId" -> message.objectId, "objectType" -> objectType) - - if (StringUtils.isNotBlank(duration)) context ++= Map("duration" -> duration) - if (StringUtils.isNotBlank(pkgVersion)) context ++= Map("pkgVersion" -> pkgVersion) - if (StringUtils.isNotBlank(message.userId)) context ++= Map(TelemetryParams.ACTOR.name -> message.userId) - - if (propsExceptSystemProps.nonEmpty) { - val cdataList = gson.fromJson(JSONUtil.serialize(cdata), classOf[java.util.List[java.util.Map[String, Object]]]) - - TelemetryGenerator.setComponent("audit-event-generator") - auditMap = TelemetryGenerator.audit( - JSONUtil.deserialize[util.Map[String, String]](JSONUtil.serialize(context)), - JSONUtil.deserialize[util.List[String]](JSONUtil.serialize(propsExceptSystemProps)), - currStatus, - prevStatus, - cdataList) - logger.info("Audit Message for Content Id [" + message.objectId + "] : " + auditMap); - - (auditMap, message.objectType) - } - else { - logger.info("Skipping Audit log as props is null or empty") - (SKIP_AUDIT, "") - } - } - - /** - * @param addedRelations - * @param removedRelations - * @param propertyMap - * @return - */ - private def getCData(addedRelations: List[Map[String, AnyRef]], removedRelations: List[Map[String, AnyRef]], propertyMap: Map[String, AnyRef]): List[Map[String, AnyRef]] = { - var cdata = List[Map[String, AnyRef]]() - if (null != propertyMap && propertyMap.nonEmpty && propertyMap.contains("dialcodes")) { - val dialcodeMap = propertyMap("dialcodes").asInstanceOf[Map[String, AnyRef]] - val dialcodes = dialcodeMap("nv").asInstanceOf[List[String]] - if (null != dialcodes) { - cdata :+= Map[String, AnyRef]("id" -> dialcodes, "type" -> "DialCode") - } - } - if (null != addedRelations && addedRelations.nonEmpty) cdata ++= prepareCMap(addedRelations) - if (null != removedRelations && removedRelations.nonEmpty) cdata ++= prepareCMap(removedRelations) - cdata - } - - /** - * @param relations - */ - private def prepareCMap(relations: List[Map[String, AnyRef]]): List[Map[String, AnyRef]] = { - relations.map(relation => { - Map[String, AnyRef]("id" -> relation("id"), "type" -> relation("type")) - }) - } - - /** - * - * @param relations - */ - private def getRelationProps(relations: List[Map[String, AnyRef]], objectDefinition: ObjectDefinition)(implicit config: TransactionEventProcessorConfig):List[String] = { - var relationProps = List[String]() - if (relations.nonEmpty) { - relations.foreach(rel => { - val direction = rel.getOrElse("dir", "").asInstanceOf[String] - val relationType = rel.getOrElse("rel", "").asInstanceOf[String] - val targetObjType = rel.getOrElse("type", "").asInstanceOf[String] - val relationProp = objectDefinition.relationLabel(targetObjType, direction, relationType) - if (relationProp.nonEmpty) { - relationProps :+= relationProp.get - } - }) - } - relationProps - } - - /** - * @param oldDate - * @param newDate - * @return - */ - def computeDuration(oldDate: String, newDate: String): Long = { - val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - val od = sdf.parse(oldDate) - val nd = sdf.parse(newDate) - val diff = nd.getTime - od.getTime - val diffSeconds = diff / 1000 - diffSeconds - } - - @throws(classOf[InvalidEventException]) - def processEvent(event: Event, metrics: Metrics)(implicit esUtil: ElasticSearchUtil, config: TransactionEventProcessorConfig): Unit = { - if (event.isValid) { - val identifier = event.nodeUniqueId - logger.info("Audit learning event received : " + identifier) - try { - val record = getAuditHistory(event) - val document = JSONUtil.serialize(record) - val indexName = getIndexName(event.ets) - esUtil.addDocumentWithIndex(document, indexName) - logger.info("Audit record created for " + identifier) - metrics.incCounter(config.successEventCount) - } catch { - case ex: IOException => - logger.error("Error while indexing message :: " + event.getJson + " :: " + ex.getMessage) - ex.printStackTrace() - metrics.incCounter(config.esFailedEventCount) - throw new InvalidEventException(ex.getMessage, Map("partition" -> event.partition, "offset" -> event.offset), ex) - case ex: Exception => - logger.error("Error while processing message :: " + event.getJson + " :: ", ex) - metrics.incCounter(config.failedEventCount) - } - } - else logger.info("Learning event not qualified for audit") - } - - private def getIndexName(ets: Long)(implicit config: TransactionEventProcessorConfig): String = { - val cal = Calendar.getInstance(TimeZone.getTimeZone(config.timeZone)) - cal.setTime(new Date(ets)) - config.auditHistoryIndex + "_" + cal.get(Calendar.YEAR) + "_" + cal.get(Calendar.WEEK_OF_YEAR) - } - - def getAuditHistory(transactionDataMap: Event): AuditHistoryRecord = { - val nodeUniqueId = StringUtils.replace(transactionDataMap.nodeUniqueId, ".img", "") - AuditHistoryRecord(nodeUniqueId, transactionDataMap.objectType, transactionDataMap.label, transactionDataMap.graphId, transactionDataMap.userId, transactionDataMap.requestId, JSONUtil.serialize(transactionDataMap.transactionData), transactionDataMap.operationType, transactionDataMap.createdOnDate) - } - -} \ No newline at end of file diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala deleted file mode 100644 index 7af09f05e..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorConfig.scala +++ /dev/null @@ -1,56 +0,0 @@ -package org.sunbird.job.transaction.task - -import java.util -import com.typesafe.config.Config -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.streaming.api.scala.OutputTag -import org.sunbird.job.BaseJobConfig - -class TransactionEventProcessorConfig(override val config: Config) extends BaseJobConfig(config, "transaction-event-processor") { - - private val serialVersionUID = 2905979434303791379L - - implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) - implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - - // Kafka Topics Configuration - val kafkaInputTopic: String = config.getString("kafka.input.topic") - val kafkaOutputTopic: String = config.getString("kafka.output.topic") - override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism") - override val parallelism: Int = config.getInt("task.parallelism") - val kafkaProducerParallelism: Int = config.getInt("task.producer.parallelism") - val auditEventGenerator: Boolean = config.getBoolean("job.audit-event-generator") - val auditHistoryIndexer: Boolean = config.getBoolean("job.audit-history-indexer") - - val auditOutputTag: OutputTag[String] = OutputTag[String]("audit-event-tag") - - val defaultChannel: String =config.getString("channel.default") - - // Metric List - val totalEventsCount = "total-events-count" - val successEventCount = "success-events-count" - val failedEventCount = "failed-events-count" - val skippedEventCount = "skipped-events-count" - val emptySchemaEventCount = "empty-schema-events-count" - val emptyPropsEventCount = "empty-props-events-count" - val esFailedEventCount = "elasticsearch-error-events-count" - - // Consumers - val transactionEventConsumer = "transaction-event-processor-consumer" - val auditEventGeneratorFunction = "audit-event-generator-function" - val auditHistoryIndexerFunction = "audit-history-indexer-function" - val transactionEventProducer = "transaction-event-processor-producer" - - - val basePath = config.getString("schema.basePath") - val configVersion = "1.0" - - // ES Configs - val esConnectionInfo = config.getString("es.basePath") - - val timeZone = if (config.hasPath("timezone")) config.getString("timezone") else "IST" - val auditHistoryIndex = "kp_audit_log" - val operationCreate = "CREATE" - val auditHistoryIndexType = "ah" -} diff --git a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala b/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala deleted file mode 100644 index 8bc584c77..000000000 --- a/transaction-event-processor/src/main/scala/org/sunbird/job/transaction/task/TransactionEventProcessorStreamTask.scala +++ /dev/null @@ -1,72 +0,0 @@ -package org.sunbird.job.transaction.task - -import java.io.File -import java.util -import com.typesafe.config.ConfigFactory -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.sunbird.job.connector.FlinkKafkaConnector -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer} -import org.sunbird.job.util.{ElasticSearchUtil, FlinkUtil, HttpUtil} - - -class TransactionEventProcessorStreamTask(config: TransactionEventProcessorConfig, kafkaConnector: FlinkKafkaConnector, esUtil: ElasticSearchUtil) { - def process(): Unit = { - implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) - // implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() - implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event]) - implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) - implicit val mapTypeInfoEs: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]]) - implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - - val inputStream = env.addSource(kafkaConnector.kafkaJobRequestSource[Event](config.kafkaInputTopic)).name(config.transactionEventConsumer) - .uid(config.transactionEventConsumer).setParallelism(config.kafkaConsumerParallelism) - - if(config.auditEventGenerator) { - val auditEventGeneratorStreamTask = inputStream.rebalance - .process(new AuditEventGenerator(config)) - .name(config.auditEventGeneratorFunction) - .uid(config.auditEventGeneratorFunction) - .setParallelism(config.parallelism) - - auditEventGeneratorStreamTask.getSideOutput(config.auditOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaOutputTopic)) - .name(config.transactionEventProducer).uid(config.transactionEventProducer).setParallelism(config.kafkaProducerParallelism) - } - - if(config.auditHistoryIndexer){ - inputStream.rebalance - .keyBy(new TransactionEventKeySelector) - .process(new AuditHistoryIndexer(config, esUtil)) - .name(config.auditHistoryIndexerFunction) - .uid(config.auditHistoryIndexerFunction) - .setParallelism(config.parallelism) - } - env.execute(config.jobName) - } -} - -// $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster -object TransactionEventProcessorStreamTask { - - def main(args: Array[String]): Unit = { - val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path")) - val config = configFilePath.map { - path => ConfigFactory.parseFile(new File(path)).resolve() - }.getOrElse(ConfigFactory.load("transaction-event-processor.conf").withFallback(ConfigFactory.systemEnvironment())) - val transactionEventProcessorConfig = new TransactionEventProcessorConfig(config) - val kafkaUtil = new FlinkKafkaConnector(transactionEventProcessorConfig) - val esUtil:ElasticSearchUtil = null - val task = new TransactionEventProcessorStreamTask(transactionEventProcessorConfig, kafkaUtil, esUtil) - task.process() - } -} - -// $COVERAGE-ON$ - -class TransactionEventKeySelector extends KeySelector[Event, String] { - override def getKey(in: Event): String = in.id.replace(".img", "") -} diff --git a/transaction-event-processor/src/test/resources/logback-test.xml b/transaction-event-processor/src/test/resources/logback-test.xml deleted file mode 100644 index e81294323..000000000 --- a/transaction-event-processor/src/test/resources/logback-test.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - - - - - \ No newline at end of file diff --git a/transaction-event-processor/src/test/resources/test.conf b/transaction-event-processor/src/test/resources/test.conf deleted file mode 100644 index 26207d65d..000000000 --- a/transaction-event-processor/src/test/resources/test.conf +++ /dev/null @@ -1,25 +0,0 @@ -include "base-config.conf" - -kafka { - input.topic = "sunbirddev.learning.graph.events" - output.topic = "sunbirddev.telemetry.raw" - groupId = "sunbirddev-transaction-event-processor-group" -} - -task { - consumer.parallelism = 1 - parallelism = 1 - producer.parallelism = 1 - window.time = 60 -} - -schema { - basePath = "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/schemas/local" -} - -channel.default = "org.sunbird" - -job { - audit-event-generator = true - audit-history-indexer = false -} diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala deleted file mode 100644 index c61a47429..000000000 --- a/transaction-event-processor/src/test/scala/org/sunbird/job/fixture/EventFixture.scala +++ /dev/null @@ -1,64 +0,0 @@ -package org.sunbird.job.fixture - -object EventFixture { - - val EVENT_1: String = - """ - |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"} - |""".stripMargin - - val EVENT_2: String = - """ - |{"ets":1552464504681,"channel":"in.ekstep","transactionData":{"properties":{"lastStatusChangedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:38:24.358+0530"},"lastSubmittedOn":{"ov":null,"nv":"2019-03-13T13:38:21.901+0530"},"lastUpdatedOn":{"ov":"2019-03-13T13:36:20.093+0530","nv":"2019-03-13T13:38:24.399+0530"},"status":{"ov":"Draft","nv":"Review"},"versionKey":{"ov":"1552464380093","nv":"1552464504399"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:38:24.680+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} - |""".stripMargin - - val EVENT_3: String = - """ - |{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"} - |""".stripMargin - - val EVENT_4: String = - """ - |{"ets":1552645516180,"channel":"in.ekstep","transactionData":{"properties":{"ownershipType":{"ov":null,"nv":["createdBy"]},"code":{"ov":null,"nv":"test.res.1"},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"lastUpdatedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"contentEncoding":{"ov":null,"nv":"identity"},"dialcodeRequired":{"ov":null,"nv":"No"},"contentType":{"ov":null,"nv":"Resource"},"lastStatusChangedOn":{"ov":null,"nv":"2019-03-15T15:55:16.071+0530"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"visibility":{"ov":null,"nv":"Default"},"os":{"ov":null,"nv":["All"]},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1552645516071"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Resource Content 1"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11271927206783385611"},"status":{"ov":null,"nv":"Draft"}}},"mid":"9ea9ae7a-9cc1-493d-aac3-3c66cd9ff01b","label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-15T15:55:16.178+0530","objectType":"Content","nodeUniqueId":"do_11271927206783385611","requestId":null,"operationType":"CREATE","nodeGraphId":590921,"graphId":"domain"} - |""".stripMargin - - val EVENT_5: String = - """ - |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":null,"partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"} - |""".stripMargin - - val EVENT_6: String = - """ - |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"} - |""".stripMargin - - val EVENT_7: String = - """ - |{"ets":1614776853781,"channel":"b00bc992ef25f1a9a8d63291e20efc8d","transactionData":{"properties":{"dialcodes":{"ov":null,"nv":["K1W6L6"]}}},"mid":"5b5633a2-3c18-49a6-8822-6d7b85338104","label":"Test Again","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-03-03T13:07:33.781+0000","objectType":"Collection","nodeUniqueId":"do_1132282511204024321262","requestId":null,"operationType":"UPDATE","nodeGraphId":510086,"graphId":"domain"} - |""".stripMargin - - val EVENT_8: String = - """ - |{"ets":1.614227716965E12,"transactionData":{"removedTags":[],"addedRelations":[{"rel":"hasSequenceMember","id":"do_11322375329257062411","label":"event 4.1: 11-1","dir":"OUT","type":"Event","relMetadata":{"IL_SEQUENCE_INDEX":1.0}}],"removedRelations":[],"addedTags":[],"properties":{}},"mid":"fc0bb006-7269-4b10-96c3-10672fca53a0","label":"eventset 4","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2021-02-25T04:35:16.965+0000","objectType":"RandomSchemaType","partition":1,"nodeUniqueId":"do_11322375344215654413","operationType":"UPDATE","nodeGraphId":509461.0,"graphId":"domain"} - |""".stripMargin - - val EVENT_9: String = - """ - |{"ets":1518517878987,"nodeUniqueId":"do_11243969846440755213","requestId":null,"transactionData":{"properties":{"code":{"ov":null,"nv":"test_code"},"keywords":{"ov":null,"nv":["colors","games"]},"channel":{"ov":null,"nv":"in.ekstep"},"language":{"ov":null,"nv":["English"]},"mimeType":{"ov":null,"nv":"application/pdf"},"idealScreenSize":{"ov":null,"nv":"normal"},"createdOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentDisposition":{"ov":null,"nv":"inline"},"contentEncoding":{"ov":null,"nv":"identity"},"lastUpdatedOn":{"ov":null,"nv":"2018-02-13T16:01:18.947+0530"},"contentType":{"ov":null,"nv":"Story"},"audience":{"ov":null,"nv":["Learner"]},"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"os":{"ov":null,"nv":["All"]},"visibility":{"ov":null,"nv":"Default"},"mediaType":{"ov":null,"nv":"content"},"osId":{"ov":null,"nv":"org.ekstep.quiz.app"},"versionKey":{"ov":null,"nv":"1518517878947"},"idealScreenDensity":{"ov":null,"nv":"hdpi"},"framework":{"ov":null,"nv":"NCF"},"compatibilityLevel":{"ov":null,"nv":1.0},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Content"},"name":{"ov":null,"nv":"Untitled Resource"},"IL_UNIQUE_ID":{"ov":null,"nv":"do_11243969846440755213"},"status":{"ov":null,"nv":"Draft"},"resourceType":{"ov":null,"nv":"Story"}}},"operationType":"CREATE","nodeGraphId":113603,"label":"Untitled Resource","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2018-02-13T16:01:18.987+0530","objectType":"Content"} - |""".stripMargin - - val EVENT_10: String = - """ - |{"ets":1502102183388,"nodeUniqueId":"do_112276071067320320114","requestId":null,"transactionData":{"addedRelations":[{"rel":"hasSequenceMember","id":"do_1123032073439723521148","label":"Test unit 11","dir":"IN","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}},"operationType":"CREATE","nodeGraphId":105631,"label":"collaborator test","graphId":"domain","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-08-07T10:36:23.388+0000","objectType":"Content"} - |""".stripMargin - - val EVENT_11: String = - """ - |{"ets":1615191835547,"channel":"01309282781705830427","transactionData":{"removedTags":[],"addedRelations":[],"removedRelations":[{"rel":"associatedTo","id":"do_113198273083662336127","label":"qq\n","dir":"OUT","type":"AssessmentItem","relMetadata":{}}],"addedTags":[],"properties":{}},"mid":"98145983-63dc-4d55-866c-248d49306ad8","label":"ECML_CHANGES","nodeType":"DATA_NODE","userId":"5a587cc1-e018-4859-a0a8-e842650b9d64","createdOn":"2021-03-08T08:23:55.547+0000","objectType":"Content","nodeUniqueId":"do_1132316371218268161118","requestId":null,"operationType":"UPDATE","nodeGraphId":510477,"graphId":"domain"} - |""".stripMargin - - val EVENT_12: String = - """ - |{"ets":1500888709490,"requestId":null,"transactionData":{"properties":{"IL_SYS_NODE_TYPE":{"ov":null,"nv":"DATA_NODE"},"morphology":{"ov":null,"nv":false},"consumerId":{"ov":null,"nv":"a6654129-b58d-4dd8-9cf2-f8f3c2f458bc"},"channel":{"ov":null,"nv":"in.ekstep"},"lemma":{"ov":null,"nv":"ವಿಶ್ಲೇಷಣೆ"},"createdOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"versionKey":{"ov":null,"nv":"1500888738130"},"IL_FUNC_OBJECT_TYPE":{"ov":null,"nv":"Word"},"ekstepWordnet":{"ov":null,"nv":false},"lastUpdatedOn":{"ov":null,"nv":"2017-07-24T09:32:18.130+0000"},"isPhrase":{"ov":null,"nv":false},"IL_UNIQUE_ID":{"ov":null,"nv":"ka_11229528054276096015"},"status":{"ov":null,"nv":"Draft"}}},"nodeGraphId":433342,"label":"ವಿಶ್ಲೇಷಣೆ","graphId":"ka","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2017-07-24T09:31:49.490+0000","objectType":"Word"} - |""".stripMargin -} \ No newline at end of file diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala deleted file mode 100644 index fb6a63a0a..000000000 --- a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/TransactionEventProcessorTaskTestSpec.scala +++ /dev/null @@ -1,207 +0,0 @@ -package org.sunbird.job.spec - -import java.util -import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} -import okhttp3.mockwebserver.{MockResponse, MockWebServer} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration -import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext -import org.apache.flink.test.util.MiniClusterWithClientResource -import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} -import org.mockito.Mockito -import org.mockito.Mockito.when -import org.sunbird.job.connector.FlinkKafkaConnector -import org.sunbird.job.transaction.domain.Event -import org.sunbird.job.fixture.EventFixture -import org.sunbird.job.transaction.task.{TransactionEventProcessorConfig, TransactionEventProcessorStreamTask} -import org.sunbird.spec.{BaseMetricsReporter, BaseTestSpec} - -class TransactionEventProcessorTaskTestSpec extends BaseTestSpec { - implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) - - val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) - .setNumberSlotsPerTaskManager(1) - .setNumberTaskManagers(1) - .build) - val mockKafkaUtil: FlinkKafkaConnector = mock[FlinkKafkaConnector](Mockito.withSettings().serializable()) - val config: Config = ConfigFactory.load("test.conf") - val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config) - val esUtil:ElasticSearchUtil = null - val server = new MockWebServer() - - var currentMilliSecond = 1605816926271L - - override protected def beforeAll(): Unit = { - BaseMetricsReporter.gaugeMetrics.clear() - flinkCluster.before() - super.beforeAll() - } - - override protected def afterAll(): Unit = { - flinkCluster.after() - super.afterAll() - } - - "TransactionEventProcessorStreamTask" should "generate audit event" in { - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource) - when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) - if (jobConfig.auditEventGenerator) { - new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() - - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0) - - AuditEventSink.values.size() should be(1) - AuditEventSink.values.forEach(event => { - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](event) - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata") shouldNot be(null) - }) - } - } - - "TransactionEventProcessorStreamTask" should "not generate audit event" in { - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditEventMapSource) - when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) - - val setBoolean = config.withValue("job.audit-event-generator", ConfigValueFactory.fromAnyRef(false)) - val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean) - if (newConfig.auditEventGenerator) { - - new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process() - - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(0) - - } - } - - "TransactionEventProcessorStreamTask" should "increase metric for unknown schema" in { - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new RandomObjectTypeAuditEventGeneratorMapSource) - when(mockKafkaUtil.kafkaStringSink(jobConfig.kafkaOutputTopic)).thenReturn(new AuditEventSink) - if (jobConfig.auditEventGenerator) { - - new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() - - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptySchemaEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.emptyPropsEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) - } - } - - - "TransactionEventProcessorStreamTask" should "not generate audit history indexer event" in { - server.start(9200) - server.enqueue(new MockResponse().setHeader( - "Content-Type", "application/json" - ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}""")) - - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) - if (jobConfig.auditHistoryIndexer) { - new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() - - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) - } - } - - "TransactionEventProcessorStreamTask" should "generate audit history indexer event" in { - server.enqueue(new MockResponse().setHeader( - "Content-Type", "application/json" - ).setBody("""{"_index":"kp_audit_log_2018_7","_type":"ah","_id":"HLZ-1ngBtZ15DPx6ENjU","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}""")) - - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) - val setBoolean = config.withValue("job.audit-history-indexer", ConfigValueFactory.fromAnyRef(true)) - val newConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(setBoolean) - if (newConfig.auditHistoryIndexer) { - new TransactionEventProcessorStreamTask(newConfig, mockKafkaUtil, esUtil).process() - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.totalEventsCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.successEventCount}").getValue() should be(2) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.failedEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.esFailedEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${newConfig.jobName}.${newConfig.skippedEventCount}").getValue() should be(0) - server.close() - } - } - - "TransactionEventProcessorStreamTask" should "throw exception and increase es error count" in { - when(mockKafkaUtil.kafkaJobRequestSource[Event](jobConfig.kafkaInputTopic)).thenReturn(new AuditHistoryMapSource) - - try { - new TransactionEventProcessorStreamTask(jobConfig, mockKafkaUtil, esUtil).process() - } catch { - case ex: JobExecutionException => - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.totalEventsCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.successEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.failedEventCount}").getValue() should be(0) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.esFailedEventCount}").getValue() should be(1) - BaseMetricsReporter.gaugeMetrics(s"${jobConfig.jobName}.${jobConfig.skippedEventCount}").getValue() should be(0) - } - } -} - -class AuditEventMapSource extends SourceFunction[Event] { - - override def run(ctx: SourceContext[Event]) { - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10)) - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5), 0, 11)) -} - - override def cancel(): Unit = {} -} - -class AuditHistoryMapSource extends SourceFunction[Event] { - - override def run(ctx: SourceContext[Event]) { - - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10)) - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_12),0,11)) - } - - override def cancel(): Unit = {} -} - -class EventMapSource extends SourceFunction[Event] { - - override def run(ctx: SourceContext[Event]) { - - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1), 0, 10)) - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9),0,11)) - } - - override def cancel(): Unit = {} -} -class RandomObjectTypeAuditEventGeneratorMapSource extends SourceFunction[Event] { - - override def run(ctx: SourceContext[Event]) { - ctx.collect(new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_8), 0, 10)) - } - - override def cancel(): Unit = {} -} - -class AuditEventSink extends SinkFunction[String] { - - override def invoke(value: String): Unit = { - synchronized { - AuditEventSink.values.add(value) - } - } -} - -object AuditEventSink { - val values: util.List[String] = new util.ArrayList() -} \ No newline at end of file diff --git a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala b/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala deleted file mode 100644 index b70a23e7c..000000000 --- a/transaction-event-processor/src/test/scala/org/sunbird/job/spec/service/TransactionEventProcessorServiceTestSpec.scala +++ /dev/null @@ -1,158 +0,0 @@ -package org.sunbird.job.spec.service - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.mockito.Mockito -import org.slf4j.LoggerFactory -import org.sunbird.job.Metrics -import org.sunbird.job.transaction.domain.{AuditHistoryRecord, Event} -import org.sunbird.job.fixture.EventFixture -import org.sunbird.job.transaction.functions.{AuditEventGenerator, AuditHistoryIndexer} -import org.sunbird.job.transaction.task.TransactionEventProcessorConfig -import org.sunbird.job.util.{ElasticSearchUtil, JSONUtil} -import org.sunbird.spec.BaseTestSpec - -import java.util - -class TransactionEventProcessorTestSpec extends BaseTestSpec { - implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) - implicit val mpTypeInfo: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]]) - implicit val strTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - - val config: Config = ConfigFactory.load("test.conf") - lazy val jobConfig: TransactionEventProcessorConfig = new TransactionEventProcessorConfig(config) - lazy val mockMetrics = mock[Metrics](Mockito.withSettings().serializable()) - lazy val auditEventGenerator: AuditEventGenerator = new AuditEventGenerator(jobConfig) - lazy val mockElasticUtil:ElasticSearchUtil = mock[ElasticSearchUtil](Mockito.withSettings().serializable()) - lazy val auditHistoryIndexer: AuditHistoryIndexer = new AuditHistoryIndexer(jobConfig,mockElasticUtil) - - override protected def beforeAll(): Unit = { - super.beforeAll() - } - - override protected def afterAll(): Unit = { - super.afterAll() - } - - "TransactionEventProcessorService" should "generate audit event" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_1) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata") shouldNot be(null) - } - - "TransactionEventProcessorService" should "add duration of status change" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_2) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata") shouldNot be(null) - val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]]("duration").asInstanceOf[Int] - duration should be(761) - } - - "TransactionEventProcessorService" should "add Duration as null" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_3) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata") shouldNot be(null) - val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) - duration should be(null) - } - - "TransactionEventProcessorService" should "generate audit for content creation" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_4) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata") shouldNot be(null) - val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) - duration should be(null) - } - - "TransactionEventProcessorService" should "skip audit for objectType is null" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_5) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - - eventStr should be("{\"object\": {\"type\":null}}") - } - - "TransactionEventProcessorService" should "event for addedRelations" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_6) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - eventMap("eid") should be("AUDIT") - eventMap("ver") should be("3.0") - eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("name") - eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("collections") - val duration = eventMap("edata").asInstanceOf[Map[String, AnyRef]].getOrElse("duration", null) - duration should be(null) - } - - "TransactionEventProcessorService" should "generate audit for update dialcode" in { - val inputEvent:util.Map[String, Any] = JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_7) - - val (eventStr, objectType) = auditEventGenerator.getAuditMessage(new Event(inputEvent, 0, 10))(jobConfig, mockMetrics) - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](eventStr) - - eventMap("eid") should be("AUDIT") - eventMap("edata").asInstanceOf[Map[String, AnyRef]]("props").asInstanceOf[List[String]] should contain ("dialcodes") - val cdata = eventMap("cdata").asInstanceOf[List[Map[String, AnyRef]]] - cdata.head("id").asInstanceOf[List[String]] should contain ("K1W6L6") - cdata.head("type") should be("DialCode") - } - - "TransactionEventProcessorService" should "compute duration" in { - val ov = "2019-03-13T13:25:43.129+0530" - val nv = "2019-03-13T13:38:24.358+0530" - val duration = auditEventGenerator.computeDuration(ov, nv) - duration should be(761) - } - - "TransactionEventProcessorService" should "generate es log" in { - val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_9), 0, 10) - - val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); - - auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) - auditHistoryRec.objectType should be(inputEvent.objectType) - auditHistoryRec.logRecord should be("""{"properties":{"mediaType":{"nv":"content"},"name":{"nv":"Untitled Resource"},"createdOn":{"nv":"2018-02-13T16:01:18.947+0530"},"channel":{"nv":"in.ekstep"},"lastUpdatedOn":{"nv":"2018-02-13T16:01:18.947+0530"},"IL_FUNC_OBJECT_TYPE":{"nv":"Content"},"resourceType":{"nv":"Story"},"compatibilityLevel":{"nv":1.0},"audience":{"nv":["Learner"]},"os":{"nv":["All"]},"IL_SYS_NODE_TYPE":{"nv":"DATA_NODE"},"framework":{"nv":"NCF"},"versionKey":{"nv":"1518517878947"},"mimeType":{"nv":"application/pdf"},"code":{"nv":"test_code"},"contentType":{"nv":"Story"},"language":{"nv":["English"]},"status":{"nv":"Draft"},"keywords":{"nv":["colors","games"]},"idealScreenSize":{"nv":"normal"},"contentEncoding":{"nv":"identity"},"osId":{"nv":"org.ekstep.quiz.app"},"IL_UNIQUE_ID":{"nv":"do_11243969846440755213"},"contentDisposition":{"nv":"inline"},"visibility":{"nv":"Default"},"idealScreenDensity":{"nv":"hdpi"}}}""") - } - - "TransactionEventProcessorService" should "generate with added relations" in { - val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_10), 0, 11) - - val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); - - auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) - auditHistoryRec.objectType should be(inputEvent.objectType) - auditHistoryRec.logRecord should be("""{"addedRelations":[{"label":"Test unit 11","rel":"hasSequenceMember","dir":"IN","id":"do_1123032073439723521148","type":"Content"}],"removedRelations":[],"properties":{"name":{"nv":"","ov":""}}}""") - } - - "TransactionEventProcessorService" should "generate with removed relations" in { - val inputEvent: Event = new Event(JSONUtil.deserialize[util.Map[String, Any]](EventFixture.EVENT_11), 0, 12) - - val auditHistoryRec: AuditHistoryRecord = auditHistoryIndexer.getAuditHistory(inputEvent); - - auditHistoryRec.objectId should be(inputEvent.nodeUniqueId) - auditHistoryRec.objectType should be(inputEvent.objectType) - auditHistoryRec.logRecord should be("""{"addedTags":[],"addedRelations":[],"properties":{},"removedRelations":[{"label":"qq\n","rel":"associatedTo","dir":"OUT","id":"do_113198273083662336127","relMetadata":{},"type":"AssessmentItem"}],"removedTags":[]}""") - } -}