Skip to content

Commit

Permalink
Revert "Code Refactoring"
Browse files Browse the repository at this point in the history
This reverts commit bfec31c.
  • Loading branch information
SurabhiAngadi committed Sep 4, 2023
1 parent bfec31c commit f5d8f13
Show file tree
Hide file tree
Showing 36 changed files with 38 additions and 1,463 deletions.
31 changes: 2 additions & 29 deletions audit-event-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,8 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>

<dependency>
<groupId>org.sunbird</groupId>
<artifactId>jobs-core</artifactId>
Expand Down Expand Up @@ -159,7 +152,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.job.transaction.task.AuditEventGeneratorStreamTask</mainClass>
<mainClass>org.sunbird.job.auditevent.task.AuditEventGeneratorStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
Expand Down Expand Up @@ -237,26 +230,6 @@
<highlighting>true</highlighting>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.5</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<!-- attached to Maven test phase -->
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
5 changes: 1 addition & 4 deletions audit-event-generator/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,4 @@ log4j.appender.file.MaxBackupIndex=4
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file



log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.sunbird.job.transaction.domain
package org.sunbird.job.auditevent.domain

import org.apache.commons.lang3.StringUtils
import org.sunbird.job.domain.reader.JobRequest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.sunbird.job.transaction.functions
package org.sunbird.job.auditevent.functions

import java.util
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.slf4j.LoggerFactory
import org.sunbird.job.transaction.domain.Event
import org.sunbird.job.transaction.service.AuditEventGeneratorService
import org.sunbird.job.transaction.task.AuditEventGeneratorConfig
import org.sunbird.job.auditevent.domain.Event
import org.sunbird.job.auditevent.service.AuditEventGeneratorService
import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig
import org.sunbird.job.exception.InvalidEventException
import org.sunbird.job.{BaseProcessFunction, Metrics}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package org.sunbird.job.transaction.service
package org.sunbird.job.auditevent.service

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.commons.lang3.StringUtils
import org.slf4j.LoggerFactory
import org.sunbird.job.Metrics
import org.sunbird.job.transaction.task.AuditEventGeneratorConfig
import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig
import org.sunbird.job.util.JSONUtil
import org.sunbird.telemetry.TelemetryGenerator
import org.sunbird.telemetry.TelemetryParams
import org.sunbird.job.transaction.domain.Event
import org.sunbird.job.auditevent.domain.Event
import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition}
import com.google.gson.Gson
import org.sunbird.job.exception.InvalidEventException
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.sunbird.job.transaction.task
package org.sunbird.job.auditevent.task

import java.util
import com.typesafe.config.Config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.sunbird.job.transaction.task
package org.sunbird.job.auditevent.task

import java.io.File
import java.util
Expand All @@ -9,15 +9,14 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.sunbird.job.connector.FlinkKafkaConnector
import org.sunbird.job.transaction.domain.Event
import org.sunbird.job.transaction.functions.AuditEventGenerator
import org.sunbird.job.auditevent.domain.Event
import org.sunbird.job.auditevent.functions.AuditEventGenerator
import org.sunbird.job.util.{FlinkUtil, HttpUtil}


