
Commit

[Nodes core] Fix backfill again (#10198)
Description
---
Connecting to providers doesn't work after all

Risks
---
tested

Deploy
---
na
philipperolet authored Jan 23, 2025
1 parent 9db9d1b commit 2dfc190
Showing 1 changed file with 11 additions and 28 deletions.
39 changes: 11 additions & 28 deletions connectors/migrations/20250122_gdrive_clean_parents.ts
@@ -2,7 +2,6 @@ import {
   concurrentExecutor,
   getGoogleSheetTableId,
   MIME_TYPES,
-  removeNulls,
 } from "@dust-tt/types";
 import _ from "lodash";
 import type { LoggerOptions } from "pino";
@@ -11,11 +10,7 @@ import { makeScript } from "scripts/helpers";
 
 import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive";
 import { getLocalParents } from "@connectors/connectors/google_drive/lib";
-import {
-  getAuthObject,
-  getDriveClient,
-  getInternalId,
-} from "@connectors/connectors/google_drive/temporal/utils";
+import { getInternalId } from "@connectors/connectors/google_drive/temporal/utils";
 import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
 import {
   updateDataSourceDocumentParents,
@@ -53,27 +48,11 @@ async function migrateConnector(
 
   // isolate roots that are drives
   // no need to update parents for anything whose parent is a drive
-  const authCredentials = await getAuthObject(connector.connectionId);
-  const driveClient = await getDriveClient(authCredentials);
-  const driveRoots = removeNulls(
-    await concurrentExecutor(
-      roots,
-      async (root) => {
-        try {
-          const drive = await driveClient.drives.get({
-            driveId: root.folderId,
-          });
-          if (drive.status !== 200) {
-            return null;
-          }
-          return getInternalId(root.folderId);
-        } catch (e) {
-          return null;
-        }
-      },
-      { concurrency: 4 }
-    )
-  );
+  const driveRoots = roots
+    .filter(
+      (root) => root.folderId.startsWith("0A") && root.folderId.length < 27
+    )
+    .map((root) => getInternalId(root.folderId));
 
   logger.info({ driveRoots }, "Excluded drive roots");
   logger.info({ numberOfFiles: files.length }, "Found files");
@@ -83,6 +62,7 @@
 
   const chunks = _.chunk(files, 1024);
 
+  let totalProcessed = 0;
   for (const chunk of chunks) {
     const result = await processFilesBatch({
       connector,
@@ -92,21 +72,22 @@
       startTimeTs,
       driveRoots,
     });
+    totalProcessed += result;
     logger.info(
       { numberOfFiles: result, execute, batchSize: chunk.length },
       "Processed files batch"
     );
     await new Promise((resolve) => setTimeout(resolve, 50));
   }
 
+  logger.info({ totalProcessed }, "Files: total processed");
   const sheets = await GoogleDriveSheet.findAll({
     where: {
       connectorId: connector.id,
     },
   });
 
   const sheetsChunks = _.chunk(sheets, 1024);
 
+  totalProcessed = 0;
   for (const chunk of sheetsChunks) {
     const result = await processSheetsBatch({
       connector,
@@ -116,12 +97,14 @@
       startTimeTs,
       driveRoots,
     });
+    totalProcessed += result;
     logger.info(
       { numberOfSheets: result, execute, batchSize: chunk.length },
       "Processed sheets batch"
     );
     await new Promise((resolve) => setTimeout(resolve, 50));
   }
+  logger.info({ totalProcessed }, "Sheets: total processed");
 }
 
 async function processFilesBatch({
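A note on the core change: the removed code validated each root against the Drive API (drives.get) under the connector's credentials, which is what broke per the description above. The replacement infers drive roots from the ID shape alone: Google shared-drive IDs start with "0A" and are shorter than the IDs assigned to ordinary files and folders. A minimal self-contained sketch of that heuristic, where the Root type and the getInternalId stub are illustrative assumptions rather than the connector's real definitions:

// Hypothetical stand-in for the connector's getInternalId helper.
const getInternalId = (driveFileId: string): string => `gdrive-${driveFileId}`;

type Root = { folderId: string };

function driveRootIds(roots: Root[]): string[] {
  return roots
    .filter(
      // Shared-drive IDs begin with "0A" and are shorter than the longer
      // IDs Google assigns to regular files and folders.
      (root) => root.folderId.startsWith("0A") && root.folderId.length < 27
    )
    .map((root) => getInternalId(root.folderId));
}

// Example (made-up IDs): only the first matches the shared-drive shape.
// driveRootIds([
//   { folderId: "0AIbOsQWKVlDUUk9PVA" },
//   { folderId: "1a2B3c4D5e6F7g8H9i0JkLmNoPqRsTuV" },
// ]) => ["gdrive-0AIbOsQWKVlDUUk9PVA"]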
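The progress-logging additions follow a simple chunked-batch pattern: process fixed-size chunks, accumulate a running total, and sleep briefly between batches. A generic sketch of that pattern, assuming only lodash; the function name and parameters here are illustrative, not part of the commit:

import _ from "lodash";

async function processInBatches<T>(
  items: T[],
  processBatch: (batch: T[]) => Promise<number>,
  batchSize = 1024,
  pauseMs = 50
): Promise<number> {
  let totalProcessed = 0;
  for (const chunk of _.chunk(items, batchSize)) {
    // Each batch reports how many items it actually processed.
    totalProcessed += await processBatch(chunk);
    // Brief pause between batches to throttle load on downstream services.
    await new Promise((resolve) => setTimeout(resolve, pauseMs));
  }
  return totalProcessed;
}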
