Skip to content

Commit ed7d079

Browse files
authored
Merge pull request #3740 from Countly/hotfix/push
Hotfix/push
2 parents e10f4e0 + f3715ab commit ed7d079

File tree

11 files changed

+162
-40
lines changed

11 files changed

+162
-40
lines changed

bin/scripts/fix-data/clear_push_duplicates.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ var Promise = require("bluebird");
55

66
var app_list = [];//add app ids, if none added will run on all apps
77

8-
var dry_run = true;
8+
var args = process.argv.slice(2).join('').replace(/\s/g, '');
9+
var dry_run = args.indexOf('--dryrun=false') === -1;
910

1011
console.log("Script clears push tokens if same token is for mutiple users. Keeping only for user with highest value for lac or ls");
1112
console.log("Can be called multiple times. On each run checks current state in database");
1213
if (dry_run) {
1314
console.log("This is dry run");
1415
console.log("Nothing will be cleared");
16+
console.log("Run with --dryrun=false in order to modify data in the database");
1517
}
1618

1719
function clearing_out(options, callback) {

bin/upgrade/DEV/scripts/push_hash.js

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
const pluginManager = require('../../../../plugins/pluginManager.js'),
2+
{ util } = require('../../../../plugins/push/api/send/std.js'),
3+
{ fields, platforms, PLATFORM, TK } = require('../../../../plugins/push/api/send/platforms');
4+
5+
pluginManager.dbConnection('countly').then(async(db) => {
6+
try {
7+
let apps = await db.collection('apps').find({}, {_id: 1}).toArray(),
8+
fields_appusers = fields(platforms, true);
9+
for (let app of apps) {
10+
console.log('Hashing tokens of app %s', app._id);
11+
12+
let $unset = {};
13+
fields_appusers.forEach(field => {
14+
$unset[field] = 1;
15+
});
16+
await db.collection(`app_users${app._id}`).updateMany({}, {$unset});
17+
console.log('Done unsetting bools');
18+
19+
let stream = db.collection(`push_${app._id}`).find({tk: {$exists: 1}}, {tk: 1}),
20+
batch = [],
21+
count = 0,
22+
add = async (op, flush) => {
23+
if (flush || (batch.length > 0 && batch.length >= 10000)) {
24+
if (op) {
25+
batch.push(op);
26+
}
27+
if (batch.length) {
28+
console.log('... updating %d-th record', count++);
29+
await db.collection(`app_users${app._id}`).bulkWrite(batch);
30+
batch = [];
31+
}
32+
else {
33+
console.log('... nothing to flush');
34+
}
35+
}
36+
else if (op) {
37+
batch.push(op);
38+
}
39+
};
40+
41+
for await (const doc of stream) {
42+
let $set = {},
43+
$setset = false;
44+
45+
for (const p in PLATFORM) {
46+
for (const n in PLATFORM[p].FIELDS) { // number
47+
const f = PLATFORM[p].FIELDS[n], // field key
48+
field = p + f;
49+
50+
if (doc.tk[field]) {
51+
$set[TK + field] = util.hashInt(doc.tk[field]);
52+
$setset = true;
53+
}
54+
}
55+
}
56+
57+
if ($setset) {
58+
await add({updateOne: {filter: {uid: doc._id}, update: {$set}}});
59+
}
60+
}
61+
62+
// flush the rest
63+
await add(null, true);
64+
console.log('Done setting ints for app %s', app._id);
65+
}
66+
}
67+
catch(e) {
68+
console.error(e);
69+
}
70+
db.close();
71+
});

plugins/push/api/api-auto.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ module.exports.autoOnEvent = function(appId, uid, keys, events) {
175175

176176
keys = keys.filter((k, i) => keys.indexOf(k) === i);
177177

178+
if (!keys.length) {
179+
return;
180+
}
181+
178182
logEvents.d('Checking event keys %j', keys);
179183

180184
plugins.getPluginsApis().push.cache.iterate((k, msg) => {

plugins/push/api/api-dashboard.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,11 @@ module.exports.dashboard = async function(params) {
165165
platforms.forEach(p => {
166166
ptq.push({
167167
$or: fields([p], true).map(f => ({
168-
[f]: true
168+
[f]: {$exists: true}
169169
}))
170170
});
171171
any.$or.push(...fields([p], true).map(f => ({
172-
[f]: true
172+
[f]: {$exists: true}
173173
})));
174174
});
175175

plugins/push/api/api-push.js

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const common = require('../../../api/utils/common'),
66
module.exports.onTokenSession = async(dbAppUser, params) => {
77
let stuff = extract(params.qstring);
88
if (stuff) {
9-
let [p, f, token] = stuff,
9+
let [p, f, token, hash] = stuff,
1010
appusersField = field(p, f, true),
1111
pushField = field(p, f, false),
1212
pushCollection = common.db.collection(`push_${params.app_id}`),
@@ -16,13 +16,32 @@ module.exports.onTokenSession = async(dbAppUser, params) => {
1616

1717
let push = await pushCollection.findOne({_id: dbAppUser.uid}, {projection: {[field]: 1}});
1818
if (token && (!push || common.dot(push, pushField) !== token)) {
19-
let $set = {[appusersField]: true};
20-
// if (params.qstring.locale) {
21-
// $set[common.dbUserMap.locale] = params.qstring.locale;
22-
// dbAppUser[common.dbUserMap.locale] = params.qstring.locale;
23-
// }
24-
appusersCollection.updateOne({_id: params.app_user_id}, {$set}, () => {}); // don't wait
19+
appusersCollection.updateOne({_id: params.app_user_id}, {$set: {[appusersField]: hash}}, () => {}); // don't wait
2520
pushCollection.updateOne({_id: params.app_user.uid}, {$set: {[pushField]: token}}, {upsert: true}, () => {});
21+
22+
appusersCollection.find({[appusersField]: hash, _id: {$ne: dbAppUser._id}}, {uid: 1}).toArray(function(err, docs) {
23+
if (err) {
24+
log.e('Failed to look for same tokens', err);
25+
}
26+
else if (docs && docs.length) {
27+
log.d('Found %d hash duplicates for token %s', docs.length, token);
28+
// the hash is 32 bit, not enough randomness for strict decision to unset tokens, comparing actual token strings
29+
pushCollection.find({_id: {$in: docs.map(d => d.uid)}}, {[`tk.${p + f}`]: 1}).toArray(function(err2, pushes) {
30+
if (err2) {
31+
log.e('Failed to look for same tokens', err2);
32+
}
33+
else if (pushes && pushes.length) {
34+
pushes = pushes.filter(user => user._id !== dbAppUser.uid && user.tk[p + f] === token);
35+
if (pushes.length) {
36+
log.d('Unsetting same tokens (%s) for users %j', token, pushes.map(x => x._id));
37+
38+
appusersCollection.updateMany({uid: {$in: pushes.map(x => x._id)}}, {$unset: {[appusersField]: 1}}, () => {});
39+
pushCollection.updateOne({_id: {$in: pushes.map(x => x._id)}}, {$unset: {[pushField]: 1}}, () => {});
40+
}
41+
}
42+
});
43+
}
44+
});
2645
}
2746
else {
2847
appusersCollection.updateOne({_id: params.app_user_id}, {$unset: {[appusersField]: 1}}, function() {});

plugins/push/api/send/audience.js

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,7 @@ const common = require('../../../../api/utils/common'),
4343
else {
4444
return require('../../../pluginManager').getPluginsApis().geo;
4545
}
46-
},
47-
48-
/**
49-
* Cache of app objects for quick evented/cohorted/tx message mapping
50-
*/
51-
APPS = {};
46+
};
5247

5348
/**
5449
* Class encapsulating user selection / queue / message scheduling logic
@@ -74,14 +69,9 @@ class Audience {
7469
*/
7570
async getApp() {
7671
if (!this.app) {
77-
if (APPS[this.message.app]) {
78-
this.app = APPS[this.message.app];
79-
}
80-
else {
81-
this.app = APPS[this.message.app] = await common.db.collection('apps').findOne({_id: this.message.app});
82-
if (!this.app) {
83-
throw new PushError(`App ${this.message.app} not found`, ERROR.EXCEPTION);
84-
}
72+
this.app = await common.db.collection('apps').findOne({_id: this.message.app});
73+
if (!this.app) {
74+
throw new PushError(`App ${this.message.app} not found`, ERROR.EXCEPTION);
8575
}
8676
}
8777
return this.app;
@@ -160,7 +150,7 @@ class Audience {
160150
* @param {Object[]} steps aggregation steps array to add steps to
161151
*/
162152
async addFields(steps) {
163-
let flds = fields(this.platformsWithVirtuals(), true).map(f => ({[f]: true}));
153+
let flds = fields(this.platformsWithVirtuals(), true).map(f => ({[f]: {$exists: true}}));
164154
steps.push({$match: {$or: flds}});
165155
}
166156

@@ -391,11 +381,12 @@ class Mapper {
391381
* @param {object} user app_user object
392382
* @param {number} date notification date as ms timestamp
393383
* @param {object[]} c [Content.json] overrides
384+
* @param {int} offset rate limit offset
394385
* @returns {object} push object ready to be inserted
395386
*/
396-
map(user, date, c) {
387+
map(user, date, c, offset = 0) {
397388
let ret = {
398-
_id: dbext.oidWithDate(date),
389+
_id: dbext.oidWithDate(date + offset),
399390
a: this.message.app,
400391
m: this.message._id,
401392
p: this.p,
@@ -427,9 +418,10 @@ class PlainApiMapper extends Mapper {
427418
* @param {object} user app_user object
428419
* @param {Date} date notification date
429420
* @param {object[]} c [Content.json] overrides
421+
* @param {int} offset rate limit offset
430422
* @returns {object} push object ready to be inserted
431423
*/
432-
map(user, date, c) {
424+
map(user, date, c, offset = 0) {
433425
let d = date.getTime();
434426
if (this.trigger.tz) {
435427
let utz = (user.tz === undefined || user.tz === null ? this.offset || 0 : user.tz || 0) * 60000;
@@ -444,7 +436,7 @@ class PlainApiMapper extends Mapper {
444436
}
445437
}
446438
}
447-
return super.map(user, d, c);
439+
return super.map(user, d, c, offset);
448440
}
449441
}
450442

@@ -458,9 +450,10 @@ class CohortsEventsMapper extends Mapper {
458450
* @param {object} user app_user object
459451
* @param {Date} date reference date (cohort entry date, event date)
460452
* @param {object[]} c [Content.json] overrides
453+
* @param {int} offset rate limit offset
461454
* @returns {object} push object ready to be inserted
462455
*/
463-
map(user, date, c) {
456+
map(user, date, c, offset) {
464457
let d = date.getTime();
465458

466459
// send in user's timezone
@@ -509,7 +502,7 @@ class CohortsEventsMapper extends Mapper {
509502
return null;
510503
}
511504

512-
return super.map(user, d, c);
505+
return super.map(user, d, c, offset);
513506
}
514507
}
515508

@@ -666,7 +659,11 @@ class Pusher extends PusherPopper {
666659
start = this.start || this.trigger.start,
667660
result = new Result(),
668661
updates = {},
669-
virtuals = {};
662+
virtuals = {},
663+
offset = 0,
664+
curPeriod = 0,
665+
ratePeriod = (this.audience.app.plugins.push.rate || {}).period || 0,
666+
rateNumber = (this.audience.app.plugins.push.rate || {}).rate || 0;
670667

671668
for await (let user of stream) {
672669
let push = user[TK][0],
@@ -679,7 +676,14 @@ class Pusher extends PusherPopper {
679676
continue;
680677
}
681678

682-
let note = this.mappers[pf].map(user, start, this.contents);
679+
if (ratePeriod && rateNumber) {
680+
if ((curPeriod + 1) % rateNumber === 0) {
681+
offset += ratePeriod * 1000;
682+
}
683+
curPeriod++;
684+
}
685+
686+
let note = this.mappers[pf].map(user, start, this.contents, offset);
683687
if (!note) {
684688
continue;
685689
}

plugins/push/api/send/platforms/a.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ const virtuals = ['h'];
3333
*/
3434
function extractor(qstring) {
3535
if (qstring.android_token !== undefined && (!qstring.token_provider || qstring.token_provider === 'FCM')) {
36-
return [key, FIELDS['0'], qstring.android_token === 'BLACKLISTED' ? '' : qstring.android_token];
36+
const token = qstring.android_token === 'BLACKLISTED' ? '' : qstring.android_token;
37+
return [key, FIELDS['0'], token, util.hashInt(token)];
3738
}
3839
}
3940

plugins/push/api/send/platforms/h.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ const key = 'h';
2222
*/
2323
function extractor(qstring) {
2424
if (qstring.android_token !== undefined && (qstring.token_provider === 'HMS' || qstring.token_provider === 'HPK')) {
25-
return [key, FIELDS['0'], qstring.android_token === 'BLACKLISTED' ? '' : qstring.android_token];
25+
const token = qstring.android_token === 'BLACKLISTED' ? '' : qstring.android_token;
26+
return [key, FIELDS['0'], token, util.hashInt(token)];
2627
}
2728
}
2829

plugins/push/api/send/platforms/i.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const { ConnectionError, PushError, SendError, ERROR, Creds, Template } = require('../data'),
2-
{ Base } = require('../std'),
2+
{ Base, util } = require('../std'),
33
FORGE = require('node-forge'),
44
logger = require('../../../../../api/utils/log'),
55
jwt = require('jsonwebtoken'),
@@ -21,7 +21,7 @@ const key = 'i';
2121
*/
2222
function extractor(qstring) {
2323
if (qstring.ios_token !== undefined && qstring.test_mode in FIELDS) {
24-
return [key, FIELDS[qstring.test_mode], qstring.ios_token];
24+
return [key, FIELDS[qstring.test_mode], qstring.ios_token, util.hashInt(qstring.ios_token)];
2525
}
2626
}
2727

plugins/push/api/send/std.js

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
const { Duplex } = require('stream'),
22
Measurement = require('./measure'),
3-
{ XXHash64 } = require('xxhash-addon'),
3+
{ XXHash64, XXHash32 } = require('xxhash-addon'),
44
// { getHasher, OutputType, HashType, hashAsBigInt} = require('bigint-hash'),
55
{ ERROR, PushError, SendError, ConnectionError, ValidationError, Message} = require('./data'),
66
{ FRAME, FRAME_NAME } = require('./proto');
77
// ,
88
// log = require('../../../../api/utils/log.js')('push:send:base');
99

10-
const xx64 = new XXHash64();
10+
const xx64 = new XXHash64(),
11+
xx32 = new XXHash32();
1112

1213
/**
1314
* Waits for given time
@@ -346,6 +347,21 @@ function hash(data, seed) {
346347
// }
347348
}
348349

350+
/**
351+
* Simple 32-bit hashing
352+
*
353+
* @param {string} string string to hash
354+
* @returns {integer} 32 bit integer hash of the string, 0 if string is empty or no string is supplied
355+
*/
356+
function hashInt(string) {
357+
if (typeof string === 'string' && string) {
358+
xx32.reset();
359+
xx32.update(Buffer.from(string, 'utf-8'));
360+
return xx32.digest().readIntBE(0, 4);
361+
}
362+
return 0;
363+
}
364+
349365

350366
/**
351367
* Flatten object using dot notation ({a: {b: 1}} becomes {'a.b': 1})
@@ -379,4 +395,4 @@ function flattenObject(ob) {
379395
}
380396

381397

382-
module.exports = { Base, util: {hash, wait, flattenObject}, Measurement, ERROR, PushError, SendError, ConnectionError, ValidationError };
398+
module.exports = { Base, util: {hash, hashInt, wait, flattenObject}, Measurement, ERROR, PushError, SendError, ConnectionError, ValidationError };

plugins/push/frontend/public/javascripts/countly.models.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,6 +1432,10 @@
14321432
endDate: null,
14331433
type: dto.info && dto.info.scheduled ? SendEnum.LATER : SendEnum.NOW,
14341434
};
1435+
// overwrite date with now() for send-now drafts
1436+
if (model.status === 'draft' && model.delivery.type === SendEnum.NOW) {
1437+
model.delivery.startDate = moment().valueOf();
1438+
}
14351439
model[TypeEnum.ONE_TIME].audienceSelection = triggerDto.delayed ? AudienceSelectionEnum.BEFORE : AudienceSelectionEnum.NOW;
14361440
model.timezone = triggerDto.tz ? TimezoneEnum.DEVICE : TimezoneEnum.SAME;
14371441
return model;

0 commit comments

Comments
 (0)