@@ -378,6 +378,11 @@ const metrics = {
378
378
help : 'Redis uptime in seconds'
379
379
} ) ,
380
380
381
+ redisPing : new promClient . Gauge ( {
382
+ name : 'redis_latency' ,
383
+ help : 'Redis latency in milliseconds'
384
+ } ) ,
385
+
381
386
redisRejectedConnectionsTotal : new promClient . Gauge ( {
382
387
name : 'redis_rejected_connections_total' ,
383
388
help : 'Number of connections rejected by Redis'
@@ -1212,6 +1217,64 @@ function checkUpgrade() {
1212
1217
} ) ;
1213
1218
}
1214
1219
1220
+ // measure Redis ping once in every 10 seconds
1221
+
1222
+ let redisPingCounter = [ ] ;
1223
+
1224
+ function getRedisPing ( ) {
1225
+ if ( redisPingCounter . length < 14 ) {
1226
+ return null ;
1227
+ }
1228
+ let entries = redisPingCounter
1229
+ . slice ( - 34 )
1230
+ . map ( entry => entry [ 1 ] )
1231
+ . sort ( ( a , b ) => a - b ) ;
1232
+
1233
+ // remove 2 highest and lowest
1234
+ entries . shift ( ) ;
1235
+ entries . shift ( ) ;
1236
+ entries . pop ( ) ;
1237
+ entries . pop ( ) ;
1238
+
1239
+ let sum = 0 ;
1240
+ for ( let entry of entries ) {
1241
+ sum += entry ;
1242
+ }
1243
+
1244
+ return Math . round ( sum / entries . length ) ;
1245
+ }
1246
+
1247
+ const REDIS_PING_TIMEOUT = 10 * 1000 ;
1248
+ let redisPingTimer = false ;
1249
+ const processRedisPing = async ( ) => {
1250
+ try {
1251
+ let startTime = Date . now ( ) ;
1252
+ await redis . ping ( ) ;
1253
+ let endTime = Date . now ( ) ;
1254
+ let duration = endTime - startTime ;
1255
+ redisPingCounter . push ( [ endTime , duration ] ) ;
1256
+ if ( redisPingCounter . length > 300 ) {
1257
+ redisPingCounter = redisPingCounter . slice ( 0 , 150 ) ;
1258
+ }
1259
+ return duration ;
1260
+ } catch ( err ) {
1261
+ logger . error ( { msg : 'Failed to run Redis ping' , err } ) ;
1262
+ }
1263
+ } ;
1264
+
1265
+ const redisPingHandler = async ( ) => {
1266
+ await processRedisPing ( ) ;
1267
+ redisPingTimer = setTimeout ( checkRedisPing , REDIS_PING_TIMEOUT ) ;
1268
+ redisPingTimer . unref ( ) ;
1269
+ } ;
1270
+
1271
+ function checkRedisPing ( ) {
1272
+ clearTimeout ( redisPingTimer ) ;
1273
+ redisPingHandler ( ) . catch ( err => {
1274
+ logger . error ( 'Failed to process Redis Ping' , err ) ;
1275
+ } ) ;
1276
+ }
1277
+
1215
1278
async function updateQueueCounters ( ) {
1216
1279
metrics . emailengineConfig . set ( { version : 'v' + packageData . version } , 1 ) ;
1217
1280
metrics . emailengineConfig . set ( { config : 'uvThreadpoolSize' } , Number ( process . env . UV_THREADPOOL_SIZE ) ) ;
@@ -1245,6 +1308,9 @@ async function updateQueueCounters() {
1245
1308
metrics . redisVersion . set ( { version : 'v' + redisInfo . redis_version } , 1 ) ;
1246
1309
1247
1310
metrics . redisUptimeInSeconds . set ( Number ( redisInfo . uptime_in_seconds ) || 0 ) ;
1311
+
1312
+ metrics . redisPing . set ( ( await processRedisPing ( ) ) || 0 ) ;
1313
+
1248
1314
metrics . redisRejectedConnectionsTotal . set ( Number ( redisInfo . rejected_connections ) || 0 ) ;
1249
1315
metrics . redisConfigMaxclients . set ( Number ( redisInfo . maxclients ) || 0 ) ;
1250
1316
metrics . redisConnectedClients . set ( Number ( redisInfo . connected_clients ) || 0 ) ;
@@ -1306,7 +1372,7 @@ async function onCommand(worker, message) {
1306
1372
}
1307
1373
}
1308
1374
1309
- return { connections } ;
1375
+ return { connections, redisPing : await getRedisPing ( ) } ;
1310
1376
}
1311
1377
1312
1378
case 'imapWorkerCount' : {
@@ -2176,6 +2242,9 @@ startApplication()
2176
2242
upgradeCheckTimer = setTimeout ( checkUpgrade , UPGRADE_CHECK_TIMEOUT ) ;
2177
2243
upgradeCheckTimer . unref ( ) ;
2178
2244
2245
+ redisPingTimer = setTimeout ( checkRedisPing , REDIS_PING_TIMEOUT ) ;
2246
+ redisPingTimer . unref ( ) ;
2247
+
2179
2248
queueEvents . notify = new QueueEvents ( 'notify' , Object . assign ( { } , queueConf ) ) ;
2180
2249
queueEvents . submit = new QueueEvents ( 'submit' , Object . assign ( { } , queueConf ) ) ;
2181
2250
queueEvents . documents = new QueueEvents ( 'documents' , Object . assign ( { } , queueConf ) ) ;
0 commit comments