diff --git a/.gitignore b/.gitignore index 2f22ed6f..052bbdd6 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ src/config/*.env /dist /lib /node_modules +/lib #migration migrate.json diff --git a/src/functions/fetchMessages.ts b/src/functions/fetchMessages.ts index 49804d19..89185e59 100644 --- a/src/functions/fetchMessages.ts +++ b/src/functions/fetchMessages.ts @@ -66,7 +66,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo) channelName: threadInfo?.channelName ? threadInfo?.channelName : '', threadId: threadInfo?.threadId ? threadInfo?.threadId : null, threadName: threadInfo?.threadName ? threadInfo?.threadName : null, - isGeneratedByWebhook: message.webhookId ? true : false + isGeneratedByWebhook: message.webhookId ? true : false, }; } else { return { @@ -83,7 +83,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo) channelName: message.channel instanceof TextChannel ? message.channel.name : null, threadId: null, threadName: null, - isGeneratedByWebhook: message.webhookId ? true : false + isGeneratedByWebhook: message.webhookId ? true : false, }; } } @@ -169,10 +169,10 @@ async function fetchMessages( channelName: channel.parent?.name, }) : await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()]); + await rawInfoService.createRawInfos(connection, messagesToStore); options[fetchDirection] = boundaryMessage.id; fetchedMessages = await channel.messages.fetch(options); } - await rawInfoService.createRawInfos(connection, messagesToStore); } catch (err) { logger.error( { guild_id: connection.name, channel_id: channel.id, fetchDirection, err }, diff --git a/src/index.ts b/src/index.ts index 9de709e9..6ca7a0e0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -184,20 +184,19 @@ async function app() { host: config.redis.host, port: config.redis.port, password: config.redis.password, - }, + } }); queue.add('cronJob', {}, { repeat: { cron: '0 0 * * *', // Run once 00:00 UTC - // cron: '* * * * *', // Run every minute - // every: 10000 }, jobId: 'cronJob', // Optional: Provide a unique ID for the job - attempts: 1, // Number of times to retry the job if it fails + attempts: 0, // Number of times to retry the job if it fails backoff: { type: 'exponential', delay: 1000, // Initial delay between retries in milliseconds }, + } as never); // Create a worker to process the job @@ -216,6 +215,7 @@ async function app() { port: config.redis.port, password: config.redis.password, }, + lockDuration: 79200000, // 22 hours } ); diff --git a/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts b/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts index 9c4f7fb2..4dc290b5 100644 --- a/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts +++ b/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts @@ -6,19 +6,12 @@ import config from '../../config'; import webhookLogic from '../utils/webhookLogic'; import { DatabaseManager } from '@togethercrew.dev/db'; -const { - Guilds, - GuildMembers, - GuildMessages, - GuildPresences, - DirectMessages -} = GatewayIntentBits; - +const { Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages } = GatewayIntentBits; export const up = async () => { - const client = new Client({ - intents: [Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages], - }); + const client = new Client({ + intents: [Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages], + }); await client.login(config.discord.botToken); await connectDB(); @@ -30,7 +23,5 @@ export const up = async () => { }; export const down = async () => { - // TODO: Implement rollback logic if needed + // TODO: Implement rollback logic if needed }; - - diff --git a/src/migrations/utils/template.ts b/src/migrations/utils/template.ts index 352e3fa8..7f34638e 100644 --- a/src/migrations/utils/template.ts +++ b/src/migrations/utils/template.ts @@ -10,6 +10,5 @@ export const up = async () => { }; export const down = async () => { - await connectDB() - -}; \ No newline at end of file + await connectDB(); +}; diff --git a/src/migrations/utils/webhookLogic.ts b/src/migrations/utils/webhookLogic.ts index 0e8e1784..c2621b8a 100644 --- a/src/migrations/utils/webhookLogic.ts +++ b/src/migrations/utils/webhookLogic.ts @@ -7,188 +7,198 @@ import { rawInfoService, channelService } from '../../database/services'; const logger = parentLogger.child({ module: 'Migration' }); interface FetchOptions { - limit: number; - before?: Snowflake; - after?: Snowflake; + limit: number; + before?: Snowflake; + after?: Snowflake; } async function fetchMessagesBetweenOldestAndNewest( - connection: Connection, - channel: TextChannel | ThreadChannel, - oldestRawInfo: IRawInfo, - newestRawInfo: IRawInfo + connection: Connection, + channel: TextChannel | ThreadChannel, + oldestRawInfo: IRawInfo, + newestRawInfo: IRawInfo ) { - try { - let allMessages: Message[] = []; - logger.info( - { guild_id: connection.name, channel_id: channel.id }, - 'Fetching channel messages is running' - ); - const options: FetchOptions = { limit: 100 }; - options.after = oldestRawInfo.messageId; - let fetchedMessages = await channel.messages.fetch(options); - while (fetchedMessages.size > 0) { - allMessages = allMessages.concat(Array.from(fetchedMessages.values())); - if (fetchedMessages.has(newestRawInfo.messageId)) { - break; - } - options.after = fetchedMessages.first()?.id; - fetchedMessages = await channel.messages.fetch(options); - } - return allMessages; - } catch (err) { - logger.error( - { guild_id: connection.name, channel_id: channel.id, err }, - 'Fetching channel messages failed' - ); + try { + let allMessages: Message[] = []; + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Fetching channel messages is running'); + const options: FetchOptions = { limit: 100 }; + options.after = oldestRawInfo.messageId; + let fetchedMessages = await channel.messages.fetch(options); + while (fetchedMessages.size > 0) { + allMessages = allMessages.concat(Array.from(fetchedMessages.values())); + if (fetchedMessages.has(newestRawInfo.messageId)) { + break; + } + options.after = fetchedMessages.first()?.id; + fetchedMessages = await channel.messages.fetch(options); } - logger.info( - { guild_id: connection.name, channel_id: channel.id }, - 'Fetching channel messages is done' - ); + return allMessages; + } catch (err) { + logger.error({ guild_id: connection.name, channel_id: channel.id, err }, 'Fetching channel messages failed'); + } + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Fetching channel messages is done'); } async function migrateIsGeneratedByWebhook(connection: Connection, channel: TextChannel) { - try { - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is running'); - - // Fetch oldest rawInfo from DB - const oldestChannelRawInfo = await rawInfoService.getOldestRawInfo(connection, { - channelId: channel?.id, - threadId: null, - }); - - // Fetch newest rawInfo from DB - const newestChannelRawInfo = await rawInfoService.getNewestRawInfo(connection, { - channelId: channel?.id, - threadId: null, - }); - - if (!oldestChannelRawInfo || !newestChannelRawInfo) { - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'No oldest rawInfo found, skipping migration'); - return; - } - - - - const fetchedMessages = await fetchMessagesBetweenOldestAndNewest(connection, channel, oldestChannelRawInfo, newestChannelRawInfo); - const messagesToUpdateTrue = []; - const messagesToUpdateFalse = []; - - const oldestMessage = await channel.messages.fetch(oldestChannelRawInfo.messageId); - const newestMessage = await channel.messages.fetch(newestChannelRawInfo.messageId); - + try { + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is running'); + + // Fetch oldest rawInfo from DB + const oldestChannelRawInfo = await rawInfoService.getOldestRawInfo(connection, { + channelId: channel?.id, + threadId: null, + }); + + // Fetch newest rawInfo from DB + const newestChannelRawInfo = await rawInfoService.getNewestRawInfo(connection, { + channelId: channel?.id, + threadId: null, + }); + + if (!oldestChannelRawInfo || !newestChannelRawInfo) { + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'No oldest rawInfo found, skipping migration'); + return; + } - if (oldestMessage.webhookId) messagesToUpdateTrue.push(oldestMessage.id); - else messagesToUpdateFalse.push(oldestMessage.id); + const fetchedMessages = await fetchMessagesBetweenOldestAndNewest( + connection, + channel, + oldestChannelRawInfo, + newestChannelRawInfo + ); + const messagesToUpdateTrue = []; + const messagesToUpdateFalse = []; - if (newestMessage.webhookId) messagesToUpdateTrue.push(newestMessage.id); - else messagesToUpdateFalse.push(newestMessage.id); + const oldestMessage = await channel.messages.fetch(oldestChannelRawInfo.messageId); + const newestMessage = await channel.messages.fetch(newestChannelRawInfo.messageId); - if (fetchedMessages) { - for (const message of fetchedMessages) { - if (message.webhookId) { - messagesToUpdateTrue.push(message.id); - } else { - messagesToUpdateFalse.push(message.id); - } - } + if (oldestMessage.webhookId) messagesToUpdateTrue.push(oldestMessage.id); + else messagesToUpdateFalse.push(oldestMessage.id); - } + if (newestMessage.webhookId) messagesToUpdateTrue.push(newestMessage.id); + else messagesToUpdateFalse.push(newestMessage.id); - if (messagesToUpdateTrue.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateTrue } }, { isGeneratedByWebhook: true }); + if (fetchedMessages) { + for (const message of fetchedMessages) { + if (message.webhookId) { + messagesToUpdateTrue.push(message.id); + } else { + messagesToUpdateFalse.push(message.id); } + } + } - if (messagesToUpdateFalse.length > 0) { - - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateFalse } }, { isGeneratedByWebhook: false }); - } - - const threads = channel.threads.cache.values(); - - // Handle threads of the channel - for (const thread of threads) { - const oldestThreadRawInfo = await rawInfoService.getOldestRawInfo(connection, { - channelId: channel?.id, - threadId: thread.id, - }); - - const newestThreadRawInfo = await rawInfoService.getNewestRawInfo(connection, { - channelId: channel?.id, - threadId: thread.id, - }); - - if (!oldestThreadRawInfo || !newestThreadRawInfo) { - continue; // No data to migrate for this thread - } - - const fetchedThreadMessages = await fetchMessagesBetweenOldestAndNewest(connection, thread, oldestThreadRawInfo, newestThreadRawInfo); - - const threadMessagesToUpdateTrue = []; - const threadMessagesToUpdateFalse = []; - - - const oldestThreadMessage = await thread.messages.fetch(oldestThreadRawInfo.messageId); - const newestThreadMessage = await thread.messages.fetch(newestThreadRawInfo.messageId); - - if (oldestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(oldestThreadMessage.id); - else threadMessagesToUpdateFalse.push(oldestThreadMessage.id); - - if (newestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(newestThreadMessage.id); - else threadMessagesToUpdateFalse.push(newestThreadMessage.id); - - - - if (fetchedThreadMessages) { - for (const message of fetchedThreadMessages) { - if (message.webhookId) { - threadMessagesToUpdateTrue.push(message.id); - } else { - threadMessagesToUpdateFalse.push(message.id); - } - } - } + if (messagesToUpdateTrue.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: messagesToUpdateTrue } }, + { isGeneratedByWebhook: true } + ); + } - if (threadMessagesToUpdateTrue.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateTrue } }, { isGeneratedByWebhook: true }); - } + if (messagesToUpdateFalse.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: messagesToUpdateFalse } }, + { isGeneratedByWebhook: false } + ); + } - if (threadMessagesToUpdateFalse.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateFalse } }, { isGeneratedByWebhook: false }); - } + const threads = channel.threads.cache.values(); + + // Handle threads of the channel + for (const thread of threads) { + const oldestThreadRawInfo = await rawInfoService.getOldestRawInfo(connection, { + channelId: channel?.id, + threadId: thread.id, + }); + + const newestThreadRawInfo = await rawInfoService.getNewestRawInfo(connection, { + channelId: channel?.id, + threadId: thread.id, + }); + + if (!oldestThreadRawInfo || !newestThreadRawInfo) { + continue; // No data to migrate for this thread + } + + const fetchedThreadMessages = await fetchMessagesBetweenOldestAndNewest( + connection, + thread, + oldestThreadRawInfo, + newestThreadRawInfo + ); + + const threadMessagesToUpdateTrue = []; + const threadMessagesToUpdateFalse = []; + + const oldestThreadMessage = await thread.messages.fetch(oldestThreadRawInfo.messageId); + const newestThreadMessage = await thread.messages.fetch(newestThreadRawInfo.messageId); + + if (oldestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(oldestThreadMessage.id); + else threadMessagesToUpdateFalse.push(oldestThreadMessage.id); + + if (newestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(newestThreadMessage.id); + else threadMessagesToUpdateFalse.push(newestThreadMessage.id); + + if (fetchedThreadMessages) { + for (const message of fetchedThreadMessages) { + if (message.webhookId) { + threadMessagesToUpdateTrue.push(message.id); + } else { + threadMessagesToUpdateFalse.push(message.id); + } } + } - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is done'); + if (threadMessagesToUpdateTrue.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: threadMessagesToUpdateTrue } }, + { isGeneratedByWebhook: true } + ); + } - } catch (err) { - logger.error({ guild_id: connection.name, channel_id: channel.id, err }, 'Migration for isGeneratedByWebhook failed'); + if (threadMessagesToUpdateFalse.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: threadMessagesToUpdateFalse } }, + { isGeneratedByWebhook: false } + ); + } } -} + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is done'); + } catch (err) { + logger.error( + { guild_id: connection.name, channel_id: channel.id, err }, + 'Migration for isGeneratedByWebhook failed' + ); + } +} /** - * + * * @param {Connection} connection - Mongoose connection object for the database. * @param {Client} client - The discord.js client object used to fetch the guild. * @param {Snowflake} guildId - The identifier of the guild to extract information from. */ async function runRawInfoMigration(connection: Connection, client: Client, guildId: Snowflake) { - logger.info({ guild_id: guildId }, 'Migration is running'); - try { - const guild = await client.guilds.fetch(guildId); - const channels = await channelService.getChannels(connection, {}); - for (let i = 0; i < channels.length; i++) { - const channel = await guild.channels.fetch(channels[i].channelId); - if (channel) { - if (channel.type !== 0) continue; - await migrateIsGeneratedByWebhook(connection, channel); - } - } - } catch (err) { - logger.error({ guild_id: guildId, err }, 'Migration is failed'); + logger.info({ guild_id: guildId }, 'Migration is running'); + try { + const guild = await client.guilds.fetch(guildId); + const channels = await channelService.getChannels(connection, {}); + for (let i = 0; i < channels.length; i++) { + const channel = await guild.channels.fetch(channels[i].channelId); + if (channel) { + if (channel.type !== 0) continue; + await migrateIsGeneratedByWebhook(connection, channel); + } } - logger.info({ guild_id: guildId }, 'Migration is done'); + } catch (err) { + logger.error({ guild_id: guildId, err }, 'Migration is failed'); + } + logger.info({ guild_id: guildId }, 'Migration is done'); } -export default runRawInfoMigration; \ No newline at end of file +export default runRawInfoMigration;