Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a message queue between gateway and workflow service for improved reliability #7559

Closed
wants to merge 21 commits into from

Conversation

rikukissa
Copy link
Member

@rikukissa rikukissa commented Aug 30, 2024

To-do

  • Implement a new endpoint for country config to reject a registration (like confirm registration). The current synchronous flow where the /event-registration can return anything other than 2** status to indicate the request got accepted needs to be refactored. Endpoint needs to confirm the message was received and then call another endpoint in core to let core know if the record got rejected or accepted.

  • Allow configuring a port, username and password for Redis connection. Let services pass a full RedisOptions object inside useRecordQueue

  • If the services are restarted, the token expires in queue the same issue can happen if a message is in the queue for too long and the user token expires. Instead of us storing the token in the message queue payloads, let's instead store the initiating user's id and roll a new token in the code the message queue triggers.

  • Fix conflicts on this branch

  • Investigate the failing tests: should they be removed or rewritten?

    • Fix the tests

Dependencies

  • Hide all internal services behind one url #6279
    • This needs to be done so that we can implement an endpoint to the auth service to request a token for a given user id without the endpoint being exposed to public internet like we have all of them today.

Technical tests

  • How does Redis queue fair with large attachments?
  • How does both the queue and frontend behave when you create multiple drafts offline and then come back online

Tips

Run this to see a local GUI for your message queue items

docker run -p 3003:3000 -e REDIS_HOST=host.docker.internal  deadly0/bull-board

Copy link

Oops! Looks like you forgot to update the changelog. When updating CHANGELOG.md, please consider the following:

  • Changelog is read by country implementors who might not always be familiar with all technical details of OpenCRVS. Keep language high-level, user friendly and avoid technical references to internals.
  • Answer "What's new?", "Why was the change made?" and "Why should I care?" for each change.
  • If it's a breaking change, include a migration guide answering "What do I need to do to upgrade?".

@rikukissa rikukissa force-pushed the ocrvs-6208 branch 2 times, most recently from 89cfdbb to 406f03d Compare September 6, 2024 14:25
…e queue between gateway and workflow service
> & {
lastModified: string
status: 'ready' | 'requested' | 'draft' | 'accepted' | 'rejected'
extension: Array<Extension>
focus: {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a random type improvement for Task as it came up as part of the task. Don't think there is ever a case where a Task wouldn't reference a composition in focus.reference

* Copyright (C) The OpenCRVS Authors located at https://github.com/opencrvs/opencrvs-core/blob/master/AUTHORS.
*/

export * from './record'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is our new message queue module :) All posting to and listening from the queues should happen through this module in commons. This ensures that the type chain doesn't break in the code.

trackingId: string
isPotentiallyDuplicate: boolean
}>('POST', '/create-record', authHeader, { record, event })
if (inScope(authHeader, [USER_SCOPE.REGISTER])) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember how the "create record" handler is basically 3 operations in one? I split it and lifted the detection to this level. Ideally the client would call right graphql handler from the start

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice one 👍

@@ -25,6 +25,7 @@
"csv-stringify": "^5.3.4",
"csv-writer": "^1.6.0",
"date-fns": "^2.28.0",
"envalid": "^8.0.0",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added, rest are just reordered

