Skip to content

Commit

Permalink
Use fifo queue to receive Telegram webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
daohoangson committed Nov 12, 2023
1 parent 4bb2fcf commit 22a332c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 41 deletions.
40 changes: 24 additions & 16 deletions packages/functions/src/events/telegram-webhook.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
import { EventBridgeEvent, Handler } from "aws-lambda";
import { Handler, SQSEvent, SQSRecord } from "aws-lambda";
import { Config } from "sst/node/config";

import {
HandleTelegramWebhookInput,
handleTelegramWebhook,
kv,
} from "@bubby/core/3rdparty";
import { handleTelegramWebhook, kv } from "@bubby/core/3rdparty";
import { replyTextChat } from "@bubby/core/handlers";

export const handler: Handler<
EventBridgeEvent<"telegram.webhook", HandleTelegramWebhookInput["body"]>
> = async (event) => {
const secretToken = Config.TELEGRAM_WEBHOOK_SECRET_TOKEN;
if (event.source !== secretToken) {
console.error("event.source !== secretToken", { event });
export const handler: Handler<SQSEvent> = async ({ Records }) => {
for (const record of Records) {
await recordHandler(record);
}
};

async function recordHandler(record: SQSRecord) {
const expectedSecretToken = Config.TELEGRAM_WEBHOOK_SECRET_TOKEN;
const actualSecretToken =
record.messageAttributes["XTelegramBotApiSecretToken"];
if (
typeof actualSecretToken !== "object" ||
actualSecretToken.stringValue !== expectedSecretToken
) {
console.warn("Unrecognized secret token", {
record: JSON.stringify(actualSecretToken),
});
return;
}

console.log(JSON.stringify(event.detail, null, 2));
handleTelegramWebhook({
body: event.detail,
const body = JSON.parse(record.body);
console.log(JSON.stringify(body, null, 2));
await handleTelegramWebhook({
body,
onText: (chat) => replyTextChat({ chat, kv }),
});
};
}
65 changes: 40 additions & 25 deletions stacks/MyStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import {
ParameterMapping,
MappingValue,
} from "@aws-cdk/aws-apigatewayv2-alpha";
import { Duration } from "aws-cdk-lib";
import {
StackContext,
Api,
EventBus,
Config,
Table,
FunctionProps,
Queue,
} from "sst/constructs";

export function API({ stack }: StackContext) {
Expand All @@ -27,11 +28,6 @@ export function API({ stack }: StackContext) {
TELEGRAM_WEBHOOK_SECRET_TOKEN,
];

const bus = new EventBus(stack, "bus", {
defaults: {
retries: 10,
},
});
const tableKeyValues = new Table(stack, "KeyValues", {
fields: {
ChannelId: "string",
Expand All @@ -41,10 +37,32 @@ export function API({ stack }: StackContext) {
primaryIndex: { partitionKey: "ChannelId", sortKey: "Key" },
});
const functionDefaults: FunctionProps = {
bind: [bus, ...envVars, tableKeyValues],
timeout: 300,
bind: [...envVars, tableKeyValues],
};

const telegramWebhookTimeout = 300;
const telegramWebhookQueue = new Queue(stack, "telegramWebhook", {
cdk: {
queue: {
contentBasedDeduplication: true,
fifo: true,
visibilityTimeout: Duration.seconds(telegramWebhookTimeout),
},
},
consumer: {
cdk: {
eventSource: {
batchSize: 1,
},
},
function: {
...functionDefaults,
handler: "packages/functions/src/events/telegram-webhook.handler",
timeout: telegramWebhookTimeout,
},
},
});

const api = new Api(stack, "api", {
defaults: {
function: {
Expand All @@ -56,32 +74,29 @@ export function API({ stack }: StackContext) {
type: "aws",
cdk: {
integration: {
subtype: HttpIntegrationSubtype.EVENTBRIDGE_PUT_EVENTS,
subtype: HttpIntegrationSubtype.SQS_SEND_MESSAGE,
parameterMapping: ParameterMapping.fromObject({
EventBusName: MappingValue.custom(bus.eventBusName),
DetailType: MappingValue.custom("telegram.webhook"),
Detail: MappingValue.custom("$request.body"),
Source: MappingValue.requestHeader(
"x-telegram-bot-api-secret-token"
MessageAttributes: MappingValue.custom(
JSON.stringify({
XTelegramBotApiSecretToken: {
DataType: "String",
StringValue:
"${request.header.x-telegram-bot-api-secret-token}",
},
})
),
MessageBody: MappingValue.custom("$request.body"),
MessageGroupId: MappingValue.custom(
"$request.body.message.chat.id"
),
QueueUrl: MappingValue.custom(telegramWebhookQueue.queueUrl),
}),
},
},
},
},
});

bus.subscribe(
"telegram.webhook",
{
...functionDefaults,
handler: "packages/functions/src/events/telegram-webhook.handler",
},
{
retries: 0,
}
);

stack.addOutputs({
ApiEndpoint: api.url,
});
Expand Down

0 comments on commit 22a332c

Please sign in to comment.