class AuditEventGeneratorStreamTask(config: AuditEventGeneratorConfig, kafkaConnector: FlinkKafkaConnector) {
def process(): Unit = {
implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
// implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event])
implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object EventFixture {

val EVENT_3: String =
"""
|{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://sunbirddevbbpublic.blob.core.windows.net/sunbird-content-staging-knowlg/content/assets/do_21385816669265920016/pdf_233.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"}
|{"ets":1552464380225,"channel":"in.ekstep","transactionData":{"properties":{"s3Key":{"ov":null,"nv":"content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"size":{"ov":null,"nv":433994.0},"artifactUrl":{"ov":null,"nv":"https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_11271778298376192013/artifact/pdf_1552464372724.pdf"},"lastUpdatedOn":{"ov":"2019-03-13T13:25:43.129+0530","nv":"2019-03-13T13:36:20.093+0530"},"versionKey":{"ov":"1552463743129","nv":"1552464380093"}}},"label":"Resource Content 1","nodeType":"DATA_NODE","userId":"ANONYMOUS","createdOn":"2019-03-13T13:36:20.223+0530","objectType":"Content","nodeUniqueId":"do_11271778298376192013","requestId":null,"operationType":"UPDATE","nodeGraphId":590883,"graphId":"domain"}
|""".stripMargin

val EVENT_4: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import org.sunbird.job.util.JSONUtil
import org.mockito.Mockito
import org.mockito.Mockito.when
import org.sunbird.job.connector.FlinkKafkaConnector
import org.sunbird.job.transaction.domain.Event
import org.sunbird.job.auditevent.domain.Event
import org.sunbird.job.fixture.EventFixture
import org.sunbird.job.transaction.task.{AuditEventGeneratorConfig, AuditEventGeneratorStreamTask}
import org.sunbird.job.auditevent.task.{AuditEventGeneratorConfig, AuditEventGeneratorStreamTask}
import org.sunbird.spec.{BaseMetricsReporter, BaseTestSpec}

class AuditEventGeneratorTaskTestSpec extends BaseTestSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.mockito.Mockito
import org.sunbird.job.Metrics
import org.sunbird.job.transaction.domain.Event
import org.sunbird.job.auditevent.domain.Event
import org.sunbird.job.fixture.EventFixture
import org.sunbird.job.transaction.functions.AuditEventGenerator
import org.sunbird.job.transaction.task.AuditEventGeneratorConfig
import org.sunbird.job.auditevent.functions.AuditEventGenerator
import org.sunbird.job.auditevent.task.AuditEventGeneratorConfig
import org.sunbird.job.util.JSONUtil
import org.sunbird.spec.BaseTestSpec

Expand Down
2 changes: 1 addition & 1 deletion audit-history-indexer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.sunbird.job.audithistory.task

import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.sunbird.job.BaseJobConfig

import com.typesafe.config.Config
import java.util

class AuditHistoryIndexerConfig(override val config: Config) extends BaseJobConfig(config, "audit-history-indexer") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import java.util
class AuditHistoryIndexerStreamTask(config: AuditHistoryIndexerConfig, kafkaConnector: FlinkKafkaConnector, esUtil: ElasticSearchUtil) {
def process(): Unit = {
implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
// implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
implicit val eventTypeInfo: TypeInformation[Event] = TypeExtractor.getForClass(classOf[Event])
implicit val mapTypeInfo: TypeInformation[util.Map[String, Any]] = TypeExtractor.getForClass(classOf[util.Map[String, Any]])
implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
Expand Down
6 changes: 1 addition & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
<prerequisites>
<maven>3.0.0</maven>
</prerequisites>
<modules>
<module>transaction-event-processor</module>
<module>newMod</module>
</modules>

<groupId>org.sunbird</groupId>
<groupId>org.sunbird</groupId>
<artifactId>knowledge-platform-jobs</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class ContentPublisherSpec extends FlatSpec with BeforeAndAfterAll with Matchers
}

"validateMetadata with mimeType application/msword and .pptx " should " not return exception messages if content is having valid artifactUrl" in {
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None)
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None)
val result: List[String] = new TestContentPublisher().validateMetadata(data, data.identifier, jobConfig)
result.size should be(0)
}

"validateMetadata with mimeType application/msword and .docx " should " not return exception messages if content is having valid artifactUrl" in {
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None)
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None)
val result: List[String] = new TestContentPublisher().validateMetadata(data, data.identifier, jobConfig)
result.size should be(0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class LiveContentPublisherSpec extends FlatSpec with BeforeAndAfterAll with Matc
}

"validateMetadata with mimeType application/msword and .pptx " should " not return exception messages if content is having valid artifactUrl" in {
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None)
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216616320983040129/artifact/performance_out_1491286194831.pptx"), None)
val result: List[String] = new TestLiveContentPublisher().validateMetadata(data, data.identifier, jobConfig)
result.size should be(0)
}

"validateMetadata with mimeType application/msword and .docx " should " not return exception messages if content is having valid artifactUrl" in {
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://sunbirddev.blob.core.windows.net/sunbird-content-dev/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None)
val data = new ObjectData("do_123", Map[String, AnyRef]("name" -> "Content Name", "identifier" -> "do_123", "pkgVersion" -> 0.0.asInstanceOf[AnyRef], "mimeType" -> "application/msword", "artifactUrl" -> "https://ekstep-public-dev.s3-ap-south-1.amazonaws.com/content/do_112216615190192128128/artifact/prdassetstagging-2_1491286084107.docx"), None)
val result: List[String] = new TestLiveContentPublisher().validateMetadata(data, data.identifier, jobConfig)
result.size should be(0)
}
Expand Down
6 changes: 0 additions & 6 deletions publish-pipeline/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@
<module>content-publish</module>
<module>live-node-publisher</module>
</modules>
<dependencies><dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

<build>
<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,11 @@ class QRCodeImageGeneratorFunction(config: QRCodeImageGeneratorConfig,
val zipDownloadUrl = cloudStorageUtil.uploadFile(event.storagePath.replace("/", ""), zipFile, Some(false), container = event.storageContainer)
logger.info("QRCodeImageGeneratorService:processMessage: zipDownloadUrl - " + zipDownloadUrl(1))
logger.info("QRCodeImageGeneratorService:processMessage: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost)
var newDownloadUrl = zipDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost)
logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl - " + newDownloadUrl)
metrics.incCounter(config.cloudDbHitCount)
if(config.cloudStorageEndpoint.nonEmpty){
var newDownloadUrl = zipDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost)
logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl - " + newDownloadUrl)
logger.info("QRCodeImageGeneratorService:processMessage: event - " + event)
qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, newDownloadUrl, "processid", event.processId, metrics)
} else {
qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, zipDownloadUrl(1), "processid", event.processId, metrics)
}

logger.info("QRCodeImageGeneratorService:processMessage: event - " + event)
qRCodeImageGeneratorUtil.updateCassandra(config.cassandraDialCodeBatchTable, 2, newDownloadUrl, "processid", event.processId, metrics)
}
else {
logger.info("QRCodeImageGeneratorService:processMessage: Skipping zip creation due to missing processId.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.sunbird.job.qrimagegenerator.task

import com.typesafe.config.Config
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.scala.OutputTag
Expand Down Expand Up @@ -59,7 +58,7 @@ class QRCodeImageGeneratorConfig(override val config: Config) extends BaseJobCon
val cassandraDialCodeImageTable: String = config.getString("lms-cassandra.table.image")
val cassandraDialCodeBatchTable: String = config.getString("lms-cassandra.table.batch")

val cloudStorageEndpoint: String = if (config.hasPath("cloud_storage_endpoint")) config.getString("cloud_storage_endpoint") else ""
val cloudStorageProxyHost: String = if (config.hasPath("cloud_storage_proxy_host")) config.getString("cloud_storage_endpoint") else ""
val cloudStorageEndpoint: String = config.getString("cloud_storage_endpoint")
val cloudStorageProxyHost: String = config.getString("cloud_storage_proxy_host")
val indexImageURL: Boolean = if (config.hasPath("qr.image.indexImageUrl")) config.getBoolean("qr.image.indexImageUrl") else true
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,10 @@ class QRCodeImageGeneratorUtil(config: QRCodeImageGeneratorConfig, cassandraUtil
logger.info("QRCodeImageGeneratorUtil:createQRImages: path after - " + path.replace("/", ""))
val imageDownloadUrl = cloudStorageUtil.uploadFile(path.replace("/", ""), finalImageFile, Some(false), container = container)
logger.info("QRCodeImageGeneratorUtil:createQRImages: imageDownloadUrl - " + imageDownloadUrl(1))
if(config.cloudStorageEndpoint.nonEmpty){
logger.info("QRCodeImageGeneratorUtil:createQRImages: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost)
var newDownloadUrl = imageDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost)
logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl before - " + newDownloadUrl)
updateCassandra(config.cassandraDialCodeImageTable, 2, newDownloadUrl, "filename", fileName, metrics)
} else {
updateCassandra(config.cassandraDialCodeImageTable, 2, imageDownloadUrl(1), "filename", fileName, metrics)
}

logger.info("QRCodeImageGeneratorUtil:createQRImages: config.cloudStorageEndpoint - " + config.cloudStorageEndpoint+" config.cloudStorageProxyHost - "+config.cloudStorageProxyHost)
var newDownloadUrl = imageDownloadUrl(1).replaceAll(config.cloudStorageEndpoint, config.cloudStorageProxyHost)
logger.info("QRCodeImageGeneratorService:processMessage: newDownloadUrl before - " + newDownloadUrl)
updateCassandra(config.cassandraDialCodeImageTable, 2, newDownloadUrl, "filename", fileName, metrics)
} catch {
case e: Exception =>
metrics.incCounter(config.dbFailureEventCount)
Expand Down
Loading

0 comments on commit f5d8f13

Please sign in to comment.