Skip to content

Commit

Permalink
Issue KN-923 feat: Added Transaction event processor job
Browse files Browse the repository at this point in the history
This is to merge both 'audit-event-generator' and 'audit-history-indexer' jobs
  • Loading branch information
SurabhiAngadi committed Sep 5, 2023
1 parent 037e2e9 commit c0ca29d
Show file tree
Hide file tree
Showing 17 changed files with 1,378 additions and 8 deletions.
14 changes: 6 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
<prerequisites>
<maven>3.0.0</maven>
</prerequisites>
<modules>
<module>transaction-event-processor</module>
<module>newMod</module>
</modules>

<groupId>org.sunbird</groupId>
<groupId>org.sunbird</groupId>
<artifactId>knowledge-platform-jobs</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
Expand All @@ -27,18 +31,12 @@
</activation>
<modules>
<module>jobs-core</module>
<!-- <module>relation-cache-updater</module>-->
<!-- <module>activity-aggregate-updater</module>-->
<module>post-publish-processor</module>
<!-- <module>credential-generator</module>-->
<module>publish-pipeline</module>
<module>search-indexer</module>
<!-- <module>enrolment-reconciliation</module>-->
<module>auto-creator-v2</module>
<module>content-auto-creator</module>
<module>audit-history-indexer</module>
<module>audit-event-generator</module>
<!-- <module>metrics-data-transformer</module>-->
<module>transaction-event-processor</module>
<module>qrcode-image-generator</module>
<module>dialcode-context-updater</module>
<module>cassandra-data-migration</module>
Expand Down
65 changes: 65 additions & 0 deletions transaction-event-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Audit Event Generator

Audit Event Generator job generates an audit event for every transaction in Graph updation for content data modification

## Getting Started

These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a yarn or kubernetes.
### Prerequisites

1. Download flink-1.13.6-scala_2.12 from [apache-flink-downloads](https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz).
2. Download [hadoop dependencies](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) (only for running on Yarn). Copy the hadoop dependency jar under lib folder of the flink download.
3. export HADOOP_CLASSPATH=`<hadoop-executable-dir>/hadoop classpath` either in .bashrc or current execution shell.
4. Docker installed.
5. A running yarn cluster or a kubernetes cluster.

### Build

mvn clean install

## Deployment

### Yarn

Flink requires memory to be allocated for both job-manager and task manager. -yjm parameter assigns job-manager memory and -ytm assigns task-manager memory.

```
./bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 1024m <knowledge-platform-jobs>/audit-event-generator/target/audit-event-generator-0.0.1.jar
```

### Kubernetes

```
# Create a single node cluster
k3d create --server-arg --no-deploy --server-arg traefik --name flink-cluster --image rancher/k3s:v1.0.0
# Export the single node cluster into KUBECONFIG in the current shell or in ~/.bashrc.
export KUBECONFIG="$(k3d get-kubeconfig --name='flink-cluster')"
# Only for Mac OSX
# /usr/local/bin/kubectl -> /Applications/Docker.app/Contents/Resources/bin/kubectl
rm /usr/local/bin/kubectl
brew link --overwrite kubernetes-cli
# Create a configmap using the flink-configuration-configmap.yaml
kubectl create -f knowledge-platform-job/kubernetes/flink-configuration-configmap.yaml
# Create pods for jobmanager-service, job-manager and task-manager using the yaml files
kubectl create -f knowledge-platform-job/kubernetes/jobmanager-service.yaml
kubectl create -f knowledge-platform-job/kubernetes/jobmanager-deployment.yaml
kubectl create -f knowledge-platform-job/kubernetes/taskmanager-deployment.yaml
# Create a port-forwarding for accessing the job-manager UI on localhost:8081
kubectl port-forward deployment/flink-jobmanager 8081:8081
# Submit the job to the Kubernetes single node cluster flink-cluster
./bin/flink run -m localhost:8081 <knowledge-platform-job>/audit-event-generator/target/audit-event-generator-0.0.1.jar
# Commands to delete the pods created in the cluster
kubectl delete deployment/flink-jobmanager
kubectl delete deployment/flink-taskmanager
kubectl delete service/flink-jobmanager
kubectl delete configmaps/flink-config
# Command to stop the single-node cluster
k3d stop --name="flink-cluster"
```
269 changes: 269 additions & 0 deletions transaction-event-processor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.sunbird</groupId>
<artifactId>knowledge-platform-jobs</artifactId>
<version>1.0</version>
</parent>
<artifactId>transaction-event-processor</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>transaction-event-processor</name>
<description>
Transaction Event Processor Flink Job
</description>

<properties>
<encoding>UTF-8</encoding>
<scoverage.plugin.version>1.4.0</scoverage.plugin.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>

<dependency>
<groupId>org.sunbird</groupId>
<artifactId>jobs-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.6</version>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>jobs-core</artifactId>
<version>1.0.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<version>3.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>platform-common</artifactId>
<version>1.0-beta</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>platform-telemetry</artifactId>
<version>1.0-beta</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>4.9.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.job.transaction.task.TransactionEventProcessorStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<source>11</source>
<target>11</target>
<scalaVersion>${scala.maj.version}</scalaVersion>
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
</configuration>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>

<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>audit-event-generator-testsuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scoverage</groupId>
<artifactId>scoverage-maven-plugin</artifactId>
<version>${scoverage.plugin.version}</version>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<aggregate>true</aggregate>
<highlighting>true</highlighting>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.5</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<!-- attached to Maven test phase -->
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
14 changes: 14 additions & 0 deletions transaction-event-processor/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=audit-event-generator.log
log4j.appender.file.append=true
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.MaxFileSize=256KB
log4j.appender.file.MaxBackupIndex=4
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file



Loading

0 comments on commit c0ca29d

Please sign in to comment.