-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
141 lines (118 loc) · 4.88 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
(c) Jordi Cenzano 2018
*
* GCP Cloud function
*
* @param {!Object} req HTTP request context.
* @param {!Object} res HTTP response context.
*/
// Beacon source ID in BiqQuery (should be the same used in collector)
const enBeaconOriginFlag = {
CLIENT_SIDE: 'cs',
SERVER_SIDE: 'ss'
};
// Imports Google cloud BQ
const BigQuery = require('@google-cloud/bigquery');
// Imports the Google Cloud DS
const Datastore = require('@google-cloud/datastore');
exports.main = (req, res) => {
// Load env vars
const GCP_PROJECT_ID = process.env.GCP_PROJECT; // From GCP
const GCP_DATASET = process.env.GCP_DATASET;
const GCP_TABLE = process.env.GCP_TABLE;
const SHARED_INTERNAL_SECRET = process.env.SHARED_INTERNAL_SECRET;
const GCP_DS_KIND = process.env.GCP_DS_KIND;
const MAX_BEACON_DELAY_S = parseInt(process.env.MAX_BEACON_DELAY_S);
const CCU_AGGREGATION_PERIOD_MS = parseInt(process.env.CCU_AGGREGATION_PERIOD_MS);
console.log(`CONFIG, GCP_PROJECT_ID: ${GCP_PROJECT_ID}, GCP_DATASET:${GCP_DATASET}, GCP_TABLE:${GCP_TABLE}; SHARED_INTERNAL_SECRET: ${SHARED_INTERNAL_SECRET}; GCP_DS_KIND: ${GCP_DS_KIND}; MAX_BEACON_DELAY_S: ${MAX_BEACON_DELAY_S}; AGGREGATION_PERIOD_MS: ${CCU_AGGREGATION_PERIOD_MS}`);
// Check if the request is legit
if (SHARED_INTERNAL_SECRET !== req.get('x-api-key'))
return res.status(401).send(new Error ('Request NOT authorized'));
// Check time range
let time_in_ms = Date.now();
let time_out_ms = time_in_ms - (MAX_BEACON_DELAY_S * 1000);
// Ini big query
const bigquery = new BigQuery({projectId: GCP_PROJECT_ID});
// The SQL query to run
let sqlQuery = `SELECT TIMESTAMP_MILLIS(CAST(FLOOR(ts/${CCU_AGGREGATION_PERIOD_MS}) * ${CCU_AGGREGATION_PERIOD_MS} AS INT64)) AS time,
COUNT( distinct IF(source = '${enBeaconOriginFlag.SERVER_SIDE}', sessionid , NULL) ) AS ssccu,
COUNT( distinct IF(source = '${enBeaconOriginFlag.CLIENT_SIDE}', sessionid , NULL) ) AS csccu,
jobid AS jobid,
TIMESTAMP_MILLIS(MAX(rx_at)) AS last_rx_at
FROM ${GCP_DATASET}.${GCP_TABLE}
WHERE ts > ${time_out_ms} AND ts <= ${time_in_ms}
GROUP BY time, jobid ORDER BY time ASC`;
// Query options list: https://cloud.google.com/bigquery/docs/reference/v2/jobs/query
const options = {
query: sqlQuery,
useLegacySql: false, // Use standard SQL syntax for queries.
};
// Runs the query
bigquery
.query(options)
.then(results => {
try {
insertDataDataStore(GCP_PROJECT_ID, GCP_DS_KIND, results, function (err, num_inserted_records) {
if (err) {
res.status(500).send('ERROR DS: inserting entities in Datastore: ' + err.toString());
return;
}
res.status(200).send('DS Processed / inserted entities: ' + num_inserted_records);
return;
});
}
catch (err) {
res.status(500).send('ERROR DS: in Datastore: ' + err.toString());
return;
}
})
.catch(err => {
console.error('ERROR:', err);
res.status(500).send('ERROR BQ: ' + err);
return;
});
};
function calculateMinFromIni(date_str) {
// Return the epoch in minutes
return Math.floor(new Date(date_str).getTime() / (1000 * 60));
}
function createEntityFromBQObj(datastore, kind, data) {
const min_from_ini = calculateMinFromIni(data.time.value);
const keyName = data.jobid + '-' + min_from_ini;
const key = datastore.key([kind, keyName]);
const entity = {
key: key,
data: [
{ name: 'jobid', value: data.jobid },
{ name: 'time', value: new Date(data.time.value) },
{ name: 'ssccu', value: data.ssccu, excludeFromIndexes: true },
{ name: 'csccu', value: data.csccu, excludeFromIndexes: true },
{ name: 'last_rx_at', value: new Date(data.last_rx_at.value), excludeFromIndexes: true }
]
};
return entity;
}
function insertDataDataStore(projectId, ds_kind, results, callback) {
// Creates a client
const datastore = new Datastore({ projectId: projectId });
const entities_to_insert = [];
// Just in case iterate over the results
results.forEach(function(element) {
element.forEach(function(row) {
entities_to_insert.push(createEntityFromBQObj(datastore, ds_kind, row));
});
});
if (entities_to_insert.length <= 0) {
console.log('No entities to be inserted/updated');
return callback(null, 0);
}
//Insert the final data to the min agg table
datastore
.upsert(entities_to_insert)
.then(() => {
return callback(null, entities_to_insert.length);
})
.catch(err => {
return callback(err, 0);
});
}