Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rikukissa committed Sep 6, 2024
1 parent c3fb95e commit 406f03d
Show file tree
Hide file tree
Showing 50 changed files with 1,763 additions and 880 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ grafana
deploy_key
.lh
.vscode
TODO.md
8 changes: 5 additions & 3 deletions packages/commons/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"./authentication": "./build/dist/authentication.js",
"./http": "./build/dist/http.js",
"./fixtures": "./build/dist/fixtures.js",
"./assignment": "./build/dist/assignment.js"
"./assignment": "./build/dist/assignment.js",
"./message-queue": "./build/dist/message-queue/index.js"
},
"scripts": {
"start": "yarn build:watch",
Expand All @@ -33,17 +34,18 @@
"@types/lodash": "4.17.0",
"@types/node-fetch": "^2.5.12",
"@types/uuid": "^9.0.3",
"bullmq": "^5.12.13",
"date-fns": "^2.28.0",
"elastic-apm-node": "^3.29.0",
"jest": "27.5.1",
"jwt-decode": "^2.2.0",
"lint-staged": "^15.2.2",
"lodash": "^4.17.10",
"node-fetch": "^2.6.7",
"pino": "^7.0.0",
"pkg-up": "^3.1.0",
"typescript": "4.9.5",
"uuid": "^9.0.0",
"pino": "^7.0.0"
"uuid": "^9.0.0"
},
"devDependencies": {
"@typescript-eslint/eslint-plugin": "^4.5.0",
Expand Down
5 changes: 5 additions & 0 deletions packages/commons/src/authentication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export function hasScope(authHeader: IAuthHeader, scope: Scope) {
return (tokenPayload.scope && tokenPayload.scope.indexOf(scope) > -1) || false
}

export function getScopes(token: string) {
const tokenPayload = getTokenPayload(token)
return tokenPayload.scope
}

export function inScope(authHeader: IAuthHeader, scopes: Scope[]) {
const matchedScope = scopes.find((scope) => hasScope(authHeader, scope))
return !!matchedScope
Expand Down
1 change: 1 addition & 0 deletions packages/commons/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export * from './documents'
export * from './http'
export * from './logger'
export * from './search'
export * from './saga'
133 changes: 133 additions & 0 deletions packages/commons/src/message-queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* OpenCRVS is also distributed under the terms of the Civil Registration
* & Healthcare Disclaimer located at http://opencrvs.org/license.
*
* Copyright (C) The OpenCRVS Authors located at https://github.com/opencrvs/opencrvs-core/blob/master/AUTHORS.
*/

import { Processor, Queue, Worker } from 'bullmq'
import { Scope } from 'src/authentication'
import { logger } from '../logger'
import { isTask, SavedBundle } from '../types'

type MetricsJobPayload =
| {
action: 'sent-notification-for-review'
payload: {
bundle: SavedBundle
userScopes: Scope[]
}
}
| {
action: 'waiting-external-validation'
payload: {
bundle: SavedBundle
}
}
| {
action: 'sent-for-approval'
payload: {
bundle: SavedBundle
}
}
| {
action: 'sent-notification'
payload: {
bundle: SavedBundle
}
}

export function useMetricsQueue(redisHost: string) {
async function addMetric<T extends MetricsJobPayload['action']>(
action: T,
payload: Extract<MetricsJobPayload, { action: T }>['payload']
) {
const queue = new Queue<MetricsJobPayload>('metrics', {
connection: { host: redisHost, port: 6379 }
})

await queue.waitUntilReady()
await queue.add('metrics-event', { payload, action } as any, {
attempts: Number.MAX_SAFE_INTEGER,
backoff: {
type: 'exponential',
delay: 1000
}
})
await queue.close()
}
return addMetric
}

export async function registerMetricsWorker(
redisHost: string,
processJob: Processor<MetricsJobPayload>
) {
const worker = new Worker<MetricsJobPayload>('metrics', processJob, {
connection: { host: redisHost, port: 6379 }
})
worker.on('failed', (job, err) => {
logger.error(`Metrics worker error: ${err}`)
})
worker.on('error', (err) => {
logger.error(`Metrics worker error: ${err}`)
})
await worker.waitUntilReady()
return worker
}

type SearchIndexingJobPayload = {
bundle: SavedBundle
}

export function useSearchIndexingQueue(redisHost: string) {
return async function indexRecord(bundle: SavedBundle) {
const queue = new Queue<SearchIndexingJobPayload>('search-indexing', {
connection: { host: redisHost, port: 6379 }
})

await queue.waitUntilReady()

console.log(
'indexing record',
bundle.entry
.map((entry) => entry.resource)
.filter(isTask)
.map((t) => t.businessStatus.coding)
)

await queue.add(
'index-record',
{ bundle },
{
attempts: Number.MAX_SAFE_INTEGER,
backoff: {
type: 'exponential',
delay: 1000
}
}
)
await queue.close()
}
}

export async function registerSearchIndexingWorker(
redisHost: string,
processJob: Processor<SearchIndexingJobPayload>
) {
const worker = new Worker<SearchIndexingJobPayload>(
'search-indexing',
processJob,
{
connection: { host: redisHost, port: 6379 }
}
)

await worker.waitUntilReady()

return worker
}
8 changes: 8 additions & 0 deletions packages/commons/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ export type RegistrationStatus =
| 'VALIDATED'
| 'WAITING_VALIDATION'

/*
* Temporarily using this type to represent the action that can be taken on a record
* There is a conceptual difference however:
* - An action is a single event that can be taken on a record
* - Record state / registration status is the current state of the record, determined by the actions that have been taken on the record
*/
export type RecordAction = RegistrationStatus

export type StateIdenfitiers = {
IN_PROGRESS: InProgressRecord
READY_FOR_REVIEW: ReadyForReviewRecord
Expand Down
50 changes: 50 additions & 0 deletions packages/commons/src/saga/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { logger } from '../logger'

export type RunFunction = (sideEffect: SideEffect<any>) => Promise<any>
type Saga<R> = (run: <T>(effect: SideEffect<T>) => Promise<T>) => Promise<R>

export async function saga<R>(callback: Saga<R>): Promise<R> {
const rollbacks: (() => Promise<any>)[] = []
async function run(sideEffect: SideEffect<any>) {
try {
logger.info(`Running side effect: ${sideEffect.label}`)
const result = await sideEffect.commit()
rollbacks.unshift(async () => {
logger.info(`Rolling back side effect: ${sideEffect.label}`)

try {
await sideEffect.rollback(result)
} catch (e) {
logger.error(
`Error during rollback: ${sideEffect.label}. Error: ${e.message}`
)
}
})
return result
} catch (e) {
for (const rollback of rollbacks) {
await rollback()
}
throw e
}
}
return callback(run)
}

type SideEffect<T> = {
commit: () => Promise<T>
rollback: (param: T) => Promise<any>
label?: string
}

export function effect<T>(
label: string,
commit: () => Promise<T>,
rollback: (param: T) => Promise<any>
): SideEffect<T> {
return {
commit,
rollback,
label
}
}
125 changes: 125 additions & 0 deletions packages/commons/src/saga/saga.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { saga, effect } from '.'

test('executes invocations in order', async () => {
const result = await saga<number>(async (run) => {
const a = await run(
effect(
'',
() => Promise.resolve(1),
() => Promise.resolve()
)
)
const b = await run(
effect(
'',
() => Promise.resolve(a + 1),
() => Promise.resolve()
)
)
const c = await run(
effect(
'',
() => Promise.resolve(b + 1),
() => Promise.resolve()
)
)
return c
})
expect(result).toBe(3)
})

test('executes compensates in order if anything fails', async () => {
const compensateSpy = jest.fn()

const result = saga<number>(async (run) => {
const a = await run(
effect(
'',
() => Promise.resolve(1),
() => compensateSpy('compensate 1')
)
)
await run(
effect(
'',
() => Promise.resolve(a + 1),
() => compensateSpy('compensate 2')
)
)
const c = await run(
effect(
'',
() => Promise.reject(new Error('error')),
() => compensateSpy('compensate 3')
)
)

return c
})

await expect(result).rejects.toThrowError('error')
expect(compensateSpy).toHaveBeenCalledTimes(2)
expect(compensateSpy).toHaveBeenCalledWith('compensate 2')
expect(compensateSpy).toHaveBeenCalledWith('compensate 1')
})

test('executes all compensates even if one of them fails', async () => {
const compensateSpy = jest.fn()

const result = saga<number>(async (run) => {
const a = await run(
effect(
'',
() => Promise.resolve(1),
() => compensateSpy('compensate 1')
)
)
await run(
effect(
'',
() => Promise.resolve(a + 1),
() => Promise.reject(new Error('error'))
)
)
const c = await run(
effect(
'',
() => Promise.reject(new Error('error')),
() => compensateSpy('compensate 3')
)
)

return c
})

await expect(result).rejects.toThrowError('error')
expect(compensateSpy).toHaveBeenCalledTimes(1)
expect(compensateSpy).toHaveBeenCalledWith('compensate 1')
})
test('compensate handler receives the invoke return value as a parameter', async () => {
const compensateSpy = jest.fn()
const shouldNotBeCalled = jest.fn(() => {
throw new Error('This function should not have been called')
})
const result = saga<number>(async (run) => {
const a = await run(
effect(
'',
() => Promise.resolve(1),
() => compensateSpy('compensate 1')
)
)
await run(effect('', () => Promise.resolve(a + 1), compensateSpy))

const c = await run(
effect('', () => Promise.reject(new Error('error')), shouldNotBeCalled)
)

return c
})

await expect(result).rejects.toThrowError('error')
expect(compensateSpy).toHaveBeenCalledTimes(2)
expect(compensateSpy).toHaveBeenCalledWith(2)
expect(compensateSpy).toHaveBeenCalledWith('compensate 1')
})
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ export const resolvers: GQLResolver = {
throw new UserInputError(error.message)
}

return await createRegistration(details, EVENT_TYPE.BIRTH, authHeader)
return createRegistration(details, EVENT_TYPE.BIRTH, authHeader)
},
async createDeathRegistration(_, { details }, { headers: authHeader }) {
try {
Expand Down
Loading

0 comments on commit 406f03d

Please sign in to comment.