-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
154 lines (110 loc) · 6.29 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// let { join } = require('path')
let _arcFunctions = require('@architect/functions')
let AWS = require('aws-sdk')
// Delay in milliseconds between stream polls. Overridden at Sandbox startup by the
// `polling_interval` setting of the @sandbox-table-streams pragma, when present.
let defaultPollingInterval = 10000;
// Remaining retry budget, shared by every table. Decremented each time a retry is
// logged and reset to 3 once polling has succeeded twice.
let retryCountRemaining = 3;
// Number of polls (across all shards) that returned a NextShardIterator. The
// "stream found" message is only logged when this reaches 2.
let successCount = 0;
// IDs of shards that already have a polling loop, so a rescan does not start a
// second loop for the same shard.
let shardIds = [];
module.exports = {
// Sandbox
sandbox: {
// Startup operations
start: async ({ arc, inventory, invoke }) => {
// Run operations upon Sandbox startup
const pluginProperties = arc['sandbox-table-streams'];
if(pluginProperties.length === 0) {
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Default polling interval is set to ${defaultPollingInterval}. To change, add a polling_interval property to the @sandbox-table-streams pragma in your arc manifest. eg. \n@sandbox-table-streams \npolling_interval 5000`)
}
for(prop of pluginProperties) {
if(prop[0] === 'polling_interval') {
defaultPollingInterval = prop[1];
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Polling interval read from arc manifest. Now set to poll every ${defaultPollingInterval}.`)
}
}
// if(inventory.inv.aws.region !== 'ddblocal') {
// console.error('@hicksy/arc-plugin-sandbox-table-streams: AWS region not set to ddblocal. Plugin @hicksy/arc-plugin-sandbox-table-streams is only compatible with ddblocal. DynamoDBLocal streams will not invoke your table stream functions.')
// return;
// }
const client = await _arcFunctions.tables();
const dynamodb = new AWS.DynamoDB({region: inventory.inv.aws.region, endpoint: `http://localhost:${process.env.ARC_TABLES_PORT}`});
const dynamodb_streams = new AWS.DynamoDBStreams({region: inventory.inv.aws.region, endpoint: `http://localhost:${process.env.ARC_TABLES_PORT}`});
const waitFor = delay => new Promise(resolve => setTimeout(resolve, delay));
const readStreamShardData = async (shardIterator, tblName) => {
let data = await dynamodb_streams.getRecords({ShardIterator: shardIterator}).promise();
if(data.Records.length) {
invoke({
pragma: 'tables-streams',
name: tblName,
payload: data,
});
}
await waitFor(defaultPollingInterval);
if(data.NextShardIterator) {
successCount++;
if(successCount === 2) {
// only show polling success message after two successful returns of NextShardIterator - seems to be a delay in the stream being ready
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Stream found for table ${tableStream.table}. Polling to invoke stream function.`)
retryCountRemaining = 3;
}
readStreamShardData(data.NextShardIterator, tblName);
} else {
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Table ${tableStream.table} stream missing NextShardIterator. Will retry ${retryCountRemaining} more times.`)
retryCountRemaining--;
await waitFor(3000);
}
initLocalStreams(false);
}
const initLocalStreams = async (showInitLog = true) => {
if(!showInitLog) {
await waitFor(defaultPollingInterval);
}
if(inventory.inv['tables-streams']) {
for(tableStream of inventory.inv['tables-streams']) {
let generatedDynamoTableName = client.name(tableStream.table);
let shardSuccess = [];
try {
if(showInitLog) console.log(`@hicksy/arc-plugin-sandbox-table-streams: Attempting to connect to stream for table ${tableStream.table}`);
let tableMetaData = await dynamodb.describeTable({TableName: generatedDynamoTableName}).promise();
let streamMetaData = await dynamodb_streams.describeStream({StreamArn: tableMetaData.Table.LatestStreamArn}).promise();
for(shard of streamMetaData.StreamDescription.Shards) {
let shardIteratorData = await dynamodb_streams.getShardIterator({StreamArn: tableMetaData.Table.LatestStreamArn, ShardIteratorType: 'LATEST',ShardId: shard.ShardId}).promise();
if(shardIteratorData.ShardIterator) {
if(!shardIds.includes(shard.ShardId)) {
shardIds.push(shard.ShardId);
readStreamShardData(shardIteratorData.ShardIterator, tableStream.table);
} else {
//console.log('shard already polling')
}
shardSuccess.push(true)
} else {
shardSuccess.push(false)
}
}
if(shardSuccess.every(s => s !== true)) {
throw new Error('Sahrds awaiting init')
}
} catch(e) {
if(e.code === 'ResourceNotFoundException') {
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Table ${tableStream.table} does not exist. Using DynamoDB Local requires you to create the dynamodb table yourself (including 'StreamSpecification' config).`)
}
if(e.message === 'Shards awaiting init' || (e.code === 'MissingRequiredParameter' && e.message === "Missing required key 'ShardIterator' in params")) {
if(retryCountRemaining > 0) {
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Table ${tableStream.table} does not have a stream enabled, or table has not finished creating / seeding. Will retry ${retryCountRemaining} more times.`)
retryCountRemaining--;
await waitFor(3000);
initLocalStreams();
} else {
console.log(`@hicksy/arc-plugin-sandbox-table-streams: Table ${tableStream.table} does not have a stream enabled.`)
}
}
console.log(e)
}
}
} else {
console.error('@hicksy/arc-plugin-sandbox-table-streams: No @tables-streams pragma found in arc file. Plugin @hicksy/arc-plugin-sandbox-table-streams requires at least one @tables-streams pragma.')
}
}
initLocalStreams();
},
}
}