Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a7002cd
refactor: simplify entrypoint
sg-gs Nov 10, 2023
3767c8f
refactor(cd): unify Dockerfile
sg-gs Nov 10, 2023
4ac7962
feat: delete-files task
sg-gs Nov 10, 2023
d4d5ae1
feat: delete file versions
apsantiso Dec 24, 2025
57e402c
refactor: change name
apsantiso Dec 26, 2025
e654581
Merge pull request #2 from internxt/feat/delete-files-versions
sg-gs Dec 26, 2025
b2bc2fb
Add 'feat/delete-files' branch to workflow triggers
sg-gs Dec 26, 2025
cd19c09
Fix Dockerfile path in GitHub Actions workflow
sg-gs Dec 26, 2025
0913e9c
Fix Dockerfile path in GitHub Actions workflow
sg-gs Dec 26, 2025
0c92621
Add Dockerfile for consumer application
sg-gs Dec 26, 2025
b96d128
Rename workflow and update Docker image references
sg-gs Dec 26, 2025
0a84f85
Update Dockerfile path in CI workflow
sg-gs Dec 26, 2025
7efc58a
refactor(delete-files): remove version deletion from network
Dougama Jan 14, 2026
ac9953b
refactor: extract network deletion logic
Dougama Jan 14, 2026
0a1f405
feat: add task for file version deletion
Dougama Jan 14, 2026
798384a
Merge pull request #5 from internxt/refactor/remove-version-network-d…
douglas-xt Jan 20, 2026
58f2630
refactor(drive): simplify query in getDeletedFileVersions
Dougama Feb 2, 2026
a2085fe
Merge pull request #6 from internxt/refactor/extract-network-module
douglas-xt Feb 3, 2026
ef31252
Merge pull request #7 from internxt/feat/delete-file-versions
douglas-xt Feb 4, 2026
11e9aa8
Add delete file versions deployment
sg-gs Feb 4, 2026
62c61b0
Change Dockerfile reference and adjust deployment steps
sg-gs Feb 4, 2026
8ffa6da
Rename workflow and update deployment steps
sg-gs Feb 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions .github/workflows/build-and-deploy-consumer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Build and Deploy Consumer to Production
on:
push:
branches: ["master", "feature/cd", "feat/delete-files"]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Check Out Repo
uses: actions/checkout@v4
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push to DockerHub
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: ${{ secrets.DOCKERHUB_USERNAME }}/background-tasks:${{ github.sha }}
deploy:
needs: build
runs-on: ubuntu-latest
environment:
name: production
steps:
- uses: actions/checkout@master
- name: Update delete items deployment image
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: set image --record deployment/delete-files-consumer delete-files-consumer=${{ secrets.DOCKERHUB_USERNAME }}/background-tasks:${{ github.sha }}
- name: Verify successful deployment
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: rollout status deployment/delete-files-consumer
- name: Update delete file versions deployment image
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: set image --record deployment/delete-file-versions-consumer delete-file-versions-consumer=${{ secrets.DOCKERHUB_USERNAME }}/background-tasks:${{ github.sha }}
- name: Verify successful deployment
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: rollout status deployment/delete-file-versions-consumer
26 changes: 19 additions & 7 deletions .github/workflows/build-and-deploy-producer.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: build & deploy
on:
push:
branches: ["master", "feature/cd"]
branches: ["master", "feature/cd", "feat/delete-files"]
jobs:
build:
runs-on: ubuntu-latest
Expand All @@ -19,25 +19,37 @@ jobs:
uses: docker/build-push-action@v5
with:
context: ./
file: ./producer.Dockerfile
file: ./Dockerfile
push: true
tags: ${{ secrets.DOCKERHUB_USERNAME }}/drive-background-tasks-producer:${{ github.sha }}
tags: ${{ secrets.DOCKERHUB_USERNAME }}/background-tasks:${{ github.sha }}
deploy:
needs: build
runs-on: ubuntu-latest
environment:
name: production
steps:
- uses: actions/checkout@master
- name: Update deployment image
# - uses: actions/checkout@master
# - name: Update delete files/folders deployment image
# uses: steebchen/kubectl@v2.0.0
# with:
# config: ${{ secrets.KUBE_CONFIG_DATA }}
# version: v1.27.4
# command: set image --record deployment/background-tasks-producer background-tasks-producer=${{ secrets.DOCKERHUB_USERNAME }}/drive-background-tasks:${{ github.sha }}
# - name: Verify succesful deployment
# uses: steebchen/kubectl@v2.0.0
# with:
# config: ${{ secrets.KUBE_CONFIG_DATA }}
# version: v1.27.4
# command: rollout status deployment/background-tasks-producer
- name: Update delete file versions deployment image
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: set image --record deployment/background-tasks-producer background-tasks-producer=${{ secrets.DOCKERHUB_USERNAME }}/drive-background-tasks-producer:${{ github.sha }}
command: set image --record deployment/delete-file-versions-producer delete-file-versions-producer=${{ secrets.DOCKERHUB_USERNAME }}/background-tasks:${{ github.sha }}
- name: Verify succesful deployment
uses: steebchen/kubectl@v2.0.0
with:
config: ${{ secrets.KUBE_CONFIG_DATA }}
version: v1.27.4
command: rollout status deployment/background-tasks-producer
command: rollout status deployment/delete-file-versions-producer
43 changes: 0 additions & 43 deletions .github/workflows/deploy-and-deploy-consumer.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion producer.Dockerfile → Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ COPY . .
RUN yarn && yarn build && yarn --production && yarn cache clean

