
Commit ed034d2

scrub deleted wip

1 parent 8d0af84 commit ed034d2

1 file changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
import { Storage } from "@google-cloud/storage";
import { createHash } from "blake3";
import { Sequelize } from "sequelize";

// To be run from connectors with `CORE_DATABASE_URI`.
// LIVE defaults to false (dry run); any non-empty string enables the GCS lookup.
const {
  CORE_DATABASE_URI,
  SERVICE_ACCOUNT,
  DUST_DATA_SOURCES_BUCKET,
  LIVE = false,
} = process.env;

async function main() {
  const core_sequelize = new Sequelize(CORE_DATABASE_URI as string, {
    logging: false,
  });

  // Fetch every document version marked as deleted in the core store.
  const deletedDocumentsData = await core_sequelize.query(
    `SELECT * FROM data_sources_documents WHERE status = 'deleted'`
  );

  const deletedDocuments = deletedDocumentsData[0] as {
    id: number;
    created: number;
    data_source: number;
    document_id: string;
    hash: string;
  }[];

  console.log(
    `Found ${deletedDocuments.length} deleted core documents to process`
  );

  // Process in batches of 32 to bound concurrency on the database and GCS.
  const chunks = [];
  for (let i = 0; i < deletedDocuments.length; i += 32) {
    chunks.push(deletedDocuments.slice(i, i + 32));
  }

  for (let i = 0; i < chunks.length; i++) {
    console.log(`Processing chunk ${i + 1}/${chunks.length}...`);
    const chunk = chunks[i];
    await Promise.all(
      chunk.map(async (d) => {
        return scrubDocument(
          core_sequelize,
          d.data_source,
          d.document_id,
          d.created,
          d.hash
        );
      })
    );
  }
}

async function scrubDocument(
  core_sequelize: Sequelize,
  data_source: number,
  document_id: string,
  deletedAt: number,
  hash: string
) {
  // A non-deleted version created after the deletion that shares the same hash
  // still references the same stored content, so it must not be scrubbed.
  const moreRecentSameHash = await core_sequelize.query(
    `SELECT id FROM data_sources_documents WHERE data_source = :data_source AND document_id = :document_id AND hash = :hash AND status != 'deleted' AND created > :deletedAt LIMIT 1`,
    {
      replacements: {
        data_source: data_source,
        document_id: document_id,
        hash: hash,
        deletedAt: deletedAt,
      },
    }
  );

  if (moreRecentSameHash[0].length > 0) {
    console.log(
      `Skipping ${document_id} as there is a more recent version with the same hash`
    );
    return;
  }

  const dataSourceData = await core_sequelize.query(
    `SELECT * FROM data_sources WHERE id = :data_source`,
    {
      replacements: {
        data_source: data_source,
      },
    }
  );

  if (dataSourceData[0].length === 0) {
    throw new Error(`Could not find data source ${data_source}`);
  }

  const dataSource = dataSourceData[0][0] as {
    id: number;
    project: number;
    internal_id: string;
  };

  if (LIVE) {
    // Stored objects are keyed by the blake3 hash of the document_id.
    const hasher = createHash();
    hasher.update(Buffer.from(document_id));
    const documentIdHash = hasher.digest("hex");

    const path = `${dataSource.project}/${dataSource.internal_id}/${documentIdHash}/${hash}`;

    const storage = new Storage({ keyFilename: SERVICE_ACCOUNT });

    // For now this only lists the matching files; no deletion is performed.
    const [files] = await storage
      .bucket(DUST_DATA_SOURCES_BUCKET || "")
      .getFiles({ prefix: path });

    if (files.length > 0) {
      console.log(files);
    }
  }
}

main()
  .then(() => {
    console.log("Done");
    process.exit(0);
  })
  .catch((err) => {
    console.error(err);
    process.exit(1);
  });
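A dry-run invocation might look like the following; the filename, the `tsx` runner, and all values are illustrative assumptions, not part of the commit:

CORE_DATABASE_URI=postgres://user:pass@host:5432/core \
SERVICE_ACCOUNT=/path/to/service-account.json \
DUST_DATA_SOURCES_BUCKET=bucket-name \
npx tsx scrub_deleted_documents.ts

Setting LIVE to any non-empty value would additionally perform the GCS prefix lookup for each scrubbable document; without it, the script only runs the database queries and skip checks.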
