diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml new file mode 100644 index 0000000..5bf597c --- /dev/null +++ b/.github/workflows/ci-workflow.yml @@ -0,0 +1,57 @@ +name: CI Workflow + +on: + push: + branches: [ '**' ] + tags: [ '**' ] + workflow_dispatch: + +concurrency: ci-${{ github.ref }} + +jobs: + buildAndTest: + name: Build, Test, and Package + runs-on: ubuntu-latest + env: + RESOURCE_GROUP: hri-dev1-wdc-kube + + steps: + - name: Set branch name environment variable + uses: nelonoel/branch-name@v1.0.1 + + - uses: actions/checkout@v2 + + - name: Copyright Check + run: ./copyrightCheck.sh + + - name: Set up adoptopenjdk8 + uses: actions/setup-java@v2 + with: + java-version: '8' + distribution: 'adopt' + cache: 'gradle' + + - name: Set GitHub Tag environment variable + if: startsWith(github.ref, 'refs/tags/v') == true + run: | + TAG_ID=${GITHUB_REF##*/} + echo "ACTIONS_TAG=$TAG_ID" >> $GITHUB_ENV + + - name: Gradle build + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: ./gradlew clean build + + - name: Gradle publish + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: ./gradlew publish + + - name: Post Slack Update + if: ${{ failure() && ( github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/support-') ) }} + id: slack + uses: slackapi/slack-github-action@v1.14.0 + with: + payload: "{\"Repo\":\"${{ github.repository }}\",\"Workflow\":\"${{ github.workflow }}\",\"Branch\":\"${{ env.BRANCH_NAME }}\",\"Link\":\"https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}\"}" + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/code-scans.yml b/.github/workflows/code-scans.yml new file mode 100644 index 0000000..76acfae --- /dev/null +++ b/.github/workflows/code-scans.yml @@ -0,0 +1,45 @@ +name: Code Scans + +on: + push: + branches: [ 'main', 'develop', 'support-*' ] + pull_request: + types: [opened, synchronize, reopened] + +concurrency: code-scans-${{ github.ref }} + +jobs: + buildAndTest: + name: Static Code Analysis + runs-on: ubuntu-latest + + steps: + - name: Set branch name environment variable + uses: nelonoel/branch-name@v1.0.1 + + - uses: actions/checkout@v2 + with: + # full checkout for SonarCloud analysis + fetch-depth: 0 + + - name: Set up Java 11 + uses: actions/setup-java@v2 + with: + java-version: '11' + distribution: 'adopt' + cache: 'gradle' + + - name: SonarCloud Scan + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + run: ./gradlew clean build sonarqube -Dsonar.login=$SONAR_TOKEN + + - name: Post Slack Update + if: ${{ failure() && ( github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/support-') ) }} + id: slack + uses: slackapi/slack-github-action@v1.14.0 + with: + payload: "{\"Repo\":\"${{ github.repository }}\",\"Workflow\":\"${{ github.workflow }}\",\"Branch\":\"${{ env.BRANCH_NAME }}\",\"Link\":\"https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}\"}" + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml new file mode 100644 index 0000000..f07c6cb --- /dev/null +++ b/.github/workflows/pull_request.yml @@ -0,0 +1,61 @@ +name: Pull Request Checks + +on: + pull_request: + types: [opened] + pull_request_review: + types: [submitted] + workflow_dispatch: + +concurrency: pr-${{ github.ref }} + +jobs: 
+ dependency-checks: + name: Dependency Checks + # run if the PR was opened or a review was submitted containing '/pr_checks' + if: (github.event_name == 'pull_request') || (github.event_name == 'pull_request_review' && contains(github.event.review.body, '/pr_checks')) + runs-on: ubuntu-latest + env: + RESOURCE_GROUP: hri-dev1-wdc-kube + + steps: + - name: Set branch name environment variable + run: echo "BRANCH_NAME=$(echo ${{github.event.pull_request.head.ref}} | tr / -)" >> $GITHUB_ENV + + - uses: actions/checkout@v2 + + - name: Set up adoptopenjdk8 + uses: actions/setup-java@v2 + with: + java-version: '8' + distribution: 'adopt-hotspot' + cache: 'gradle' + + - name: Gradle build + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: ./gradlew clean build publishToMavenLocal + + - name: Check OpenSource Dependencies + # override default -eo pipefail + shell: bash --noprofile --norc {0} + env: + SONATYPE_OSS_PASSWORD: ${{ secrets.SONATYPE_OSS_PASSWORD }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + ./gradlew ossIndexAudit --info + found=$? + echo "VULNERABILITIES=$found" >> $GITHUB_ENV + [[ "$found" == 1 ]] && echo "::error ::Vulnerabilities found in dependencies." + exit 0 + + - uses: actions/github-script@v4 + if: ${{ env.VULNERABILITIES != 0 }} + with: + script: | + github.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: "## ⚠️ Vulnerabilities found in dependencies.\nSee the 'PR -> Vulnerability Checks' logs for more details: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}" + }) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9b48b61 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +/build + +# IntelliJ +.idea +.idea/ +*.iml +*.iws +*.ipr +.DS_Store + +# Gradle +.gradle/ +.gradletasknamecache +gradle-app.setting + +# AppScan on Cloud - ASoC +/dependencies +appscan.irx +appscan_logs.zip + +*.class +*.log +out diff --git a/README.md b/README.md index aa552ce..4b6f1ce 100644 --- a/README.md +++ b/README.md @@ -5,16 +5,16 @@ This repo contains the Alvearie Health Record Ingestion Flink pipeline processin *_This is a draw.io diagram embedded in a png. Use draw.io to make changes._ ### Input/Sources -Records are read from both the `*.in` and `*.notification` Kafka topics. The `*.notification` topic supplies Batch metadata information to the `Validation Processor` and the `Tracker`. + Records are read from both the `*.in` and `*.notification` Kafka topics. The `*.notification` topic supplies Batch metadata information to the `Validation Processor` and the `Tracker`. ### Validation Processor -The Validation processor stores Batch notification messages in it's broadcast state, which is used for initial checks for every record. It then calls a provided record validator and outputs the record to the valid or invalid downstream Kafka sinks. Additionally, a message is sent to the Tracker indicating if the record is valid. +The Validation processor stores Batch notification messages in its broadcast state, which is used for initial checks for every record. It then calls a provided record validator and outputs the record to the valid or invalid downstream Kafka sinks. Additionally, a message is sent to the Tracker indicating if the record is valid. 
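For illustration only (not part of this patch): a minimal sketch of the broadcast-state pattern the paragraph above describes, using the Flink `BroadcastProcessFunction` and `MapStateDescriptor` APIs this library builds on. The `SimpleRecord` and `SimpleNotification` case classes are hypothetical stand-ins for `HriRecord` and `BatchNotification`; the real `ValidationProcessFunction` additionally runs the supplied validator, emits invalid records and Tracker count messages to side outputs, and can fall back to the HRI Management API when a batch is missing from state.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-ins for HriRecord and BatchNotification
case class SimpleRecord(batchId: String, body: String)
case class SimpleNotification(batchId: String, status: String)

object BatchState {
  // Shared descriptor: used to broadcast the notification stream and to access the state
  val descriptor = new MapStateDescriptor[String, SimpleNotification](
    "batch-notifications",
    TypeInformation.of(classOf[String]),
    TypeInformation.of(classOf[SimpleNotification]))
}

class SimpleValidationFunction
  extends BroadcastProcessFunction[SimpleRecord, SimpleNotification, SimpleRecord] {

  private val terminalStatuses = Set("failed", "terminated", "completed")

  // Called for each record on the (non-keyed) data stream; reads the broadcast state
  override def processElement(
      rec: SimpleRecord,
      ctx: BroadcastProcessFunction[SimpleRecord, SimpleNotification, SimpleRecord]#ReadOnlyContext,
      out: Collector[SimpleRecord]): Unit = {
    val batch = ctx.getBroadcastState(BatchState.descriptor).get(rec.batchId)
    // Forward the record only if its batch is known and not in a terminal state
    if (batch != null && !terminalStatuses.contains(batch.status)) {
      out.collect(rec)
    }
  }

  // Called for each batch notification; every parallel instance stores every batch
  override def processBroadcastElement(
      notification: SimpleNotification,
      ctx: BroadcastProcessFunction[SimpleRecord, SimpleNotification, SimpleRecord]#Context,
      out: Collector[SimpleRecord]): Unit = {
    ctx.getBroadcastState(BatchState.descriptor).put(notification.batchId, notification)
  }
}
```

Wiring it up would look roughly like `recordStream.connect(notificationStream.broadcast(BatchState.descriptor)).process(new SimpleValidationFunction)`, which is why every parallel instance sees every batch notification while records stay un-keyed.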
Initial record checks: -* record has `batchId` header -* `batchId` exists -* Batch status is not `failed`, `terminated`, or `completed` -* provided Validator checks record + * record has `batchId` header + * `batchId` exists + * Batch status is not `failed`, `terminated`, or `completed` + * provided Validator checks record The Validation processor uses the job parallelism, same as both Kafka sources. It uses a Broadcast state, because the stream of records is not keyed, so every instance needs to know about every batch. Keying the stream by batch id would limit the parallelism to 1 for each batch. The parallelism should be less than or equal to the number of Kafka partitions for the `*.in` topic. @@ -23,19 +23,19 @@ When the validation processor attempts to get a batch by batchId or tenantId, if ### Tracker The Tracker keeps track of every batch's progress and calls the HRI Management API when it is complete or fails. Batches are failed if the number of invalid records reaches the threshold, which is set per batch, or if more records are received than expected. Batches are completed once all the records are received and after a configurable delay. The Batch completion delay is used to wait for any additional records, which would fail the batch. Once the batch is completed, additional records are sent to the invalid topic and do not affect the status of the batch. -Because both Batch notification messages and Validation valid count messages could trigger a Batch status change, both of these input streams have to be keyed by the Batch ID. This ensures that all the messages for a given batch are always processed by the same Tracker instance, which enables it to store the total count and Batch notification together. +Because both Batch notification messages and Validation valid count messages could trigger a Batch status change, both of these input streams have to be keyed by the Batch ID. This ensures that all the messages for a given batch are always processed by the same Tracker instance, which enables it to store the total count and Batch notification together. This, however, does limit the parallelism of the Tracker. Every Batch can only be processed by one Tracker instance. We don't believe this will be a significant performance/throughput bottleneck since the computation is very minimal. Currently, the Tracker's parallelism is set to one quarter of the job's parallelism. ### Output/Sinks Records that pass validation are written to the `*.out` topic. For invalid records, an Invalid record is written to the `*.invalid` topic. The Invalid record contains the error message and a pointer (topic, partition, offset) to the original record, and is defined in the `hri-api-spec` [repo](https://github.com/Alvearie/hri-api-spec/blob/main/notifications/invalidRecord.json). Once a Batch is failed or terminated, the Validation processor stops forwarding valid or invalid records downstream. -The Tracker uses a HRI Management API sink to complete or fail a Batch, which is described above. The sink calls the Batch `processingComplete` or `fail` endpoints respectively. Note that the HRI Management API will write a new Batch notification reflecting the status change to the `*.notification` topic. This creates an external loop in our flow, but is necessary for the Validation processor to pick up these changes. The Tracker keeps additional state about whether it has completed or failed a batch to prevent infinite loops. 
+The Tracker uses an HRI Management API sink to complete or fail a Batch, which is described above. The sink calls the Batch `processingComplete` or `fail` endpoints respectively. Note that the HRI Management API will write a new Batch notification reflecting the status change to the `*.notification` topic. This creates an external loop in our flow, but is necessary for the Validation processor to pick up these changes. The Tracker keeps additional state about whether it has completed or failed a batch to prevent infinite loops. If, when updating a batch as either `processingComplete` or `fail`, the returned HTTP status code is a `409` (Conflict), indicating that the batch is already in the desired state, a warning message will be logged but no other errors will be raised. For all other returned HTTP status codes in the `400`s (e.g. `400-Bad Request`, `401-Unauthorized`), the attempt to update the status of the batch will immediately fail. This scenario indicates something is wrong with the HTTP requests, and there is little point in retrying. For any other HTTP error status code, the HRI Management API sink will retry the status update for 24 hours, with an exponentially increasing amount of time between each attempt (with a maximum wait time of 5 minutes). ### Testing -There are three constructors for the Base Validation Job class, one for production and two for testing purposes. One test constructor takes mock sources and sinks for end-to-end unit tests of the entire job. Any changes to the job will require updating and or adding additional end-to-end unit tests to `ValidationJobIntTest.scala`. The other test constructor is for stand-alone integration testing without the HRI Management API. Instead of using the HRI Management API sink, it uses a Kafka sink that writes Batch notification messages directly to the `*.notification` topic. This enables testing error scenarios that are difficult to simulate with the HRI Management API. +There are three constructors for the Base Validation Job class, one for production and two for testing purposes. One test constructor takes mock sources and sinks for end-to-end unit tests of the entire job. Any changes to the job will require updating and/or adding end-to-end unit tests to `ValidationJobIntTest.scala`. The other test constructor is for stand-alone integration testing without the HRI Management API. Instead of using the HRI Management API sink, it uses a Kafka sink that writes Batch notification messages directly to the `*.notification` topic. This enables testing error scenarios that are difficult to simulate with the HRI Management API. ## Integration Testing hri-flink-pipeline-core In order to fully test your branch of hri-flink-pipeline-core, you must create a branch of a hri-flink-validation repo, set it to point to your branch and run the test on that branch as well. @@ -68,7 +68,7 @@ Next, these are the steps to actively test your work in hri-flink-pipeline-core After you pass all the tests, create a PR, have the PR approved, merge/rebase this branch back into develop, and then delete this branch... The very last step is to delete the hri-flink-validation branch you made. ## Releases -Releases are created by creating Git tags, which trigger a GitHub Actions build that publishes a release version in GitHub packages, see [Overall strategy](https://github.com/Alvearie/HRI/wiki/Overall-Project-Branching,-Test,-and-Release-Strategy) for more details. 
+Releases are created by creating Git tags, which trigger a GitHub Actions build that publishes a release version in GitHub packages. ## Contribution Guide Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us. diff --git a/build.gradle b/build.gradle index 2314852..9ca467a 100644 --- a/build.gradle +++ b/build.gradle @@ -4,42 +4,59 @@ * SPDX-License-Identifier: Apache-2.0 */ +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath "org.sonatype.gradle.plugins:scan-gradle-plugin:2.2.0" + } +} + plugins { // Apply the scala plugin to add support for Scala id 'scala' id 'maven-publish' id "com.github.maiflai.scalatest" version "0.25" id "org.scoverage" version "4.0.1" + id "org.sonatype.gradle.plugins.scan" version "2.2.0" + id "org.sonarqube" version "3.3" } group = 'org.alvearie.hri.flink' -version = '3.0-1.0.2' +version = '3.1-1.0.3' // Update version number in releases description = """HRI Flink Pipeline Core Library""" ext { javaVersion = '1.8' - flinkVersion = '1.10.0' + flinkVersion = '1.10.3' scalaBinaryVersion = '2.12' scalaVersion = '2.12.11' scalaTestVersion = '3.1.1' - slf4jVersion = '1.7.7' - log4jVersion = '1.2.17' + slf4jVersion = '1.7.32' + log4jVersion = '2.13.2' jacksonVersion = '2.12.0' - branch = System.getenv('TRAVIS_BRANCH') != null - ? System.getenv('TRAVIS_BRANCH') + branch = System.getenv('BRANCH_NAME') != null + ? System.getenv('BRANCH_NAME') : getWorkingBranch() - + ossPassword = findProperty("SONATYPE_OSS_PASSWORD") ?: System.getenv("SONATYPE_OSS_PASSWORD") } -// If not running in travis add 'local' to the version to support local development -if (System.getenv('TRAVIS_BRANCH') == null || System.getenv('TRAVIS_BRANCH') == "") { +/** + * Set the version name + * - If developing locally, no BRANCH_NAME should be set and the version suffix is "-local-SNAPSHOT" + * - If pushing to a branch, no ACTIONS_TAG should be set and the version suffix is "-SNAPSHOT" + * - If pushing to a tag, the tag should match the version number, and the version number is now the version name. + * Note: The "should be set" variables are attempted to be set in ci-workflow.yml. 
+ */ +if (System.getenv('BRANCH_NAME') == null || System.getenv('BRANCH_NAME') == "") { version = "${branch}-local-SNAPSHOT" -} else if (System.getenv('TRAVIS_TAG') == null || System.getenv('TRAVIS_TAG') == "") { +} else if (System.getenv('ACTIONS_TAG') == null || System.getenv('ACTIONS_TAG') == "") { version = "${branch}-SNAPSHOT" -} else if (System.getenv('TRAVIS_TAG') == "v${version}") { +} else if (System.getenv('ACTIONS_TAG') == "v${version}") { version = "${version}" } else { - throw new InvalidUserDataException(String.format("The tag '%s' does not match with the current release version '%s'",System.getenv('TRAVIS_TAG'),"${version}")); + throw new InvalidUserDataException(String.format("The tag '%s' does not match with the current release version '%s'",System.getenv('ACTIONS_TAG'),"${version}")); } task jarTests(type: Jar) { @@ -54,45 +71,114 @@ tasks.withType(JavaCompile) { options.encoding = 'UTF-8' } -// declare where to find the dependencies of your project repositories { - ['TBD'].each { repo -> - maven { - credentials { - username = findProperty('user') ?: System.getenv('user') - password = findProperty('password') ?: System.getenv('password') - } - - url "TBD/$repo/" + maven { + url = uri("https://maven.pkg.github.com/Alvearie/hri-api-spec") + credentials { + username = findProperty("GITHUB_ACTOR") ?: System.getenv("GITHUB_ACTOR") + password = findProperty("GITHUB_TOKEN") ?: System.getenv("GITHUB_TOKEN") } } mavenCentral() mavenLocal() } +ossIndexAudit { + username = 'hribld@us.ibm.com' + password = "${ossPassword}" + allConfigurations = true // if true includes the dependencies in all resolvable configurations. By default is false, meaning only 'compileClasspath', 'runtimeClasspath', 'releaseCompileClasspath' and 'releaseRuntimeClasspath' are considered + useCache = true // true by default + cacheExpiration = 'PT86400S' // note: time in seconds (24hrs); 12 hours if omitted. It must follow the Joda Time specification at https://www.javadoc.io/doc/joda-time/joda-time/2.10.4/org/joda/time/Duration.html#parse-java.lang.String- + colorEnabled = true // if true prints vulnerability description in color. By default is true. + printBanner = true // if true will print ASCII text banner. By default is true. + + // Vulnerabilities that can safely be ignored + excludeVulnerabilityIds = [ + // pkg:maven/org.apache.zookeeper/zookeeper@3.4.8 (from flink-test-utils) + '1775f19b-5e9c-48f8-8c8f-ef245350531b', '43b8cbe5-324d-416b-b613-3dcaba1b5a6c', 'bec057e0-9945-49c4-92c9-4be669bb5331', '1e65ed27-7a06-4464-a115-c8421c879281', + + // pkg:maven/io.netty/netty@3.7.0.Final (from flink-test-utils) + '20be5124-16a3-4d77-8668-d83f04a67808', '20167979-f872-4765-85ef-9b7be870cecb', 'f2a31abe-1af6-4f6f-aeeb-60e0e0d3e981', '6ff63cbd-c4ae-4268-be95-43322746b6be', + '8b7d8928-61ee-4708-bf37-feece927a872', 'cbde3175-9c07-491c-836d-2a146b61e3b6', '846fbf13-a0b9-4cab-b820-8415a30326bd', 'b3c3a56f-37c0-4706-bf54-2f61c5c9786f', + 'a3df5795-2bdb-4ec6-b9c2-babe9ec470e5', '20b79835-7a43-4bea-a985-5e12df1d0b0a', + // pkg:maven/io.netty/netty@3.7.0.Final (from Snappy and Bzip2 compression components. Can’t force Flink to upgrade, and not used in Flink.) 
+ '14ed6543-1974-4241-b353-3afd846eadc9', '99b9cf63-0f91-43fb-b227-e53917370ed9', + + // pkg:maven/com.google.guava/guava@16.0.1 (from flink-test-utils) + '24585a7f-eb6b-4d8d-a2a9-a6f16cc7c1d0', '8e973be2-4220-410d-a4cb-2de7a755bdbe', + + // pkg:maven/junit/junit@4.12 (from flink-test-utils) + '7ea56ad4-8a8b-4e51-8ed9-5aad83d8efb1', + + // pkg:maven/org.jsoup/jsoup@1.10.2 (from com.vladsch.flexmark:flexmark-all) + '5dbdb043-212c-4971-9653-d04e1cfc5080', + + // pkg:maven/org.apache.pdfbox/pdfbox@2.0.8 (from com.vladsch.flexmark:flexmark-all) + '32bccbbb-bd3f-44f3-a530-5827a93d74db', '2418a058-dfb7-4303-b1d1-ac5262d9e499', '1be5c4d8-4994-4dd6-92b1-ca53ac7e6a17', 'f984e75c-06bf-43dc-86db-3f8dfe82f430', + 'bd0f0dd9-1356-4dec-a8a3-edc777cc92d5', 'b70e447a-06a1-48b0-a02b-cd5b78862777', + + // pkg:maven/org.apache.pdfbox/xmpbox@2.0.8 (from com.vladsch.flexmark:flexmark-all) + '32bccbbb-bd3f-44f3-a530-5827a93d74db', '2418a058-dfb7-4303-b1d1-ac5262d9e499', '1be5c4d8-4994-4dd6-92b1-ca53ac7e6a17', 'f984e75c-06bf-43dc-86db-3f8dfe82f430', + 'bd0f0dd9-1356-4dec-a8a3-edc777cc92d5', 'b70e447a-06a1-48b0-a02b-cd5b78862777', + + //pkg:maven/org.apache.commons/commons-compress@1.18 (from org.apache.flink:flink-scala_2.12:1.10.3) + //NOTE: these vulnerabilities cannot be eliminated UNTIL we upgrade our Flink version to Flink 1.14.0 + //Need to get Security Exception until that happens + '68232267-bb25-4b04-8dec-caf7c11c7293', '69b8043a-3002-48fa-9762-8f6040d83de1', '4102317d-8250-465e-a46d-179d42792b14', + '7a6a9dd2-67de-4e2a-b406-7aa4a4ce29cc', '8ea14e38-e6cc-48d9-bfe4-ec89f93596e7', + + //pkg:maven/org.apache.logging.log4j/log4j-core@2.11.2 (from org.scala-sbt:zinc_2.12:1.3.5 - Used by gradle scala plugin only) + 'd3477f9c-032a-44a7-a5e1-02ae35e4737c', + + //pkg:maven/log4j/log4j@1.2.17 (from com.vladsch.flexmark:flexmark-all:0.35.10 - Used in Unit Test runs only) + 'e6e4ebea-da12-4bde-8f24-6272925ad093', + ] + // excludeCoordinates = ['commons-fileupload:commons-fileupload:1.3'] // list containing coordinate of components which if vulnerable should be ignored +} + +sonarqube { + properties { + property "sonar.host.url", "https://sonarcloud.io" + property "sonar.sourceEncoding", "UTF-8" + property "sonar.projectKey", "Alvearie_hri-flink-pipeline-core" + property "sonar.organization", "alvearie" + property "sonar.scala.coverage.reportPaths", "./build/reports/scoverage/scoverage.xml,./build/reports/scoverage/cobertura.xml" + } +} + dependencies { // Scala lib implementation "org.scala-lang:scala-library:${scalaVersion}" // -------------------------------------------------------------- - // Flink dependencies that should not be included as transitive - // dependencies of this library, because they should not be + // Flink dependencies that should not be included as transitive + // dependencies of this library, because they should not be // included in Flink job jars. 
// -------------------------------------------------------------- compileOnly "org.apache.flink:flink-scala_${scalaBinaryVersion}:${flinkVersion}" compileOnly "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}" - compileOnly "log4j:log4j:${log4jVersion}" + compileOnly("org.scala-lang:scala-compiler") { + version { + require("${scalaVersion}") + } + } + compileOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}" compileOnly "org.slf4j:slf4j-log4j12:${slf4jVersion}" testImplementation "org.apache.flink:flink-scala_${scalaBinaryVersion}:${flinkVersion}" testImplementation "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}" - testImplementation "log4j:log4j:${log4jVersion}" + testImplementation("org.scala-lang:scala-compiler") { + version { + require("${scalaVersion}") + } + } + testImplementation "org.apache.logging.log4j:log4j-core:${log4jVersion}" testImplementation "org.slf4j:slf4j-log4j12:${slf4jVersion}" // -------------------------------------------------------------- // Dependencies that library users should include in their job // shadow jar // -------------------------------------------------------------- - implementation "org.alvearie.hri:hri-api-batch-notification:3.0-2.0.1" + implementation "org.alvearie.hri:hri-api-batch-notification:3.1-2.0.2" implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}" @@ -110,9 +196,10 @@ dependencies { testImplementation "org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}:tests" testImplementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}:tests" + testRuntimeOnly "com.vladsch.flexmark:flexmark-all:0.35.10" - testRuntime 'org.pegdown:pegdown:1.4.2' - testCompile "info.picocli:picocli:4.2.0" + testRuntimeOnly 'org.pegdown:pegdown:1.6.0' + testImplementation 'info.picocli:picocli:4.6.1' } publishing { @@ -121,6 +208,7 @@ publishing { from components.java artifactId = 'hri-flink-pipeline-core' } + mavenJava(MavenPublication) { artifact jarTests } @@ -128,10 +216,11 @@ publishing { repositories { maven { - url 'TBD' + name = "GitHubPackages" + url = uri("https://maven.pkg.github.com/Alvearie/hri-flink-pipeline-core") credentials { - username findProperty('user') ?: System.getenv('user') - password findProperty('password') ?: System.getenv('password') + username = findProperty("GITHUB_ACTOR") ?: System.getenv("GITHUB_ACTOR") + password = findProperty("GITHUB_TOKEN") ?: System.getenv("GITHUB_TOKEN") } } } @@ -148,6 +237,7 @@ scoverage { coverageOutputXML = true coverageOutputHTML = true } + reportScoverage.doLast { println "Scoverage report:\n file:///$buildDir/reports/scoverage/index.html" } @@ -166,7 +256,5 @@ def getWorkingBranch() { def workingBranch = """git --git-dir=${rootDir}/.git --work-tree=${rootDir} rev-parse --abbrev-ref HEAD""".execute().text.trim() - println "Working branch: " + workingBranch return workingBranch } - diff --git a/copyrightCheck.sh b/copyrightCheck.sh new file mode 100755 index 0000000..dd43ba4 --- /dev/null +++ b/copyrightCheck.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# (C) Copyright IBM Corp. 2020 +# +# SPDX-License-Identifier: Apache-2.0 + +files=$(find . -name "*.scala" -or -name "*.sh") + +rtn=0 +for file in $files; do + #echo $file + if ! 
head -n 5 $file | grep -qE '(Copyright IBM Corp)|(MockGen)'; then + rtn=1 + echo $file + fi +done + +if [ $rtn -ne 0 ]; then + echo "Found files without copyright, exiting." + exit 1 +fi diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 4b7e1f3..4c5803d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew old mode 100644 new mode 100755 diff --git a/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala b/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala index 6434933..a705a1a 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala @@ -272,7 +272,7 @@ class BaseValidationJob( // used for functional testing of the Validation Jobs without the HRI Management API. def getRecordCountSink(props: Properties): SinkFunction[NotificationRecord] = { if (useMgmtApi) { - log.info("Creating HRI MgmtApiSink({}) for Tracker output", mgmtApiUrl) + log.info("Creating MgmtApiSink({}) for Tracker output", mgmtApiUrl) return new MgmtApiSink(tenantId, mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl) } else { log.info("Creating KafkaProducer({}) for Tracker output", notificationTopic) diff --git a/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala b/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala index f55b42d..2ac57de 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala @@ -9,11 +9,10 @@ package org.alvearie.hri.flink.core import java.nio.charset.StandardCharsets import java.time.Instant import java.util.concurrent.TimeUnit - import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper} -import org.alvearie.hri.api.{BatchLookup, BatchNotification, MgmtClient, RequestException, InvalidRecord} +import org.alvearie.hri.api.{BatchLookup, BatchNotification, InvalidRecord, MgmtClient, RequestException} import org.alvearie.hri.flink.core.serialization.{HriRecord, NotificationRecord} import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction @@ -22,9 +21,10 @@ import org.apache.http.HttpStatus import org.apache.kafka.common.header.Headers import org.slf4j.LoggerFactory +import java.time import scala.util.{Failure, Success, Try} import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration, FiniteDuration} class ValidationProcessFunction( notificationDescriptor: MapStateDescriptor[String, BatchNotification], @@ -52,9 +52,9 @@ class ValidationProcessFunction( // so instead generate it on demand @transient lazy val mgmtClient: BatchLookup = createMgmtClient() - var initialBackOff = Duration.apply(1, TimeUnit.SECONDS) - var maxBackOff = Duration.apply(5, TimeUnit.MINUTES) - var maxRetry = 
java.time.Duration.ofHours(24) + var initialBackOff: FiniteDuration = Duration.apply(1, TimeUnit.SECONDS) + var maxBackOff: FiniteDuration = Duration.apply(5, TimeUnit.MINUTES) + var maxRetry: time.Duration = java.time.Duration.ofHours(24) def getUseMgmtApi(): Boolean = {useMgmtApi} @@ -226,8 +226,7 @@ class ValidationProcessFunction( def extractBatchId(headers: Headers): String = { if (headers != null) { return headers.asScala.find(_.key() == "batchId") - .map(h => new String(h.value(), StandardCharsets.UTF_8)) - .getOrElse(null) + .map(h => new String(h.value(), StandardCharsets.UTF_8)).orNull } null } diff --git a/src/main/scala/org/alvearie/hri/flink/core/serialization/HriRecordDeserializer.scala b/src/main/scala/org/alvearie/hri/flink/core/serialization/HriRecordDeserializer.scala index f137d8b..d0b0d9a 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/serialization/HriRecordDeserializer.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/serialization/HriRecordDeserializer.scala @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord class HriRecordDeserializer() extends KafkaDeserializationSchema[HriRecord] { override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): HriRecord = { - return new HriRecord(record.headers(), record.key(), record.value(), record.topic(), record.partition(), record.offset()) + new HriRecord(record.headers(), record.key(), record.value(), record.topic(), record.partition(), record.offset()) } override def isEndOfStream(nextElement: HriRecord): Boolean = false diff --git a/src/main/scala/org/alvearie/hri/flink/core/serialization/NotificationSerializer.scala b/src/main/scala/org/alvearie/hri/flink/core/serialization/NotificationSerializer.scala index 89e537e..0a234ec 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/serialization/NotificationSerializer.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/serialization/NotificationSerializer.scala @@ -18,7 +18,7 @@ import scala.tools.jline_embedded.internal.Nullable /** * An implementation of a KafkaSerializationSchema, which encodes the Value as a BatchNotification. * It is expected that this class will be used by HRI Flink processors to handle "notification" records. These - * records are produced by the HRI Mgmt-API in a known format and Flink processors must have access to the individual + * records are produced by the hri-mgmt-api in a known format and Flink processors must have access to the individual * field values within the message. 
* * @param topic Kafka topic to be inserted in serialized ProducerRecords diff --git a/src/test/scala/org/alvearie/hri/flink/core/BaseValidationJobPropertiesTest.scala b/src/test/scala/org/alvearie/hri/flink/core/BaseValidationJobPropertiesTest.scala index 2afa33d..51170d9 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/BaseValidationJobPropertiesTest.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/BaseValidationJobPropertiesTest.scala @@ -19,10 +19,10 @@ import org.scalatest.matchers.should.Matchers._ class BaseValidationJobPropertiesTest extends AnyFunSuite { val TopicBase = "ingest.tenant1.da2" - val TestInputTopic = TopicBase + ".in" - val TestTopicArray = TestInputTopic.split('.').init + val TestInputTopic: String = TopicBase + ".in" + val TestTopicArray: Array[String] = TestInputTopic.split('.').init val TestPassword = "FakePassword" - val TestBrokers = Array[String]("broker1:9092","broker2:9092") + val TestBrokers: Array[String] = Array[String]("broker1:9092","broker2:9092") val TestInputTopic2 = "ingest.22.da3.in" implicit val hriRecordTypeInfo: TypeInformation[HriRecord] = TypeInformation.of(classOf[HriRecord]) @@ -110,7 +110,7 @@ class BaseValidationJobPropertiesTest extends AnyFunSuite { exMsg should include ("It must have 4 elements separated by a '.'") } - test("getRecordCountSink() should return a HRI MgmtApiSink when calling the default constructor") { + test("getRecordCountSink() should return a MgmtApiSink when calling the default constructor") { val baseUrl = "https://mydomain.com/hri" val clientId = "myClientId" val clientSecret = "myClientSecret" @@ -152,7 +152,7 @@ class BaseValidationJobPropertiesTest extends AnyFunSuite { val countsOutputTag: OutputTag[ObjectNode] = new OutputTag[ObjectNode]("counts") val baseJob = new BaseValidationJob(TestInputTopic, TestBrokers, TestPassword, (_ => (true,null)) : Validator, "mgmtApiUrl", "mgmtClientId", "mgmtClientSecret", "audience", "https://oauthdomain.com/hri", 100) val validationProcessFunction = baseJob.getValidationProcessFunction(notificationDescriptor, invalidOutputTag, countsOutputTag) - assert(validationProcessFunction.getUseMgmtApi() == true) + assert(validationProcessFunction.getUseMgmtApi()) } test("constructs validation process function without mgmt client") { @@ -161,6 +161,6 @@ class BaseValidationJobPropertiesTest extends AnyFunSuite { val countsOutputTag: OutputTag[ObjectNode] = new OutputTag[ObjectNode]("counts") val baseJob = new BaseValidationJob(TestInputTopic, TestBrokers, TestPassword, (_ => (true,null)) : Validator, 100) val validationProcessFunction = baseJob.getValidationProcessFunction(notificationDescriptor, invalidOutputTag, countsOutputTag) - assert(validationProcessFunction.getUseMgmtApi() == false) + assert(!validationProcessFunction.getUseMgmtApi()) } } diff --git a/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala b/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala index 51a034a..712704f 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala @@ -58,7 +58,7 @@ class ValidationProcessFunctionTest extends AnyFunSuite with MockitoSugar { NotificationDescriptor.enableTimeToLive(getStateTtlConfig) - test("processElement should call getBatch, retry call HRI mgmtClient and return batchNotification") { + test("processElement should call getBatch, retry call mgmtClient and return batchNotification") { val 
mockClient = mock[MgmtClient] val batchNotification = createTestBatchNotification(DefaultTestBatchId, BatchNotification.Status.SEND_COMPLETED, DefaultTestRecordCount) @@ -126,7 +126,7 @@ class ValidationProcessFunctionTest extends AnyFunSuite with MockitoSugar { thrown.getMessage should startWith("Reached max HRI Management API retry timeout of") } - test("processElement should ship records to invalid output when the batch Id is not in the broadcast state and HRI mgmtClient responds with 'Not Found'") { + test("processElement should ship records to invalid output when the batch Id is not in the broadcast state and mgmtClient responds with 'Not Found'") { val mockClient = mock[MgmtClient] val validator = new TestValidationProcessFunction( diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/JsonValidationJob.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/JsonValidationJob.scala index 27d055a..71cae91 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/JsonValidationJob.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/JsonValidationJob.scala @@ -30,13 +30,13 @@ import scala.util.{Failure, Success, Try} description = Array("HRI JSON-only TEST validation job") ) class JsonValidationJob extends Callable[Integer] with Serializable { - @CommandLine.Option(names = Array("-b", "--brokers"), split = ",", description = Array("Comma-separated list of Event Streams (Kafka) brokers"), required = true) + @CommandLine.Option(names = Array("-b", "--brokers"), split = ",", description = Array("Comma-separated list of Kafka brokers"), required = true) private var brokers: Array[String] = null - @CommandLine.Option(names = Array("-p", "--password"), description = Array("IBM Cloud Event Streams password"), hidden = true, required = false) + @CommandLine.Option(names = Array("-p", "--password"), description = Array("IBM Cloud Kafka password"), hidden = true, required = false) private var password: String = null - @CommandLine.Option(names = Array("-i", "--input"), description = Array("IBM Cloud Event Streams (Kafka) input topic"), required = true) + @CommandLine.Option(names = Array("-i", "--input"), description = Array("IBM Cloud Kafka input topic"), required = true) private var inputTopic: String = null @CommandLine.Option(names = Array("-d", "--batch-delay"), defaultValue = "300000", description = Array("Amount of time to wait in milliseconds for extra records before completing a batch. 
Default ${DEFAULT-VALUE}")) @@ -66,7 +66,6 @@ class JsonValidationJob extends Callable[Integer] with Serializable { } object JsonValidationJob { - def main(args: Array[String]): Unit = { val command = new CommandLine(new JsonValidationJob()).execute(args:_*) System.exit(command) diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/ValidationJobIntTest.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/ValidationJobIntTest.scala index c27371e..f10cb13 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/ValidationJobIntTest.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/ValidationJobIntTest.scala @@ -310,7 +310,6 @@ class ValidationJobIntTest extends AnyFlatSpec with BeforeAndAfter { } private def getTwoValidHriRecords(testHeaders: TestRecordHeaders, recTwoKey:String): mutable.Seq[HriRecord] = { - val hriRecOne = TestHelper.createOneValidHriRecord(testHeaders, DefaultHriRecordKey.getBytes(StandardCharsets.UTF_8), TestHelper.ValidJsonOne) val hriRecTwo = TestHelper.createOneValidHriRecord(testHeaders, diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala index 80fbb6b..a052ac1 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala @@ -50,7 +50,7 @@ class HriTestRecsSourceFunction() extends RichParallelSourceFunction[HriRecord] } val numHriRecs = hriRecs.size - println(s"Starting HRI Test Recs Sources Processing of " + + println(s"Starting Hri Test Recs Sources Processing of " + s"${numHriRecs} recs at startTime: $startTime") val itr = hriRecs.iterator while(isRunning && itr.hasNext) { diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala index 607684b..0185f6a 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala @@ -87,4 +87,5 @@ class NotificationSourceFunction() extends RichParallelSourceFunction[Notificati delayProcessing } + } diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/TestRecordHeaders.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/TestRecordHeaders.scala index 2dced10..8d778ca 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/TestRecordHeaders.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/TestRecordHeaders.scala @@ -12,7 +12,6 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders class TestRecordHeaders extends RecordHeaders with Serializable { - private val serialVersionUID = - 1749935200821233226L private def writeObject(oos: ObjectOutputStream): Unit = {