# Start server
CMD yarn start:prod:producer
CMD yarn start:prod
4 changes: 2 additions & 2 deletions consumer.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:iron-slim
FROM node:24
LABEL author="internxt"

WORKDIR /app
Expand All @@ -9,4 +9,4 @@ COPY . .
RUN yarn && yarn build && yarn --production && yarn cache clean

# Start server
CMD yarn start:prod:consumer
CMD yarn start:prod:consumer
106 changes: 20 additions & 86 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,34 @@ import amqp from 'amqplib';
import { v4 } from 'uuid';

import { createLogger } from './src/utils';
import { Consumer } from './src/consumer';
import { Producer } from './src/producer';
import { DriveDatabase } from './src/drive';
import { DeletedFoldersIterator } from './src/tasks/process-folder-deletion/deleted-folders.iterator';
import { taskTypes, tasks } from './src/tasks';
import { ProcessType } from './src/tasks/process';

const [,, ...args] = process.argv;
const [type] = args;
config();

if (!type) {
console.error('Missing argument: type');
process.exit(1);
}
const taskType = process.env.TASK_TYPE as undefined | string;

if (type !== 'producer' && type !== 'consumer') {
console.error('Invalid argument: type. Accepted values are "producer" or "consumer"');
if (!taskType || !taskTypes.includes(taskType)) {
console.error(`Invalid or missing task type. Expected ${
taskTypes.map(t => `'${t}'`).join(', ')
} but got '${taskType}'`);
process.exit(1);
}
const processId = v4();
const logger = createLogger(processId);

config();

const amqpServer = process.env.AMQP_SERVER;
const queueName = process.env.MARK_DELETED_ITEMS_QUEUE_NAME;
const maxEnqueuedItems = process.env.TASK_MARK_DELETED_ITEMS_PRODUCER_MAX_ENQUEUED_ITEMS;
const maxConcurrentItems = process.env.TASK_MARK_DELETED_ITEMS_CONSUMER_MAX_CONCURRENT_ITEMS;
const processType = process.env.PROCESS_TYPE as undefined | ProcessType;

if (!maxEnqueuedItems) {
logger.log('Missing env var: TASK_MARK_DELETED_ITEMS_PRODUCER_MAX_ENQUEUED_ITEMS');
if (!processType || !Object.values(ProcessType).includes(processType)) {
console.error(`Invalid or missing process type. Expected ${
Object.values(ProcessType).map(t => `'${t}'`).join(', ')
} but got '${processType}'`);
process.exit(1);
}

if (!maxConcurrentItems) {
logger.log('Missing env var: TASK_MARK_DELETED_ITEMS_CONSUMER_MAX_CONCURRENT_ITEMS');
process.exit(1);
}
const processId = v4();
const logger = createLogger(processId);

logger.log(`params: process_type -> ${type}, env -> ${
JSON.stringify({
maxConcurrentItems,
maxEnqueuedItems,
queueName
})
}`);
const amqpServer = process.env.QUEUE_SERVER;

let db: DriveDatabase;
let connection: amqp.Connection;
Expand Down Expand Up @@ -75,7 +59,7 @@ function handleStop() {
}
}

async function start(): Promise<{ connection: amqp.Connection, db: DriveDatabase }> {
async function start(): Promise<void> {
db = new DriveDatabase();

logger.log('(drive-db) connecting ...');
Expand All @@ -86,63 +70,13 @@ async function start(): Promise<{ connection: amqp.Connection, db: DriveDatabase
connection = await amqp.connect(amqpServer as string);
logger.log('(rabbit) connected !');

return { connection, db };
await tasks[taskType as string](processType as 'consumer' | 'producer', { db }, connection);
}

start().then(({ connection, db }) => {
if (type === 'producer') {
const deletedFoldersIterator = new DeletedFoldersIterator(db);

return connection.createChannel().then((channel) => {
const producer = new Producer(
channel,
queueName as string,
deletedFoldersIterator,
maxEnqueuedItems ? parseInt(maxEnqueuedItems as string) : undefined,
);

producer.on('enqueue', (item) => {
logger.log(`enqueued item: + ${JSON.stringify(item)}`, 'producer');
});

producer.on('queue-full', () => {
logger.log(`queue full, waiting 1s...`, 'producer');
});

return producer.run();
});
} else {
connection.createChannel().then((channel) => {
const consumer = new Consumer<{
folder_id: string,
processed: boolean,
created_at: Date,
updated_at: Date,
processed_at: Date,
}>(
channel,
queueName as string,
async (taskPayload) => {
logger.log(`received item: + ${JSON.stringify(taskPayload)}`, 'consumer');

await db.markChildrenFilesAsDeleted(taskPayload.folder_id);
await db.markChildrenFoldersAsDeleted(taskPayload.folder_id);
await db.markDeletedFolderAsProcessed([taskPayload.folder_id]);
},
maxConcurrentItems ? parseInt(maxConcurrentItems as string) : undefined,
);

consumer.on('error', ({ err, msg }) => {
logger.error(`error processing item: ${JSON.stringify(msg.content)}`, err, 'consumer');
});

consumer.run();
});
}
}).catch((err) => {
start().catch((err) => {
logger.error('Error starting', err);
process.exit(1);
})
});

process.on('uncaughtException', (err) => {
logger.error('Uncaught exception', err);
Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
"main": "index.js",
"license": "MIT",
"scripts": {
"start:dev:producer": "ts-node index.ts producer",
"start:dev:consumer": "ts-node index.ts consumer",
"start:prod:producer": "node dist/index.js producer",
"start:prod:consumer": "node dist/index.js consumer",
"start:dev": "ts-node index.ts",
"start:prod": "node dist/index.js",
"build": "tsc"
},
"dependencies": {
"amqplib": "^0.10.3",
"axios": "^1.6.1",
"dotenv": "^16.3.1",
"jsonwebtoken": "^9.0.2",
"pg": "^8.11.3",
"ts-node": "^10.9.1",
"typescript": "^5.2.2",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/amqplib": "^0.10.3",
"@types/jsonwebtoken": "^9.0.5",
"@types/node": "^20.8.9",
"@types/pg": "^8.10.7",
"@types/uuid": "^9.0.6"
Expand Down
Loading