-
Notifications
You must be signed in to change notification settings - Fork 45
/
index.js
93 lines (86 loc) · 4.14 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
* Authored by Runpeng Liu,
* Brain Power (2018)
*/
// AWS SDK, configured with the region read from the AWS_REGION environment variable.
const aws = require('aws-sdk');
aws.config.update({ region: process.env.AWS_REGION });
// Module-level Kinesis client used by putRecordsIntoProcessedStream.
const kinesis = new aws.Kinesis();
// This function is triggered when new raw records are available
// from the output of Rekognition stream processor.
// For data specification, see: https://docs.aws.amazon.com/rekognition/latest/dg/streaming-video-kinesis-output.html
exports.handler = (event, context, callback) => {
const records = event.Records;
records.forEach((record) => {
// Kinesis data is base64 encoded so decode here
const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
const data = JSON.parse(payload);
record.data = data;
});
// Filter for records that contain a detected face.
const faceRecords = records.filter((record) => {
return record.data.FaceSearchResponse && record.data.FaceSearchResponse.length;
});
if (faceRecords.length < 2) {
return callback(null, `Not enough records to process.`);
}
// keep data history needed for computations. Currently, one previous face per index.
const facesBuffer = {};
// Do post-processing on detected faces.
faceRecords.forEach((record, index) => {
// TODO: how do we track faces that may shift around in the array?
for (let faceIndex = 0; faceIndex < record.data.FaceSearchResponse.length; faceIndex++) {
const prev = (index == 0) ? 0 : index - 1;
const detectedFace = record.data.FaceSearchResponse[faceIndex].DetectedFace;
const prevFace = facesBuffer[faceIndex] || detectedFace;
detectedFace.RecordIndex = index;
processDetectedFace(detectedFace, prevFace, record.data.InputInformation.KinesisVideo);
facesBuffer[faceIndex] = detectedFace;
}
});
faceRecords.shift();
putRecordsIntoProcessedStream(faceRecords).then(() => {
const firstFace = faceRecords[0];
const lastFace = faceRecords[faceRecords.length - 1];
console.log(`Processed ${faceRecords.length} face records. Start: ${firstFace.data.FaceSearchResponse[0].DetectedFace.Timestamp}; End: ${lastFace.data.FaceSearchResponse[0].DetectedFace.Timestamp}`);
callback(null, `Processing complete.`);
}).catch(callback);
};
// Annotates `face` in place with the center of its bounding box and a
// timestamp, then, when time has elapsed relative to `previousFace`, with
// first-order estimates of its translational and rotational velocity.
// Returns nothing.
// Modify for custom use case.
function processDetectedFace(face, previousFace, inputInfo) {
  // Square root of the sum of the squared components, accumulated left to right.
  const magnitude = (...components) =>
    Math.sqrt(components.reduce((sum, component) => sum + component ** 2, 0));

  // Face center derived from the BoundingBox data.
  const box = face.BoundingBox;
  const midX = box.Left + box.Width / 2;
  const midY = box.Top + box.Height / 2;
  box.Center = [midX, midY];

  // Timestamp is the smaller of the two candidates: producer time plus the
  // frame offset, or producer time plus the record index.
  const offsetBased = inputInfo.ProducerTimestamp + inputInfo.FrameOffsetInSeconds;
  const indexBased = inputInfo.ProducerTimestamp + face.RecordIndex;
  face.Timestamp = Math.min(offsetBased, indexBased);

  // Velocities are only defined when some time separates the two faces.
  const elapsed = face.Timestamp - previousFace.Timestamp;
  if (elapsed !== 0) {
    // Distance travelled by the center, normalized by the box diagonal.
    const earlierCenter = previousFace.BoundingBox.Center;
    const travelled = magnitude(
      box.Center[0] - earlierCenter[0],
      box.Center[1] - earlierCenter[1]
    );
    const diagonal = magnitude(box.Height, box.Width);
    face.TranslationalVelocity = travelled / diagonal / elapsed;

    // Combined change in pitch, roll and yaw per unit of time.
    const pose = face.Pose;
    const earlierPose = previousFace.Pose;
    const turned = magnitude(
      pose.Pitch - earlierPose.Pitch,
      pose.Roll - earlierPose.Roll,
      pose.Yaw - earlierPose.Yaw
    );
    face.RotationalVelocity = turned / elapsed;
  }
}
// Put processed body motion metrics into downstream KDS.
//
// Serializes the `data` of every record as JSON and sends them all in one
// PutRecords request to the stream named by the KDS_PROCESSED_STREAM_NAME
// environment variable.
//
// Returns a Promise that resolves with the PutRecords response, or rejects
// with the error reported by the request. Records that fail individually
// inside an otherwise successful request are logged, not retried.
function putRecordsIntoProcessedStream(records) {
  const packagedRecords = records.map((record) => {
    return {
      Data: JSON.stringify(record.data),
      // Every record is sent with the same partition key.
      PartitionKey: 'shard-0'
    };
  });
  return new Promise((resolve, reject) => {
    kinesis.putRecords({
      Records: packagedRecords,
      StreamName: process.env.KDS_PROCESSED_STREAM_NAME
    }, (err, data) => {
      if (err) {
        reject(err);
        return;
      }
      // PutRecords can succeed as a whole while individual records fail;
      // surface that instead of dropping them without a trace.
      if (data && data.FailedRecordCount > 0) {
        console.error(`PutRecords failed for ${data.FailedRecordCount} of ${packagedRecords.length} records.`);
      }
      resolve(data);
    });
  });
}