method = 'GET',
body?: string
) {
const url = [fhirUrl.replace(/\/$/, ''), suffix].join('/')
return fetch(url, {
method,
headers: {
'Content-Type': 'application/fhir+json',
...authHeader
'Content-Type': 'application/fhir+json'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, here starts some not-very-major but many-in-numbers changes start. I removed the way we send auth tokens to Hearth as it doesn't use it. for anything. More of a risk than a benefit if nothing in the receiving end uses it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file.

const payload = request.payload as IUserAuditBody

const transactionId = payload.transactionId
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction IDs are used to determine if an operation has already been performed. A service can for instance check if there is already a database row with a certain ID. If yes, then it wont write it again

@@ -50,7 +55,8 @@ export async function newAuditHandler(
payload.additionalData
)
)
await writePoints(points)

await writePoints(points, transactionId)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writePoints function is now in charge of never writing duplicates even if the handler is called multiple times (given transaction id stays the same)

@@ -61,9 +61,7 @@ export async function monthlyExportHandler(
if (monthlyMetrics.genderBasisMetrics) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

countRegistrarsByLocation,
totalOfficesInCountry,
fetchChildLocationsByParentId,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -44,8 +44,7 @@ export async function locationWiseEventEstimationsHandler(
'x-correlation-id': request.headers['x-correlation-id']
}
const childLocations = await fetchChildLocationsByParentId(
locationId || 'Location/0',
authHeader
locationId || 'Location/0'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -71,8 +71,7 @@ export async function metricsHandler(
const childLocationIds = await fetchChildLocationIdsByParentId(
request.query[LOCATION_ID],
currentLocationLevel,
lowerLocationLevel,
authHeader
lowerLocationLevel
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -243,8 +243,7 @@ describe('verify metrics util', () => {
it('Returns the district location', async () => {
fetchLocation.mockResolvedValueOnce(location)
const result = await getDistrictLocation(
'b2b3ca8b-a14f-41c6-b97f-7cb99a1299e5',
{ Authorization: 'bearer token' }
'b2b3ca8b-a14f-41c6-b97f-7cb99a1299e5'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -268,7 +268,7 @@ export const fetchEstimateForTargetDaysByLocationId = async (
timeTo: string
): Promise<IEstimation> => {
if (locationId) {
const locationData: Location = await fetchFHIR(locationId, authHeader)
const locationData: Location = await fetchFHIR(locationId)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -8,7 +8,7 @@
*
* Copyright (C) The OpenCRVS Authors located at https://github.com/opencrvs/opencrvs-core/blob/master/AUTHORS.
*/
import { IAuthHeader } from '@metrics/features/registration'

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only token removals in this file

@@ -23,49 +23,53 @@ import {
generateRejectedPoints,
generateTimeLoggedPoint
} from '@metrics/features/registration/pointGenerator'
import { badRequest, internal } from '@hapi/boom'
import { populateBundleFromPayload } from '@metrics/features/registration/utils'
import { writePoints } from '@metrics/influxdb/client'
Copy link
Member Author

@rikukissa rikukissa Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this file you find a lot of restructuring the code. Notice also that I've changed the request payload type in many places from

body: fhir.Bundle
body: {
  record: ValidRecord
  transactionId: string
}

It's on purpose that I'm not using x-correlation-id as a transaction id. It wouldn't work in cases the same operation needs to be called twice during the same transaction. And conceptually it's a little different as well.

@@ -119,7 +120,7 @@ describe('Verify point generation', () => {
Date.prototype.toISOString = jest.fn(() => '2019-03-12T07:35:42.043Z')

const point = await generateBirthRegPoint(
cloneDeep(testPayload),
cloneDeep(testPayload) as any,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can promise you I tried to fix these types. It was just such a massive effort I had to let it go this time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥲

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could change these to the fixtures in commons so the type would be correct. Needs updating the tests quite a bit though

@@ -69,15 +68,6 @@ export const getRoutes = () => {
'Register event based on tracking id and registration number.'
}
},
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this anymore as the trigger comes from BullMQ

export async function markEventAsRegisteredCallbackHandler(
request: Hapi.Request,
h: Hapi.ResponseToolkit
export async function markEventAsRegistered(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we land after country config confirms the registration is ok. I suggest first reading create.ts to follow the flow logically.

@@ -8,39 +8,50 @@
*
* Copyright (C) The OpenCRVS Authors located at https://github.com/opencrvs/opencrvs-core/blob/master/AUTHORS.
*/
import * as Hapi from '@hapi/hapi'

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are the most significant in this PR so please read extremely carefully, compare to the old implementation and make parts of the flows fail intentionally by adding for instance

if(Math.random() > 0.00005) {
  throw new Error('Random error!')
}

To see current status of BullMQ, open up the dashboard by running

docker run -p 3003:3000 -e BULLMQ_DASHBOARD=true taskforcesh/bullmq-dashboard

and then opening up http://localhost:3003

export default async function createRecordHandler(
request: Hapi.Request,
_: Hapi.ResponseToolkit
export async function declareRecordHandler(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function and the two other handlers validateRecordHandler and registerRecordHandler follow one simple rule: you can call these functions as many times you like with the same parameters and the end result is always the same. So call this function once or million times, you should always only see one record in the database. Idempotency like this is absolutely critical so that the message queue can submit the same record here twice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about notifications?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean are the notification endpoints idempotent too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet. I left them untouched for now as in many cases it's the last operation to be performed. We should make them idempotent though. I'll add it to the TODO list

RECORD_SEARCH = 'recordsearch',
VERIFY = 'verify'
}
import { PlainToken } from '@opencrvs/commons'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TokenWithBearer = explicit type for Bearer abc
PlainToken = explicit type for abc

@rikukissa rikukissa changed the title Implement saga pattern for better orchestration of operations spanning multiple services Implement a message queue between gateway and workflow service for improved reliability Sep 17, 2024
@@ -43,25 +43,6 @@ describe('Verify handlers', () => {
mockedsearchForDuplicates.mockReturnValue([])
})

it('should return status code 500 if invalid payload received', async () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were causing stderr output in test runner, so I just removed the test. There shouldn't be any expected case where the server returns a 500 status

@ocrvs-bot
Copy link
Collaborator

Your environment is deployed to https://ocrvs-6208.opencrvs.dev.

@rikukissa rikukissa marked this pull request as ready for review September 19, 2024 14:08
Copy link
Collaborator

@Zangetsu101 Zangetsu101 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took me a while to get my head around the record-queue. But awesome stuff Riku!

Comment on lines 74 to 95
let nthTry = 0

while (true) {
const result = await client.query<FetchRecordStatusQuery>({
query: FETCH_RECORD_STATUS,
fetchPolicy: 'no-cache',
variables: {
draftId
}
})
if (result.data.fetchRecordStatus.__typename === 'RecordProcessed') {
return {
recordId: result.data.fetchRecordStatus.recordId,
trackingId: result.data.fetchRecordStatus.trackingId,
isPotentiallyDuplicate:
result.data.fetchRecordStatus.hasPotentialDuplicates
}
}

await new Promise((resolve) => setTimeout(resolve, 1000 + nthTry * 1000))
nthTry++
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say a field agent creates quite a few declarations offline and then comes back online at a later time. So won't there be quite a few pooling going on at the same time

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the best case it would go through instantly (as it does now) but definitely a thing worth testing

Comment on lines +41 to +51

export type PlainToken = Nominal<string, 'PlainToken'>
export type TokenWithBearer = Nominal<string, 'PlainToken'>

export function getToken(headers: IAuthHeader) {
return (headers.Authorization || '').replace('Bearer ', '') as PlainToken
}

export function toTokenWithBearer(token: string) {
return `Bearer ${token.replace('Bearer ', '')}` as TokenWithBearer
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one. This has been a headache in quite a few places

trackingId: string
isPotentiallyDuplicate: boolean
}>('POST', '/create-record', authHeader, { record, event })
if (inScope(authHeader, [USER_SCOPE.REGISTER])) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice one 👍


await createUserAuditPointFromFHIR('DECLARED', request)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused regarding the audit points as some of them are present while the others were removed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing all the "create-record" related ones were separated out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zangetsu101 yea, I only changed the ones that were part of the create record flows. But in the long run, all endpoints should either create a record audit log item OR a user audit log item but never both

@@ -119,7 +120,7 @@ describe('Verify point generation', () => {
Date.prototype.toISOString = jest.fn(() => '2019-03-12T07:35:42.043Z')

const point = await generateBirthRegPoint(
cloneDeep(testPayload),
cloneDeep(testPayload) as any,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥲

export default async function createRecordHandler(
request: Hapi.Request,
_: Hapi.ResponseToolkit
export async function declareRecordHandler(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about notifications?

Comment on lines +528 to +530
'VALIDATED',
'DECLARED',
'WAITING_VALIDATION'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'VALIDATED',
'DECLARED',
'WAITING_VALIDATION'
'READY_FOR_REVIEW',
'WAITING_VALIDATION'

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finding existing composition is for the idempotent part right? So the record should be either in READY_FOR_REVIEW or WAITING_VALIDATION state in this case if I am not wrong

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, exactly. One place that got me confused was this. From my understanding, IN_PROGRESS can never happen as the record cannot be incomplete. But it still leaves us with at least DECLARED. VALIDATED I can't remember where I got from anymore.

What do you think? Should we keep DECLARED? It might be that the await createRecord call below goes through but then one of the proceeding calls fail

Comment on lines +605 to +610
await createUserAuditEvent('REGISTERED', {
transactionId: transactionId,
compositionId: getComposition(record).id,
trackingId: getTrackingIdFromRecord(record),
isPotentiallyDuplicate: false
}
headers: getAuthorizationHeaderFromToken(token)
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be WAITING_EXTERNAL_VALIDATION instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zangetsu101 you are onto something. Or should it be nothing at all here? I was trying to find WAITING_EXTERNAL_VALIDATION from user audit events from the old implementation but couldn't.

Also a little bit of a conceptual question – does the user action initiation for "registered" happen already before the external validation?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, if we think of it like that we are tracking the user actions and the user is initiating the "register" aciton

Comment on lines +162 to +173
export function hasSameDuplicatesInExtension(
task: SavedTask,
duplicateIds: { id: string; trackingId: string }[]
) {
return task.extension?.some(
(ext) =>
ext.url === FLAGGED_AS_POTENTIAL_DUPLICATE &&
ext.valueString ===
duplicateIds.map((duplicate) => duplicate.trackingId).toString()
)
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the trackingIds guaranteed to be in the same order?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think so. That's the assumption I built this with

packages/commons/src/message-queue/record.ts Outdated Show resolved Hide resolved
@rikukissa
Copy link
Member Author

naftis and others added 2 commits October 10, 2024 10:22
…#7674)

* feat: allow confirming and rejecting record externally

* revert: no 'patch' needed after all

it would be the correct REST action for the state changes: chose convention for now

* fix: remove dummy definitions

* refactor: improve typing on graphql gateway supported patient identifiers

* refactor: use defined rejection types

* refactor(tests): use defined rejection types

* fix: add some missing rejection reasons

* fix: linter issues

* fix: named types not being included in the generated graphql query

* fix: REVERT THIS? remove trivy from build

* revert: typing work related to reasons

* refactor: stop using global interception in workflow tests

* chore: allow specifying the port to the queue

* feat: add testcontainers

* chore: add host&port to missing places

* refactor: add port and host to redis call

* chore: clean up testcontainers

* fix: use defined redis port in gateway

* Revert "refactor: stop using global interception in workflow tests"

This reverts commit 201cec7.

* revert: yarn.lock changes

* revert: typing efforts in client - add a todo comment to look into this

* Revert "fix: REVERT THIS? remove trivy from build"

This reverts commit 529d702.

---------

Co-authored-by: Riku Rouvila <riku.rouvila@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants