Skip to content

Commit 389b624

Browse files
committed
Add kafka messages consumer to the Framework
Add kafka messages consumer
1 parent d1328a2 commit 389b624

File tree

10 files changed

+180
-17
lines changed

10 files changed

+180
-17
lines changed

.github/workflows/proto-sync.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ jobs:
1919
run: |
2020
git clone https://github.com/AliceO2Group/Control.git aliecs
2121
cp aliecs/core/protos/o2control.proto Control/protobuf
22-
cp aliecs/common/protos/events.proto Control/protobuf/protos
23-
cp aliecs/common/protos/common.proto Control/protobuf/protos/protos
22+
cp aliecs/common/protos/events.proto Framework/Backend/protos
23+
cp aliecs/common/protos/common.proto Framework/Backend/protos/protos
2424
cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto
2525
rm -rf aliecs
2626
- name: Check if there are any differences and create PR

Control/lib/control-core/GrpcProxy.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
* In applying this license CERN does not waive the privileges and immunities
1111
* granted to it by virtue of its status as an Intergovernmental Organization
1212
* or submit itself to any jurisdiction.
13-
*/
13+
*/
1414

1515
// Doc: https://grpc.io/docs/languages/node/
1616
const protoLoader = require('@grpc/proto-loader');
1717
const grpcLibrary = require('@grpc/grpc-js');
18+
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');
1819
const path = require('path');
1920
const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js');
2021
const {Status} = require(path.join(__dirname, './../../protobuf/status_pb.js'));
@@ -34,7 +35,10 @@ class GrpcProxy {
3435
*/
3536
constructor(config, path) {
3637
if (this._isConfigurationValid(config, path)) {
37-
const packageDefinition = protoLoader.loadSync(path, {longs: String, keepCase: false, arrays: true});
38+
const packageDefinition = protoLoader.loadSync(
39+
path,
40+
{longs: String, keepCase: false, arrays: true, includeDirs: [getWebUiProtoIncludeDir()]},
41+
);
3842
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
3943
const protoService = octlProto[this._package][this._label];
4044
const address = `${config.hostname}:${config.port}`;

Control/test/config/apricot-grpc.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const path = require('path');
1818
// Doc: https://grpc.io/grpc/node/grpc.html
1919
const protoLoader = require('@grpc/proto-loader');
2020
const grpcLibrary = require('@grpc/grpc-js');
21+
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');
2122

2223
const PROTO_PATH = path.join(__dirname, './../../protobuf/o2apricot.proto');
2324

@@ -28,7 +29,7 @@ const apricotGRPCServer = (config) => {
2829
let calls = {};
2930

3031
const server = new grpcLibrary.Server();
31-
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false}); // change to camel case
32+
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]}); // change to camel case
3233
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
3334
const credentials = grpcLibrary.ServerCredentials.createInsecure();
3435
const address = `${config.apricot.hostname}:${config.apricot.port}`;

Control/test/config/core-grpc.js

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@
1010
* In applying this license CERN does not waive the privileges and immunities
1111
* granted to it by virtue of its status as an Intergovernmental Organization
1212
* or submit itself to any jurisdiction.
13-
*/
13+
*/
1414
/* eslint-disable require-jsdoc */
1515

1616
const path = require('path');
1717

1818
// Doc: https://grpc.io/grpc/node/grpc.html
1919
const protoLoader = require('@grpc/proto-loader');
2020
const grpcLibrary = require('@grpc/grpc-js');
21+
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');
2122

2223
const PROTO_PATH = path.join(__dirname, './../../protobuf/o2control.proto');
2324

@@ -29,7 +30,10 @@ const coreGRPCServer = (config) => {
2930
let calls = {};
3031

3132
const server = new grpcLibrary.Server();
32-
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false});// change to camel case
33+
const packageDefinition = protoLoader.loadSync(
34+
PROTO_PATH,
35+
{keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]},
36+
);// change to camel case
3337
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
3438
const credentials = grpcLibrary.ServerCredentials.createInsecure();
3539
const address = `${config.grpc.hostname}:${config.grpc.port}`;
@@ -43,7 +47,7 @@ const coreGRPCServer = (config) => {
4347
calls['getEnvironments'] = true;
4448
const responseData = {
4549
frameworkId: '74917838-27cb-414d-bfcd-7e74f85d4926-0000',
46-
environments: [envTest.environment]
50+
environments: [envTest.environment],
4751
};
4852
callback(null, responseData);
4953
},
@@ -130,37 +134,36 @@ const envTest = {
130134
mid_enabled: 'false',
131135
mid_something: 'test',
132136
dd_enabled: 'true',
133-
run_type: 'run'
137+
run_type: 'run',
134138
},
135139
vars: {
136140
odc_enabled: 'true',
137141
mid_enabled: 'false',
138142
mid_something: 'test',
139143
dd_enabled: 'true',
140-
run_type: 'run'
144+
run_type: 'run',
141145
},
142146
defaults: {
143147
dcs_topology: 'test',
144148
dd_enabled: 'true',
145-
run_type: 'run'
149+
run_type: 'run',
146150
},
147-
integratedServices: {
148-
}
151+
integratedServices: {},
149152
},
150153
workflow: {},
151154
workflowTemplates: {
152155
workflowTemplates: [
153156
{
154157
repo: 'git.cern.ch/some-user/some-repo/',
155-
template: 'prettyreadout-1', revision: 'master', description: 'something'
158+
template: 'prettyreadout-1', revision: 'master', description: 'something',
156159
},
157-
]
160+
],
158161
},
159162
listRepos: {
160163
repos: [
161164
{name: 'git.cern.ch/some-user/some-repo/', default: true, defaultRevision: 'dev', revisions: ['master', 'dev']},
162-
{name: 'git.com/alice-user/alice-repo/'}]
163-
}
165+
{name: 'git.com/alice-user/alice-repo/'}],
166+
},
164167
};
165168

