Merge pull request #14 from Alvearie/release-3.1.0
Release 3.1-1.0.3
David-N-Perkins authored Oct 27, 2021
2 parents 9b817eb + 69b68e4 commit 031d380
Showing 20 changed files with 362 additions and 70 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/ci-workflow.yml
@@ -0,0 +1,57 @@
name: CI Workflow

on:
  push:
    branches: [ '**' ]
    tags: [ '**' ]
  workflow_dispatch:

concurrency: ci-${{ github.ref }}

jobs:
  buildAndTest:
    name: Build, Test, and Package
    runs-on: ubuntu-latest
    env:
      RESOURCE_GROUP: hri-dev1-wdc-kube

    steps:
      - name: Set branch name environment variable
        uses: nelonoel/branch-name@v1.0.1

      - uses: actions/checkout@v2

      - name: Copyright Check
        run: ./copyrightCheck.sh

      - name: Set up adoptopenjdk8
        uses: actions/setup-java@v2
        with:
          java-version: '8'
          distribution: 'adopt'
          cache: 'gradle'

      - name: Set GitHub Tag environment variable
        if: startsWith(github.ref, 'refs/tags/v') == true
        run: |
          TAG_ID=${GITHUB_REF##*/}
          echo "ACTIONS_TAG=$TAG_ID" >> $GITHUB_ENV

      - name: Gradle build
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: ./gradlew clean build

      - name: Gradle publish
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: ./gradlew publish

      - name: Post Slack Update
        if: ${{ failure() && ( github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/support-') ) }}
        id: slack
        uses: slackapi/slack-github-action@v1.14.0
        with:
          payload: "{\"Repo\":\"${{ github.repository }}\",\"Workflow\":\"${{ github.workflow }}\",\"Branch\":\"${{ env.BRANCH_NAME }}\",\"Link\":\"https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}\"}"
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
45 changes: 45 additions & 0 deletions .github/workflows/code-scans.yml
@@ -0,0 +1,45 @@
name: Code Scans

on:
  push:
    branches: [ 'main', 'develop', 'support-*' ]
  pull_request:
    types: [opened, synchronize, reopened]

concurrency: code-scans-${{ github.ref }}

jobs:
  buildAndTest:
    name: Static Code Analysis
    runs-on: ubuntu-latest

    steps:
      - name: Set branch name environment variable
        uses: nelonoel/branch-name@v1.0.1

      - uses: actions/checkout@v2
        with:
          # full checkout for SonarCloud analysis
          fetch-depth: 0

      - name: Set up Java 11
        uses: actions/setup-java@v2
        with:
          java-version: '11'
          distribution: 'adopt'
          cache: 'gradle'

      - name: SonarCloud Scan
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
        run: ./gradlew clean build sonarqube -Dsonar.login=$SONAR_TOKEN

      - name: Post Slack Update
        if: ${{ failure() && ( github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/support-') ) }}
        id: slack
        uses: slackapi/slack-github-action@v1.14.0
        with:
          payload: "{\"Repo\":\"${{ github.repository }}\",\"Workflow\":\"${{ github.workflow }}\",\"Branch\":\"${{ env.BRANCH_NAME }}\",\"Link\":\"https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}\"}"
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
61 changes: 61 additions & 0 deletions .github/workflows/pull_request.yml
@@ -0,0 +1,61 @@
name: Pull Request Checks

on:
  pull_request:
    types: [opened]
  pull_request_review:
    types: [submitted]
  workflow_dispatch:

concurrency: pr-${{ github.ref }}

jobs:
  dependency-checks:
    name: Dependency Checks
    # run when the PR is opened or a review containing '/pr_checks' is submitted
    if: (github.event_name == 'pull_request') || (github.event_name == 'pull_request_review' && contains(github.event.review.body, '/pr_checks'))
    runs-on: ubuntu-latest
    env:
      RESOURCE_GROUP: hri-dev1-wdc-kube

    steps:
      - name: Set branch name environment variable
        run: echo "BRANCH_NAME=$(echo ${{github.event.pull_request.head.ref}} | tr / -)" >> $GITHUB_ENV

      - uses: actions/checkout@v2

      - name: Set up adoptopenjdk8
        uses: actions/setup-java@v2
        with:
          java-version: '8'
          distribution: 'adopt-hotspot'
          cache: 'gradle'

      - name: Gradle build
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: ./gradlew clean build publishToMavenLocal

      - name: Check OpenSource Dependencies
        # override the default -eo pipefail so a non-zero audit exit code doesn't fail the step
        shell: bash --noprofile --norc {0}
        env:
          SONATYPE_OSS_PASSWORD: ${{ secrets.SONATYPE_OSS_PASSWORD }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          ./gradlew ossIndexAudit --info
          found=$?
          echo "VULNERABILITIES=$found" >> $GITHUB_ENV
          [[ "$found" == 1 ]] && echo "::error ::Vulnerabilities found in dependencies."
          exit 0

      - uses: actions/github-script@v4
        if: ${{ env.VULNERABILITIES != 0 }}
        with:
          script: |
            github.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: "## ⚠️ Vulnerabilities found in dependencies.\nSee the 'PR -> Vulnerability Checks' logs for more details: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
            })
23 changes: 23 additions & 0 deletions .gitignore
@@ -0,0 +1,23 @@
/build

# IntelliJ
.idea
.idea/
*.iml
*.iws
*.ipr
.DS_Store

# Gradle
.gradle/
.gradletasknamecache
gradle-app.setting

# AppScan on Cloud - ASoC
/dependencies
appscan.irx
appscan_logs.zip

*.class
*.log
out
20 changes: 10 additions & 10 deletions README.md
@@ -5,16 +5,16 @@ This repo contains the Alvearie Health Record Ingestion Flink pipeline processing
_This is a draw.io diagram embedded in a png. Use draw.io to make changes._

### Input/Sources
Records are read from both the `*.in` and `*.notification` Kafka topics. The `*.notification` topic supplies Batch metadata information to the `Validation Processor` and the `Tracker`.
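
For illustration only, here is a minimal sketch of wiring those two sources with Flink's Kafka connector. The topic names (`tenant1.in`, `tenant1.notification`) and connection properties are hypothetical stand-ins for the per-tenant configuration:

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical broker address, consumer group, and tenant topic names
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "hri-validation")

// One consumer per input topic; both feed the Validation Processor
val inSource = new FlinkKafkaConsumer[String](
  "tenant1.in", new SimpleStringSchema(), props)
val notificationSource = new FlinkKafkaConsumer[String](
  "tenant1.notification", new SimpleStringSchema(), props)
```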

### Validation Processor
The Validation processor stores Batch notification messages in its broadcast state, which is used for initial checks on every record. It then calls a provided record validator and outputs the record to the valid or invalid downstream Kafka sinks. Additionally, a message is sent to the Tracker indicating whether the record is valid.

Initial record checks:
* record has `batchId` header
* `batchId` exists
* Batch status is not `failed`, `terminated`, or `completed`
* provided Validator checks record

The Validation processor uses the job parallelism, the same as both Kafka sources. It uses broadcast state because the stream of records is not keyed, so every instance needs to know about every batch; keying the stream by batch ID would limit the parallelism to 1 for each batch. The parallelism should be less than or equal to the number of Kafka partitions for the `*.in` topic.
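
A rough sketch of that broadcast wiring, assuming hypothetical `HriRecord`/`BatchNotification` types and a simplified validation function (the real job's classes differ):

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class BatchNotification(batchId: String, status: String)
case class HriRecord(batchId: String, body: Array[Byte])

// batchId -> latest Batch notification, replicated to every validator instance
val batchState = new MapStateDescriptor[String, BatchNotification](
  "batches", classOf[String], classOf[BatchNotification])

class ValidationFn
    extends BroadcastProcessFunction[HriRecord, BatchNotification, HriRecord] {

  override def processBroadcastElement(
      n: BatchNotification,
      ctx: BroadcastProcessFunction[HriRecord, BatchNotification, HriRecord]#Context,
      out: Collector[HriRecord]): Unit =
    ctx.getBroadcastState(batchState).put(n.batchId, n) // remember every batch

  override def processElement(
      rec: HriRecord,
      ctx: BroadcastProcessFunction[HriRecord, BatchNotification, HriRecord]#ReadOnlyContext,
      out: Collector[HriRecord]): Unit = {
    val batch = ctx.getBroadcastState(batchState).get(rec.batchId)
    // forward only when the batch is known and still active; the real job
    // also invokes the provided record validator here
    if (batch != null && !Set("failed", "terminated", "completed")(batch.status))
      out.collect(rec)
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val records: DataStream[HriRecord] = ???               // *.in source
val notifications: DataStream[BatchNotification] = ??? // *.notification source

records
  .connect(notifications.broadcast(batchState)) // unkeyed stream keeps full parallelism
  .process(new ValidationFn)
```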

@@ -23,19 +23,19 @@ When the validation processor attempts to get a batch by batchId or tenantId, if
### Tracker
The Tracker keeps track of every batch's progress and calls the HRI Management API when it is complete or fails. Batches are failed if the number of invalid records reaches the threshold, which is set per batch, or if more records are received than expected. Batches are completed once all the records are received and after a configurable delay. The Batch completion delay is used to wait for any additional records, which would fail the batch. Once the batch is completed, additional records are sent to the invalid topic and do not affect the status of the batch.

Because both Batch notification messages and Validation valid count messages could trigger a Batch status change, both of these input streams have to be keyed by the Batch ID. This ensures that all the messages for a given batch are always processed by the same Tracker instance, which enables it to store the total count and Batch notification together.

This does, however, limit the parallelism of the Tracker: every batch can only be processed by one Tracker instance. We don't believe this will be a significant performance/throughput bottleneck, since the computation is minimal. Currently, the Tracker's parallelism is set to one quarter of the job's parallelism.
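
Roughly, reusing the hypothetical `BatchNotification` type from the sketch above, with a hypothetical `ValidationCount` message and Tracker function:

```scala
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction

case class ValidationCount(batchId: String, isValid: Boolean)

val validationCounts: DataStream[ValidationCount] = ??? // per-record messages from the validator
val trackerFn: KeyedCoProcessFunction[String, BatchNotification, ValidationCount, String] = ??? // the Tracker logic

// Key both inputs by batch ID so a single Tracker instance owns each batch,
// then run the Tracker at a quarter of the job parallelism
notifications.keyBy(_.batchId)
  .connect(validationCounts.keyBy(_.batchId))
  .process(trackerFn)
  .setParallelism(math.max(1, env.getParallelism / 4))
```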

### Output/Sinks
Records that pass validation are written to the `*.out` topic. For invalid records, an Invalid record is written to the `*.invalid` topic. The Invalid record contains the error message and a pointer (topic, partition, offset) to the original record, and is defined in the `hri-api-spec` [repo](https://github.com/Alvearie/hri-api-spec/blob/main/notifications/invalidRecord.json). Once a Batch is failed or terminated, the Validation processor stops forwarding valid or invalid records downstream.
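
For orientation, an illustrative Scala rendering of that shape — the field names here are approximations, and the JSON schema in `hri-api-spec` is authoritative:

```scala
// Approximation of the invalid-record payload; see
// hri-api-spec/notifications/invalidRecord.json for the real schema
case class InvalidRecord(
  failure: String, // validation error message
  topic: String,   // pointer to the original record:
  partition: Int,  //   its topic, partition,
  offset: Long     //   and offset
)
```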

The Tracker uses an HRI Management API sink to complete or fail a Batch, as described above. The sink calls the Batch `processingComplete` or `fail` endpoint respectively. Note that the HRI Management API will write a new Batch notification reflecting the status change to the `*.notification` topic. This creates an external loop in our flow, but it is necessary for the Validation processor to pick up these changes. The Tracker keeps additional state about whether it has completed or failed a batch to prevent infinite loops.

If, when updating a batch as either `processingComplete` or `fail`, the returned HTTP status code is `409` (Conflict), indicating the batch is already in the desired state, a warning is logged but no error is raised. For any other HTTP status code in the `400`s (e.g. `400 Bad Request`, `401 Unauthorized`), the attempt to update the batch status fails immediately; this indicates something is wrong with the HTTP requests, and there is little point in retrying. For all other HTTP error status codes, the HRI Management API sink retries the status update for 24 hours, with an exponentially increasing delay between attempts, capped at 5 minutes.
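
For illustration, one possible shape of that retry schedule — the one-second base delay is an assumption, not the sink's actual value:

```scala
import scala.concurrent.duration._

// Delay doubles each attempt, capped at 5 minutes; the sink gives up
// once total retry time exceeds 24 hours (assumed 1s base delay)
def retryDelay(attempt: Int): FiniteDuration =
  (1.second * (1L << math.min(attempt, 30))) min 5.minutes
```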

### Testing
There are three constructors for the Base Validation Job class: one for production and two for testing. One test constructor takes mock sources and sinks for end-to-end unit tests of the entire job; any changes to the job will require updating or adding end-to-end unit tests in `ValidationJobIntTest.scala`. The other test constructor is for stand-alone integration testing without the HRI Management API. Instead of using the HRI Management API sink, it uses a Kafka sink that writes Batch notification messages directly to the `*.notification` topic. This enables testing error scenarios that are difficult to simulate with the HRI Management API.

## Integration Testing hri-flink-pipeline-core
To fully test your branch of hri-flink-pipeline-core, you must create a branch of the hri-flink-validation repo, point it at your branch, and run the tests on that branch as well.
@@ -68,7 +68,7 @@ Next, these are the steps to actively test your work in hri-flink-pipeline-core
After you pass all the tests, create a PR, have the PR approved, merge/rebase the branch back into develop, and then delete the branch. The very last step is to delete the hri-flink-validation branch you made.

## Releases
- Releases are created by creating Git tags, which trigger a GitHub Actions build that publishes a release version in GitHub packages, see [Overall strategy](https://github.com/Alvearie/HRI/wiki/Overall-Project-Branching,-Test,-and-Release-Strategy) for more details.
+ Releases are created by creating Git tags, which trigger a GitHub Actions build that publishes a release version in GitHub packages.

## Contribution Guide
Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us.
