[CoreNodes] Add cleanup script for Google Drive parents #10442

Merged 2 commits on Feb 3, 2025
Changes from 1 commit
add migration script
aubin-tchoi committed Jan 31, 2025

Verified: this commit was signed with the committer's verified signature.
commit 566d03cbf59b6d78a9a879754e940d06ed97d756
277 changes: 277 additions & 0 deletions front/migrations/20250131_fix_google_drive_parents.ts
@@ -0,0 +1,277 @@
import { concurrentExecutor, CoreAPI, Ok } from "@dust-tt/types";
import assert from "assert";

import apiConfig from "@app/lib/api/config";
import { getCorePrimaryDbConnection } from "@app/lib/production_checks/utils";
import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";
import { withRetries } from "@app/lib/utils/retries";
import type Logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";

const QUERY_BATCH_SIZE = 256;
const NODE_CONCURRENCY = 16;

async function migrateDocument({
coreAPI,
dataSource,
coreNode,
execute,
skipIfParentsAreAlreadyCorrect,
logger,
}: {
coreAPI: CoreAPI;
dataSource: DataSourceModel;
coreNode: {
parents: string[];
node_id: string;
};
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
logger: typeof Logger;
}) {
let newParents = coreNode.parents;
let newParentId: string | null = null;
try {
const uniqueIds = [
new Set(
Contributor:
No guarantees on the resulting order when using a Set; unless you're really sure, we'd need guarantees on the sorting of parents.

Contributor (author):
Thought order was preserved when writing this; gonna check again out of curiosity and change it if needed.

Contributor (author):
Unlike most languages, JS Sets actually preserve insertion order: https://dust.tt/w/0ec9852c2f/assistant/FycTesggB2
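
To illustrate the claim, a standalone sketch (not part of the diff; the IDs are made up):

// Per the ECMAScript spec, a Set iterates in insertion order, so
// deduplicating through a Set keeps the first occurrence of each id in place.
const ids = ["root", "a", "b", "a", "c"];
const deduped = [...new Set(ids)];
console.log(deduped); // ["root", "a", "b", "c"]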

Contributor:
OK, thanks for looking. So here there is an assumption that botched parents are still symmetric, in the right order, which indeed seems correct to me from afar. IMO, for unconventional language features such as this (in the future, not here), a comment showing you know or you checked might be warranted.

Contributor (author):
Yes, if the order of the parents was lost at any point we won't recover it here (we would need to run a backfill relying on the connectors db).

[coreNode.node_id, ...coreNode.parents].map((x) =>
x.replace("gdrive-", "")
)
),
];
newParents = uniqueIds.map((id) => `gdrive-${id}`);
newParentId = newParents[1] || null;
} catch (e) {
logger.error(
{
nodeId: coreNode.node_id,
parents: coreNode.parents,
},
`TRANSFORM_ERROR`
);
throw e;
}

if (
skipIfParentsAreAlreadyCorrect &&
newParents.every((x, i) => x === coreNode.parents[i])
) {
logger.info(
{
documentId: coreNode.node_id,
fromParents: coreNode.parents,
toParents: newParents,
},
`SKIP document (parents are already correct)`
);
return new Ok(undefined);
}

if (execute) {
await withRetries(
async () => {
const updateRes = await coreAPI.updateDataSourceDocumentParents({
projectId: dataSource.dustAPIProjectId,
dataSourceId: dataSource.dustAPIDataSourceId,
documentId: coreNode.node_id,
parents: newParents,
parentId: newParentId,
});
if (updateRes.isErr()) {
logger.error(
{
nodeId: coreNode.node_id,
fromParents: coreNode.parents,
toParents: newParents,
toParentId: newParentId,
},
`Error while updating parents`
);
throw new Error(updateRes.error.message);
}
},
{ retries: 10 }
)({});

logger.info(
{
nodeId: coreNode.node_id,
fromParents: coreNode.parents,
toParents: newParents,
},
`LIVE`
);
} else {
logger.info(
{
nodeId: coreNode.node_id,
fromParents: coreNode.parents,
toParents: newParents,
},
`DRY`
);
}

return new Ok(undefined);
}

async function migrateDataSource({
coreAPI,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
parentLogger,
}: {
coreAPI: CoreAPI;
dataSource: DataSourceModel;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
parentLogger: typeof Logger;
}) {
const logger = parentLogger.child({ dataSourceId: dataSource.id });
const corePrimary = getCorePrimaryDbConnection();

// Retrieve the core data source.
const [coreDataSourceRows] = (await corePrimary.query(
`SELECT id, data_source_id
FROM data_sources
WHERE project = ?
AND data_source_id = ?`,
{
replacements: [
dataSource.dustAPIProjectId,
dataSource.dustAPIDataSourceId,
],
}
)) as { id: number; data_source_id: string }[][];

assert(
coreDataSourceRows.length === 1 &&
coreDataSourceRows[0].data_source_id === dataSource.dustAPIDataSourceId,
"Core data source mismatch"
);
const coreDataSourceId = coreDataSourceRows[0].id;

// For all nodes in the data source (can be big).
let nextId = "";

for (;;) {
Contributor:
Nit: those are hard to read. I suggest you always use a control flow statement that makes the condition explicit here, or, if there is no condition, use forEach / map.

Contributor (author):
I can turn this into a do-while, but since I have to keep the break, the condition isn't doing anything, so I didn't bother with an unused condition.

Contributor:
I'd have gone with while (nextId) together with nextId = rows[rows.length - 1]?.node_id; but it's OK not to change here 👍
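
For reference, a standalone sketch of the suggested pattern (not the author's code; fetchBatch and processRows are hypothetical helpers, and the condition is adapted so the initial empty cursor still enters the loop):

// Assumes an async context; fetchBatch returns { node_id: string }[].
let nextId: string | undefined = "";
while (nextId !== undefined) {
  const rows = await fetchBatch(nextId); // hypothetical paginated query
  await processRows(rows);               // hypothetical per-batch work
  // Optional chaining yields undefined on an empty batch, ending the loop.
  nextId = rows[rows.length - 1]?.node_id;
}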

Contributor (author):
Yeah, that works too!

const [rows] = (await (async () => {
return corePrimary.query(
`SELECT node_id, parents
FROM data_sources_nodes
WHERE data_source = :coreDataSourceId
AND node_id > :nextId
AND EXISTS
(
SELECT 1
FROM UNNEST(parents) p
WHERE p NOT LIKE 'gdrive-%'
)
ORDER BY node_id
LIMIT :batchSize`,
{
replacements: {
coreDataSourceId,
nextId,
batchSize: QUERY_BATCH_SIZE,
},
}
);
})()) as {
parents: string[];
node_id: string;
}[][];

logger.info({ nextId, rowCount: rows.length }, "BATCH");

if (rows.length === 0) {
Contributor:
I understand it might be a little faster to write, but it's better to optimize for easier to understand :)

Contributor (author):
I have to keep this break for rows[rows.length - 1] to not be undefined on the next line.

Contributor:
Not necessarily, but fine as is 👍

break;
}

nextId = rows[rows.length - 1].node_id;

// concurrentExecutor on documents
try {
await concurrentExecutor(
rows,
(coreNode) =>
migrateDocument({
coreAPI,
dataSource,
coreNode,
skipIfParentsAreAlreadyCorrect,
execute,
logger,
}),
{ concurrency: NODE_CONCURRENCY }
);
} catch (e) {
logger.error(
{
error: e,
nextDataSourceId: dataSource.id,
nextId,
},
`ERROR`
);
throw e;
}
}
}

async function migrateAll({
coreAPI,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
logger,
}: {
coreAPI: CoreAPI;
nextDataSourceId: number;
execute: boolean;
skipIfParentsAreAlreadyCorrect: boolean;
logger: typeof Logger;
}) {
// retrieve all data sources for the provider
const dataSources = await DataSourceModel.findAll({
where: { connectorProvider: "google_drive" },
Contributor, suggested change:
-      where: { connectorProvider: "google_drive" },
+      where: { connectorProvider: "google_drive" },
+      id: {
+        [Op.gt]: nextDataSourceId,
+      },
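
(An aside, not part of the suggestion: a minimal sketch of how this filter reads in Sequelize, assuming Op is imported; in findAll options the id condition sits inside the where object.)

import { Op } from "sequelize";

const dataSources = await DataSourceModel.findAll({
  where: {
    connectorProvider: "google_drive",
    id: { [Op.gt]: nextDataSourceId }, // only data sources past the cursor
  },
  order: [["id", "ASC"]],
});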

order: [["id", "ASC"]],
});

for (const dataSource of dataSources) {
if (dataSource.id >= nextDataSourceId) {
Contributor:
Do that in the where clause.

Contributor (author), Feb 2, 2025:
I actually meant to log when skipping a data source but forgot about it, adding it.
Edit: actually I did not forget about it.

Contributor:
Not sure I get that. You use this condition to make the nextDataSourceId param useful. So if you start using it and call your script with nextDataSourceId > 1000, you want 1. to grab all data sources anyway, and 2. to log "SKIP" for all the data sources with an id < nextDataSourceId? What would be the purpose?
It's not critical to the PR, so still going to approve though.

Contributor (author):
The purpose is that it doesn't cost anything, and if anything fishy happens on a data source I'd grep on the data source ID; it's more readable to see that it was skipped vs. not seeing anything and having to check in the script what it means. In general, I'd rather not optimize queries that are run once and are fast anyway if it lets me have more easily parsable logs.

logger.info({ dataSourceId: dataSource.id }, "MIGRATING");
await migrateDataSource({
coreAPI,
dataSource,
execute,
skipIfParentsAreAlreadyCorrect,
parentLogger: logger,
});
} else {
logger.info({ dataSourceId: dataSource.id }, "SKIP");
}
}
}

makeScript(
{
skipIfParentsAreAlreadyCorrect: { type: "boolean", default: false },
nextDataSourceId: { type: "number", default: 0 },
},
async (
{ nextDataSourceId, execute, skipIfParentsAreAlreadyCorrect },
logger
) => {
const coreAPI = new CoreAPI(apiConfig.getCoreAPIConfig(), logger);

await migrateAll({
coreAPI,
nextDataSourceId,
execute,
skipIfParentsAreAlreadyCorrect,
logger,
});
}
);