166169
module.exports = {coreGRPCServer};

Framework/Backend/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const {
3737
updateAndSendExpressResponseFromNativeError,
3838
} = require('./errors/updateAndSendExpressResponseFromNativeError.js');
3939
const { Logger } = require('./log/Logger');
40+
const {getWebUiProtoIncludeDir} = require('./protobuf/getWebUiProtoIncludeDir');
4041

4142
exports.ConsulService = ConsulService;
4243

@@ -82,3 +83,5 @@ exports.UnauthorizedAccessError = UnauthorizedAccessError;
8283
exports.grpcErrorToNativeError = grpcErrorToNativeError;
8384

8485
exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError;
86+
87+
exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
const protobuf = require('protobufjs');
14+
const path = require('node:path');
15+
const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js');
16+
17+
const protoDir = path.resolve(__dirname, '../protos');
18+
const root = protobuf.loadSync(path.resolve(protoDir, 'events.proto'));
19+
const EventMessage = root.lookupType('events.Event');
20+
21+
/**
22+
* @callback MessageReceivedCallback
23+
* @param {EventMessage} message received message
24+
* @return {Promise<void>}
25+
*/
26+
27+
/**
28+
* Consumer that consume ECS event messages and pass them to previously-registered listeners
29+
*/
30+
class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer {
31+
// eslint-disable-next-line valid-jsdoc
32+
/**
33+
* Constructor
34+
*
35+
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
36+
* @param {string} groupId the group id to use for the kafka consumer
37+
* @param {string[]} topics the list of topics to consume
38+
*/
39+
constructor(kafkaClient, groupId, topics) {
40+
super(kafkaClient, groupId, topics, EventMessage);
41+
}
42+
43+
// eslint-disable-next-line valid-jsdoc
44+
/**
45+
* @inheritDoc
46+
*/
47+
getLoggerLabel() {
48+
return 'ALI-ECS-EVENT-CONSUMER';
49+
}
50+
}
51+
52+
exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
const { LogManager } = require('@aliceo2/web-ui');
2+
3+
/**
4+
* Generic Kafka Message consumer extracting objects according to a protobuf definition
5+
* @template T extends import('protobufjs').Type
6+
*/
7+
class KafkaMessagesConsumer {
8+
// eslint-disable-next-line valid-jsdoc
9+
/**
10+
* Constructor
11+
*
12+
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
13+
* @param {string} groupId the group id to use for the kafka consumer
14+
* @param {string[]} topics the list of topics to consume
15+
* @param {import('protobufjs').Type} protoType the type definition of the handled message
16+
*/
17+
constructor(kafkaClient, groupId, topics, protoType) {
18+
this.consumer = kafkaClient.consumer({ groupId });
19+
this._topics = topics;
20+
this._protoType = protoType;
21+
22+
/**
23+
* @type {MessageReceivedCallback[]}
24+
* @private
25+
*/
26+
this._listeners = [];
27+
28+
this._logger = LogManager.getLogger(this.getLoggerLabel());
29+
}
30+
31+
/**
32+
* Register a listener to listen on event message being received
33+
*
34+
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
35+
*
36+
* @param {MessageReceivedCallback} listener the listener to register
37+
* @return {void}
38+
*/
39+
onMessageReceived(listener) {
40+
this._listeners.push(listener);
41+
}
42+
43+
/**
44+
* Start the kafka consumer
45+
*
46+
* @return {Promise<void>} Resolves once the consumer started to consume messages
47+
*/
48+
async start() {
49+
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
50+
await this.consumer.connect();
51+
await this.consumer.subscribe({ topics: this._topics });
52+
await this.consumer.run({
53+
eachMessage: async ({ message, topic }) => {
54+
const error = this._protoType.verify(message.value);
55+
if (error) {
56+
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
57+
return;
58+
}
59+
this._logger.debugMessage(`Received message on ${topic}`);
60+
61+
try {
62+
await this._handleEvent(this._protoType.toObject(
63+
this._protoType.decode(message.value),
64+
{ enums: String },
65+
));
66+
} catch (error) {
67+
this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`);
68+
}
69+
},
70+
});
71+
}
72+
73+
/**
74+
* Call every registered listeners by passing the given message to it
75+
*
76+
* @param {T} message the message to pass to listeners
77+
* @return {void}
78+
*/
79+
async _handleEvent(message) {
80+
for (const listener of this._listeners) {
81+
try {
82+
await listener(message);
83+
} catch (error) {
84+
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
85+
}
86+
}
87+
}
88+
89+
/**
90+
* Return the label to be used by the logger
91+
*
92+
* @return {string} the logger label
93+
*/
94+
getLoggerLabel() {
95+
return 'EVENT-CONSUMER';
96+
}
97+
}
98+
99+
exports.KafkaMessagesConsumer = KafkaMessagesConsumer;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
exports.getWebUiProtoIncludeDir = () => __dirname;

0 commit comments

Comments
 (0)