diff --git a/connectors/migrations/20250102_clean_zendesk_tickets_articles.ts b/connectors/migrations/20250102_clean_zendesk_tickets_articles.ts new file mode 100644 index 000000000000..f78cbf2fa624 --- /dev/null +++ b/connectors/migrations/20250102_clean_zendesk_tickets_articles.ts @@ -0,0 +1,151 @@ +import { makeScript } from "scripts/helpers"; +import { Op } from "sequelize"; + +import { + getArticleInternalId, + getTicketInternalId, +} from "@connectors/connectors/zendesk/lib/id_conversions"; +import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; +import { concurrentExecutor } from "@connectors/lib/async_utils"; +import { deleteDataSourceDocument } from "@connectors/lib/data_sources"; +import { ZendeskArticle, ZendeskTicket } from "@connectors/lib/models/zendesk"; +import type Logger from "@connectors/logger/logger"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; + +const QUERY_BATCH_SIZE = 256; +const DOCUMENT_CONCURRENCY = 16; + +async function cleanTickets( + connector: ConnectorResource, + logger: typeof Logger, + execute: boolean +) { + let idCursor: number = 0; + let tickets: ZendeskTicket[] = []; + do { + tickets = await ZendeskTicket.findAll({ + where: { + connectorId: connector.id, + id: { [Op.gt]: idCursor }, + }, + limit: QUERY_BATCH_SIZE, + order: [["id", "ASC"]], + }); + + if (execute) { + await concurrentExecutor( + tickets, + async (ticket) => { + return deleteDataSourceDocument( + dataSourceConfigFromConnector(connector), + // this is the old internal ID + getTicketInternalId({ + connectorId: connector.id, + ticketId: ticket.ticketId, + }) + ); + }, + { concurrency: DOCUMENT_CONCURRENCY } + ); + logger.info( + `LIVE: ${tickets[tickets.length - 1]?.id} >= id > ${idCursor}` + ); + } else { + logger.info( + `DRY: ${tickets[tickets.length - 1]?.id} >= id > ${idCursor}` + ); + } + + if (tickets.length > 0) { + const lastTicket = tickets[tickets.length - 1]; + if (lastTicket) { + idCursor = lastTicket.id; + } + } + } while (tickets.length === QUERY_BATCH_SIZE); +} + +async function cleanArticles( + connector: ConnectorResource, + logger: typeof Logger, + execute: boolean +) { + let idCursor: number = 0; + let articles: ZendeskArticle[] = []; + do { + articles = await ZendeskArticle.findAll({ + where: { + connectorId: connector.id, + id: { [Op.gt]: idCursor }, + }, + limit: QUERY_BATCH_SIZE, + order: [["id", "ASC"]], + }); + + if (execute) { + await concurrentExecutor( + articles, + async (article) => { + return deleteDataSourceDocument( + dataSourceConfigFromConnector(connector), + // this is the old internal ID + getArticleInternalId({ + connectorId: connector.id, + articleId: article.articleId, + }) + ); + }, + { concurrency: DOCUMENT_CONCURRENCY } + ); + logger.info( + `LIVE: ${articles[articles.length - 1]?.id} >= id > ${idCursor}` + ); + } else { + logger.info( + `DRY: ${articles[articles.length - 1]?.id} >= id > ${idCursor}` + ); + } + + if (articles.length > 0) { + const lastTicket = articles[articles.length - 1]; + if (lastTicket) { + idCursor = lastTicket.id; + } + } + } while (articles.length === QUERY_BATCH_SIZE); +} + +makeScript( + { resourceType: { type: "string", choices: ["tickets", "articles"] } }, + async ({ execute, resourceType }, logger) => { + const connectors = await ConnectorResource.listByType("zendesk", {}); + + switch (resourceType) { + case "tickets": { + for (const connector of connectors) { + logger.info({ connectorId: connector.id }, `MIGRATING`); + await cleanTickets( + connector, + logger.child({ connectorId: connector.id }), + execute + ); + } + break; + } + case "articles": { + for (const connector of connectors) { + logger.info({ connectorId: connector.id }, `MIGRATING`); + await cleanArticles( + connector, + logger.child({ connectorId: connector.id }), + execute + ); + } + break; + } + default: { + throw new Error(`Invalid resource type: ${resourceType}`); + } + } + } +); diff --git a/connectors/src/connectors/zendesk/lib/id_conversions.ts b/connectors/src/connectors/zendesk/lib/id_conversions.ts index 611f321794b7..ce4b5b9feac8 100644 --- a/connectors/src/connectors/zendesk/lib/id_conversions.ts +++ b/connectors/src/connectors/zendesk/lib/id_conversions.ts @@ -47,6 +47,18 @@ export function getArticleInternalId({ return `zendesk-article-${connectorId}-${articleId}`; } +export function getArticleNewInternalId({ + connectorId, + brandId, + articleId, +}: { + connectorId: ModelId; + brandId: number; + articleId: number; +}): string { + return `zendesk-article-${connectorId}-${brandId}-${articleId}`; +} + export function getTicketsInternalId({ connectorId, brandId, @@ -67,6 +79,18 @@ export function getTicketInternalId({ return `zendesk-ticket-${connectorId}-${ticketId}`; } +export function getTicketNewInternalId({ + connectorId, + brandId, + ticketId, +}: { + connectorId: ModelId; + brandId: number; + ticketId: number; +}): string { + return `zendesk-ticket-${connectorId}-${brandId}-${ticketId}`; +} + /** * Conversion from an internalId to an id. */ diff --git a/connectors/src/connectors/zendesk/lib/sync_article.ts b/connectors/src/connectors/zendesk/lib/sync_article.ts index ed3d7ddabbf6..bd86eadd9593 100644 --- a/connectors/src/connectors/zendesk/lib/sync_article.ts +++ b/connectors/src/connectors/zendesk/lib/sync_article.ts @@ -6,7 +6,10 @@ import type { ZendeskFetchedSection, ZendeskFetchedUser, } from "@connectors/@types/node-zendesk"; -import { getArticleInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; +import { + getArticleInternalId, + getArticleNewInternalId, +} from "@connectors/connectors/zendesk/lib/id_conversions"; import { deleteDataSourceDocument, renderDocumentTitleAndContent, @@ -146,15 +149,27 @@ export async function syncArticle({ updatedAt, }); - const documentId = getArticleInternalId({ + const oldDocumentId = getArticleInternalId({ connectorId, articleId: article.id, }); + // TODO(2025-01-02 aubin): stop deleting old documents once the migration of internal IDs is done. + await deleteDataSourceDocument(dataSourceConfig, oldDocumentId, { + ...loggerArgs, + articleId: article.id, + }); + const parents = articleInDb.getParentInternalIds(connectorId); + const newDocumentId = getArticleNewInternalId({ + connectorId, + brandId: category.brandId, + articleId: article.id, + }); + await upsertDataSourceDocument({ dataSourceConfig, - documentId, + documentId: newDocumentId, documentContent, documentUrl: article.html_url, timestampMs: updatedAt.getTime(), @@ -163,7 +178,7 @@ export async function syncArticle({ `createdAt:${createdAt.getTime()}`, `updatedAt:${updatedAt.getTime()}`, ], - parents, + parents: [newDocumentId, ...parents.slice(1)], parentId: parents[1], loggerArgs: { ...loggerArgs, articleId: article.id }, upsertContext: { sync_type: "batch" }, @@ -171,6 +186,7 @@ export async function syncArticle({ mimeType: "application/vnd.dust.zendesk.article", async: true, }); + await articleInDb.update({ lastUpsertedTs: new Date(currentSyncDateMs) }); } else { logger.warn( diff --git a/connectors/src/connectors/zendesk/lib/sync_ticket.ts b/connectors/src/connectors/zendesk/lib/sync_ticket.ts index b626b53fd5c6..d51a5e154a17 100644 --- a/connectors/src/connectors/zendesk/lib/sync_ticket.ts +++ b/connectors/src/connectors/zendesk/lib/sync_ticket.ts @@ -6,7 +6,10 @@ import type { ZendeskFetchedTicketComment, ZendeskFetchedUser, } from "@connectors/@types/node-zendesk"; -import { getTicketInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; +import { + getTicketInternalId, + getTicketNewInternalId, +} from "@connectors/connectors/zendesk/lib/id_conversions"; import { deleteDataSourceDocument, renderDocumentTitleAndContent, @@ -204,15 +207,26 @@ ${comments updatedAt: updatedAtDate, }); - const documentId = getTicketInternalId({ + const oldDocumentId = getTicketInternalId({ connectorId, ticketId: ticket.id, }); + // TODO(2025-01-02 aubin): stop deleting old documents once the migration of internal IDs is done. + await deleteDataSourceDocument(dataSourceConfig, oldDocumentId, { + ...loggerArgs, + ticketId: ticket.id, + }); const parents = ticketInDb.getParentInternalIds(connectorId); + const newDocumentId = getTicketNewInternalId({ + connectorId, + brandId, + ticketId: ticket.id, + }); + await upsertDataSourceDocument({ dataSourceConfig, - documentId, + documentId: newDocumentId, documentContent, documentUrl: ticket.url, timestampMs: updatedAtDate.getTime(), @@ -222,7 +236,7 @@ ${comments `updatedAt:${updatedAtDate.getTime()}`, `createdAt:${createdAtDate.getTime()}`, ], - parents, + parents: [newDocumentId, ...parents.slice(1)], parentId: parents[1], loggerArgs: { ...loggerArgs, ticketId: ticket.id }, upsertContext: { sync_type: "batch" }, @@ -230,6 +244,7 @@ ${comments mimeType: "application/vnd.dust.zendesk.ticket", async: true, }); + await ticketInDb.update({ lastUpsertedTs: new Date(currentSyncDateMs) }); } else { logger.warn(