Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/build.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 28 additions & 13 deletions .projen/tasks.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .projenrc.js.bak
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ const project = new awscdk.AwsCdkConstructLibrary({
singleQuote: true,
},
},
jestOptions: {
jestConfig: {
moduleNameMapper: {
['^aws-cdk-lib/.warnings.jsii.js$']: '<rootDir>/node_modules/aws-cdk-lib/.warnings.jsii.js',
},
},
},

// deps: [], /* Runtime dependencies of this module. */
// description: undefined, /* The description is just a string that helps people understand the purpose of the package. */
Expand Down
11 changes: 0 additions & 11 deletions API.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 69 additions & 69 deletions common/SpyEventSender.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import {
ApiGatewayManagementApi,
PostToConnectionCommand,
} from '@aws-sdk/client-apigatewaymanagementapi';
import {
AttributeValue,
DeleteItemCommand,
DynamoDBClient,
ScanCommand,
} from '@aws-sdk/client-dynamodb';

import { unmarshall } from '@aws-sdk/util-dynamodb';
import iot from 'aws-iot-device-sdk';
import {
DynamoDBStreamEvent,
S3Event,
SNSEvent,
EventBridgeEvent,
SQSEvent,
} from 'aws-lambda';
import { v4 } from 'uuid';
import {
fragment,
getConnection,
SSPY_TOPIC,
} from '../listener/iot-connection';
import { envVariableNames } from '../src/common/envVariableNames';
import { DynamoDBSpyEvent } from './spyEvents/DynamoDBSpyEvent';
import { EventBridgeRuleSpyEvent } from './spyEvents/EventBridgeRuleSpyEvent';
Expand All @@ -28,27 +24,32 @@ import { SpyMessage } from './spyEvents/SpyMessage';
import { SqsSpyEvent } from './spyEvents/SqsSpyEvent';

export class SpyEventSender {
ddb = new DynamoDBClient({
region: process.env.AWS_REGION,
});
debugMode = process.env[envVariableNames.SSPY_DEBUG] === 'true';
apigwManagementApi = new ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: process.env[envVariableNames.SSPY_WS_ENDPOINT]!,
});
connections: Record<string, AttributeValue>[] | undefined;

constructor(params?: {
log: (message: string, ...optionalParams: any[]) => void;
logError: (message: string, ...optionalParams: any[]) => void;
connection: iot.device | undefined;
scope: string;

constructor(params: {
log?: (message: string, ...optionalParams: any[]) => void;
logError?: (message: string, ...optionalParams: any[]) => void;
scope: string;
}) {
if (params?.log) {
if (params.log) {
this.log = params.log;
}

if (params?.logError) {
if (params.logError) {
this.logError = params.logError;
}

this.scope = params.scope;
}

public async close() {
this.connection?.end();
}

public async connect() {
this.connection = await getConnection(this.debugMode);
}

public async publishSpyEvent(event: any) {
Expand All @@ -59,17 +60,6 @@ export class SpyEventSender {
);
this.log('ARN to names mapping', JSON.stringify(mapping));

let connectionData;

const scanParams = new ScanCommand({
TableName: process.env[envVariableNames.SSPY_WS_TABLE_NAME] as string,
ProjectionExpression: 'connectionId',
});

connectionData = await this.ddb.send(scanParams);

this.connections = connectionData.Items;

const postDataPromises: Promise<any>[] = [];

if (event?.Records && event.Records[0]?.Sns) {
Expand Down Expand Up @@ -227,45 +217,55 @@ export class SpyEventSender {
await Promise.all(postDataPromises);
}

private async postData(spyMessage: Omit<SpyMessage, 'timestamp'>) {
this.log('Post spy message', JSON.stringify(spyMessage));
private encode(input: any): fragment[] {
const payload = JSON.stringify(input);
const parts = payload.match(/.{1,50000}/g);
if (!parts) return [];
this.log(`Encoded iot message, ${parts.length}`);
const id = v4();
return parts.map((part, index) => ({
id,
index,
count: parts.length,
data: part,
}));
}

if (!this.connections) {
return;
private async postData(spyMessage: Omit<SpyMessage, 'timestamp'>) {
if (this.connection === undefined) {
throw new Error(
'No IoT connection created yet, did you forget to call connect()?'
);
}

const postCalls = this.connections.map(async ({ connectionId }) => {
this.log(`Sending message to client: ${connectionId.S}`);

try {
const postToConnectionCommand = new PostToConnectionCommand({
ConnectionId: connectionId.S,
Data: JSON.stringify({
timestamp: new Date().toISOString(),
serviceKey: spyMessage.serviceKey,
data: spyMessage.data,
}) as any,
});

await this.apigwManagementApi.send(postToConnectionCommand);
} catch (e) {
this.logError(`Faild sending spy message to: ${connectionId.S}`, e);
if ((e as any).$metadata.httpStatusCode === 410) {
this.log(`Found stale connection, deleting ${connectionId}`);

const deleteParams = new DeleteItemCommand({
TableName: process.env[envVariableNames.SSPY_WS_TABLE_NAME],
Key: { connectionId },
});
this.log('Post spy message', JSON.stringify(spyMessage));

await this.ddb.send(deleteParams);
} else {
throw e;
}
const connection = this.connection;
const topic = `${SSPY_TOPIC}/${this.scope}`;

try {
for (const fragment of this.encode(spyMessage)) {
await new Promise<void>((resolve) => {
connection.publish(
topic,
JSON.stringify(fragment),
{
qos: 1,
},
() => {
console.error('Publishing finished');
resolve();
}
);
});
this.log(
`Published fragment ${fragment.index} out of ${fragment.count} to topic ${topic}`
);
}
});
} catch (e) {
this.logError(`Failed to send payload to iot: ${e}`);
}

await Promise.all(postCalls);
this.log('Send spy message finish');
}

Expand Down
1 change: 1 addition & 0 deletions common/spyEvents/FunctionConsole.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export type FunctionConsole = {
type: 'log' | 'debug' | 'info' | 'error' | 'warn';
formattedMessage?: string;
message?: any;
optionalParams: any[];
};
6 changes: 6 additions & 0 deletions extension/interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interceptConsole();
const spyEventSender = new SpyEventSender({
log,
logError,
scope: process.env['SSPY_ROOT_STACK']!,
});

// Wrap original handler.
Expand All @@ -41,6 +42,8 @@ export const handler = async (
context: Context,
callback: Callback
): Promise<any | undefined> => {
await spyEventSender.connect();

const contextSpy: FunctionContext = {
functionName: context.functionName,
awsRequestId: context.awsRequestId,
Expand Down Expand Up @@ -138,11 +141,14 @@ export const handler = async (
}
} catch (error) {
// Even if the original handler is not async, we return the promise as an async handler so we can send an error message
// eslint-disable-next-line @typescript-eslint/return-await
return new Promise((_, reject) =>
fail(error).then(() => {
reject(error);
})
);
} finally {
await spyEventSender.close();
}
};

Expand Down
Empty file added extensions/python/__init__.py
Empty file.
Loading