Skip to content

Commit

Permalink
fix(Zendesk) - upsert new document_id (#9704)
Browse files Browse the repository at this point in the history
* upsert documents x2 with both the old and new ID

* add clean script

* delete documents using the old IDs instead of upserting

* fix the TODO message
  • Loading branch information
aubin-tchoi authored Jan 2, 2025
1 parent de30a22 commit 4da155e
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 8 deletions.
151 changes: 151 additions & 0 deletions connectors/migrations/20250102_clean_zendesk_tickets_articles.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import {
getArticleInternalId,
getTicketInternalId,
} from "@connectors/connectors/zendesk/lib/id_conversions";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { deleteDataSourceDocument } from "@connectors/lib/data_sources";
import { ZendeskArticle, ZendeskTicket } from "@connectors/lib/models/zendesk";
import type Logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

const QUERY_BATCH_SIZE = 256;
const DOCUMENT_CONCURRENCY = 16;

/**
 * Deletes, for every Zendesk ticket stored in the database for `connector`, the
 * data source document addressed by the ticket's old internal ID (the one built
 * by `getTicketInternalId`).
 *
 * Tickets are read in batches of `QUERY_BATCH_SIZE`, ordered by ascending row
 * `id` and paginated with an `id > cursor` filter.
 *
 * @param connector - Connector whose tickets are processed.
 * @param logger - Logger receiving one line per batch with the covered id range.
 * @param execute - When false (dry run), nothing is deleted; only the id ranges
 *   are logged.
 */
async function cleanTickets(
  connector: ConnectorResource,
  logger: typeof Logger,
  execute: boolean
): Promise<void> {
  const mode = execute ? "LIVE" : "DRY";
  let idCursor = 0;
  let tickets: ZendeskTicket[] = [];
  do {
    tickets = await ZendeskTicket.findAll({
      where: {
        connectorId: connector.id,
        id: { [Op.gt]: idCursor },
      },
      limit: QUERY_BATCH_SIZE,
      order: [["id", "ASC"]],
    });

    const lastTicket = tickets[tickets.length - 1];
    if (!lastTicket) {
      // Empty batch: nothing to delete and no upper bound to report. The loop
      // condition would stop here anyway; exiting early avoids logging an
      // `undefined` bound.
      logger.info(`${mode}: no tickets with id > ${idCursor}`);
      break;
    }

    if (execute) {
      // Built once per batch rather than once per document.
      const dataSourceConfig = dataSourceConfigFromConnector(connector);
      await concurrentExecutor(
        tickets,
        async (ticket) => {
          return deleteDataSourceDocument(
            dataSourceConfig,
            // this is the old internal ID
            getTicketInternalId({
              connectorId: connector.id,
              ticketId: ticket.ticketId,
            })
          );
        },
        { concurrency: DOCUMENT_CONCURRENCY }
      );
    }
    logger.info(`${mode}: ${lastTicket.id} >= id > ${idCursor}`);

    // Advance the cursor past the last row seen so the next query resumes after it.
    idCursor = lastTicket.id;
  } while (tickets.length === QUERY_BATCH_SIZE);
}

/**
 * Deletes, for every Zendesk article stored in the database for `connector`,
 * the data source document addressed by the article's old internal ID (the one
 * built by `getArticleInternalId`).
 *
 * Articles are read in batches of `QUERY_BATCH_SIZE`, ordered by ascending row
 * `id` and paginated with an `id > cursor` filter.
 *
 * @param connector - Connector whose articles are processed.
 * @param logger - Logger receiving one line per batch with the covered id range.
 * @param execute - When false (dry run), nothing is deleted; only the id ranges
 *   are logged.
 */
async function cleanArticles(
  connector: ConnectorResource,
  logger: typeof Logger,
  execute: boolean
): Promise<void> {
  const mode = execute ? "LIVE" : "DRY";
  let idCursor = 0;
  let articles: ZendeskArticle[] = [];
  do {
    articles = await ZendeskArticle.findAll({
      where: {
        connectorId: connector.id,
        id: { [Op.gt]: idCursor },
      },
      limit: QUERY_BATCH_SIZE,
      order: [["id", "ASC"]],
    });

    const lastArticle = articles[articles.length - 1];
    if (!lastArticle) {
      // Empty batch: nothing to delete and no upper bound to report. The loop
      // condition would stop here anyway; exiting early avoids logging an
      // `undefined` bound.
      logger.info(`${mode}: no articles with id > ${idCursor}`);
      break;
    }

    if (execute) {
      // Built once per batch rather than once per document.
      const dataSourceConfig = dataSourceConfigFromConnector(connector);
      await concurrentExecutor(
        articles,
        async (article) => {
          return deleteDataSourceDocument(
            dataSourceConfig,
            // this is the old internal ID
            getArticleInternalId({
              connectorId: connector.id,
              articleId: article.articleId,
            })
          );
        },
        { concurrency: DOCUMENT_CONCURRENCY }
      );
    }
    logger.info(`${mode}: ${lastArticle.id} >= id > ${idCursor}`);

    // Advance the cursor past the last row seen so the next query resumes after it.
    idCursor = lastArticle.id;
  } while (articles.length === QUERY_BATCH_SIZE);
}

// Script entry point: lists every "zendesk" connector and runs the cleaner
// matching `resourceType` ("tickets" or "articles") on each of them, one
// connector at a time. `execute` is forwarded untouched to the cleaner
// (presumably supplied by `makeScript` itself — it is not declared in the
// argument spec below).
makeScript(
  { resourceType: { type: "string", choices: ["tickets", "articles"] } },
  async ({ execute, resourceType }, logger) => {
    const connectors = await ConnectorResource.listByType("zendesk", {});

    // Resolve the cleaner up front so an invalid resource type is rejected
    // before any connector is touched.
    let cleanResource: typeof cleanTickets;
    switch (resourceType) {
      case "tickets":
        cleanResource = cleanTickets;
        break;
      case "articles":
        cleanResource = cleanArticles;
        break;
      default:
        throw new Error(`Invalid resource type: ${resourceType}`);
    }

    for (const connector of connectors) {
      const connectorId = connector.id;
      logger.info({ connectorId }, `MIGRATING`);
      await cleanResource(connector, logger.child({ connectorId }), execute);
    }
  }
);
24 changes: 24 additions & 0 deletions connectors/src/connectors/zendesk/lib/id_conversions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ export function getArticleInternalId({
return `zendesk-article-${connectorId}-${articleId}`;
}

/**
 * Builds the new-format internal ID of a Zendesk article, which includes the
 * brand ID between the connector ID and the article ID.
 *
 * @returns A string of the form `zendesk-article-<connectorId>-<brandId>-<articleId>`.
 */
export function getArticleNewInternalId(ids: {
  connectorId: ModelId;
  brandId: number;
  articleId: number;
}): string {
  const { connectorId, brandId, articleId } = ids;
  const brandScope = `${connectorId}-${brandId}`;
  return `zendesk-article-${brandScope}-${articleId}`;
}

export function getTicketsInternalId({
connectorId,
brandId,
Expand All @@ -67,6 +79,18 @@ export function getTicketInternalId({
return `zendesk-ticket-${connectorId}-${ticketId}`;
}

/**
 * Builds the new-format internal ID of a Zendesk ticket, which includes the
 * brand ID between the connector ID and the ticket ID.
 *
 * @returns A string of the form `zendesk-ticket-<connectorId>-<brandId>-<ticketId>`.
 */
export function getTicketNewInternalId(ids: {
  connectorId: ModelId;
  brandId: number;
  ticketId: number;
}): string {
  const { connectorId, brandId, ticketId } = ids;
  const brandScope = `${connectorId}-${brandId}`;
  return `zendesk-ticket-${brandScope}-${ticketId}`;
}

/**
* Conversion from an internalId to an id.
*/
Expand Down
24 changes: 20 additions & 4 deletions connectors/src/connectors/zendesk/lib/sync_article.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import type {
ZendeskFetchedSection,
ZendeskFetchedUser,
} from "@connectors/@types/node-zendesk";
import { getArticleInternalId } from "@connectors/connectors/zendesk/lib/id_conversions";
import {
getArticleInternalId,
getArticleNewInternalId,
} from "@connectors/connectors/zendesk/lib/id_conversions";
import {
deleteDataSourceDocument,
renderDocumentTitleAndContent,
Expand Down Expand Up @@ -146,15 +149,27 @@ export async function syncArticle({
updatedAt,
});

const documentId = getArticleInternalId({
const oldDocumentId = getArticleInternalId({
connectorId,
articleId: article.id,
});

// TODO(2025-01-02 aubin): stop deleting old documents once the migration of internal IDs is done.
await deleteDataSourceDocument(dataSourceConfig, oldDocumentId, {
...loggerArgs,
articleId: article.id,
});

const parents = articleInDb.getParentInternalIds(connectorId);
const newDocumentId = getArticleNewInternalId({
connectorId,
brandId: category.brandId,
articleId: article.id,
});

await upsertDataSourceDocument({
dataSourceConfig,
documentId,
documentId: newDocumentId,
documentContent,
documentUrl: article.html_url,
timestampMs: updatedAt.getTime(),
Expand All @@ -163,14 +178,15 @@ export async function syncArticle({
`createdAt:${createdAt.getTime()}`,
`updatedAt:${updatedAt.getTime()}`,
],
parents,
parents: [newDocumentId, ...parents.slice(1)],
parentId: parents[1],
loggerArgs: { ...loggerArgs, articleId: article.id },
upsertContext: { sync_type: "batch" },
title: article.title,
mimeType: "application/vnd.dust.zendesk.article",
async: true,
});

await articleInDb.update({ lastUpsertedTs: new Date(currentSyncDateMs) });
} else {
logger.warn(
Expand Down
23 changes: 19 additions & 4 deletions connectors/src/connectors/zendesk/lib/sync_ticket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import type {
ZendeskFetchedTicketComment,
ZendeskFetchedUser,
} from "@connectors/@types/node-zendesk";
import { getTicketInternalId } from "@connectors/connectors/zendesk/lib/id_conversions";
import {
getTicketInternalId,
getTicketNewInternalId,
} from "@connectors/connectors/zendesk/lib/id_conversions";
import {
deleteDataSourceDocument,
renderDocumentTitleAndContent,
Expand Down Expand Up @@ -204,15 +207,26 @@ ${comments
updatedAt: updatedAtDate,
});

const documentId = getTicketInternalId({
const oldDocumentId = getTicketInternalId({
connectorId,
ticketId: ticket.id,
});
// TODO(2025-01-02 aubin): stop deleting old documents once the migration of internal IDs is done.
await deleteDataSourceDocument(dataSourceConfig, oldDocumentId, {
...loggerArgs,
ticketId: ticket.id,
});

const parents = ticketInDb.getParentInternalIds(connectorId);
const newDocumentId = getTicketNewInternalId({
connectorId,
brandId,
ticketId: ticket.id,
});

await upsertDataSourceDocument({
dataSourceConfig,
documentId,
documentId: newDocumentId,
documentContent,
documentUrl: ticket.url,
timestampMs: updatedAtDate.getTime(),
Expand All @@ -222,14 +236,15 @@ ${comments
`updatedAt:${updatedAtDate.getTime()}`,
`createdAt:${createdAtDate.getTime()}`,
],
parents,
parents: [newDocumentId, ...parents.slice(1)],
parentId: parents[1],
loggerArgs: { ...loggerArgs, ticketId: ticket.id },
upsertContext: { sync_type: "batch" },
title: ticket.subject,
mimeType: "application/vnd.dust.zendesk.ticket",
async: true,
});

await ticketInDb.update({ lastUpsertedTs: new Date(currentSyncDateMs) });
} else {
logger.warn(
Expand Down

0 comments on commit 4da155e

Please sign in to comment.