From 632c04a124f62842f5bbc0f4e59c583c629fae52 Mon Sep 17 00:00:00 2001
From: anas-ibrahem
Date: Mon, 15 Dec 2025 01:37:38 +0200
Subject: [PATCH 1/3] fix: redis lock extend and job limit

---
 src/tweet-analyze/tweet-analyze.service.ts | 47 +++++++++++++++++++++-
 1 file changed, 45 insertions(+), 2 deletions(-)

diff --git a/src/tweet-analyze/tweet-analyze.service.ts b/src/tweet-analyze/tweet-analyze.service.ts
index f37f02ef..7b006e60 100644
--- a/src/tweet-analyze/tweet-analyze.service.ts
+++ b/src/tweet-analyze/tweet-analyze.service.ts
@@ -22,6 +22,9 @@ export class TweetAnalyzeService implements OnModuleInit {
   private readonly analyzeApiUrl: string;
   private readonly LOCK_KEY = 'tweet-analyze:lock';
   private readonly LOCK_TTL_SECONDS = 300; // 5 minutes
+  private readonly LOCK_EXTENSION_INTERVAL = 60; // Extend lock every 1 minute
+  private readonly LIMIT_PER_JOB = 100; // Max tweets to process per job run
+  private lockExtensionInterval: NodeJS.Timeout | null = null;
 
   constructor(
     private readonly configService: ConfigService,
@@ -76,6 +79,9 @@
     this.logger.log('=== Starting Tweet Analysis Job ===');
 
     try {
+      // Start lock extension mechanism
+      this.startLockExtension();
+
       const tweetsToAnalyze = await this.getTweetsToAnalyze();
 
       if (tweetsToAnalyze.length === 0) {
@@ -83,9 +89,21 @@
         this.logger.log('No tweets to analyze');
         return;
       }
 
-      this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
+      // Apply limit per job
+      const tweetsToProcess =
+        tweetsToAnalyze.length > this.LIMIT_PER_JOB
+          ? tweetsToAnalyze.slice(0, this.LIMIT_PER_JOB)
+          : tweetsToAnalyze;
 
-      const batches = this.splitIntoBatches(tweetsToAnalyze, this.requestLimit);
+      if (tweetsToAnalyze.length > this.LIMIT_PER_JOB) {
+        this.logger.log(
+          `Retrieved ${tweetsToAnalyze.length} tweets, but limiting to ${this.LIMIT_PER_JOB} for this job run`,
+        );
+      } else {
+        this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
+      }
+
+      const batches = this.splitIntoBatches(tweetsToProcess, this.requestLimit);
 
       this.logger.log(
         `Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`,
@@ -159,6 +177,7 @@
         error instanceof Error ? error.stack : String(error),
       );
     } finally {
+      this.stopLockExtension();
       await this.releaseLock();
     }
   }
@@ -199,6 +218,30 @@
     }
   }
 
+  private startLockExtension(): void {
+    this.lockExtensionInterval = setInterval(async () => {
+      try {
+        const redis = this.redisService.getClient();
+        await redis.expire(this.LOCK_KEY, this.LOCK_TTL_SECONDS);
+        this.logger.debug(`Extended distributed lock TTL to ${this.LOCK_TTL_SECONDS}s`);
+      } catch (error) {
+        this.logger.error(
+          'Failed to extend distributed lock TTL',
+          error instanceof Error ? error.stack : String(error),
+        );
+      }
+    }, this.LOCK_EXTENSION_INTERVAL * 1000);
+    this.logger.debug(`Started lock extension (every ${this.LOCK_EXTENSION_INTERVAL}s)`);
+  }
+
+  private stopLockExtension(): void {
+    if (this.lockExtensionInterval) {
+      clearInterval(this.lockExtensionInterval);
+      this.lockExtensionInterval = null;
+      this.logger.debug('Stopped lock extension');
+    }
+  }
+
   private async getTweetsToAnalyze(): Promise> {
     this.logger.debug('Fetching tweets to analyze from repository');
     const tweets = await this.repository.findTweetsToClassify();

From 0729e010fe003b9aaa0de26716e58dfb203e564c Mon Sep 17 00:00:00 2001
From: anas-ibrahem
Date: Mon, 15 Dec 2025 02:36:24 +0200
Subject: [PATCH 2/3] feat: make continuous runs

---
 src/tweet-analyze/tweet-analyze.service.ts | 179 +++++++++++++--------
 1 file changed, 108 insertions(+), 71 deletions(-)

diff --git a/src/tweet-analyze/tweet-analyze.service.ts b/src/tweet-analyze/tweet-analyze.service.ts
index 7b006e60..1f6c8bf8 100644
--- a/src/tweet-analyze/tweet-analyze.service.ts
+++ b/src/tweet-analyze/tweet-analyze.service.ts
@@ -51,6 +51,15 @@
     const intervalMs = this.intervalMinutes * 60 * 1000;
     this.logger.log(`Starting tweet analysis cron job (interval: ${this.intervalMinutes} min)`);
 
+    // Run first job immediately
+    this.analyzeTweets().catch((error) => {
+      this.logger.error(
+        'Initial tweet analysis failed',
+        error instanceof Error ? error.stack : String(error),
+      );
+    });
+
+    // Schedule periodic jobs
     setInterval(() => {
       this.analyzeTweets().catch((error) => {
         this.logger.error(
@@ -82,94 +91,122 @@
       // Start lock extension mechanism
       this.startLockExtension();
 
-      const tweetsToAnalyze = await this.getTweetsToAnalyze();
-
-      if (tweetsToAnalyze.length === 0) {
-        this.logger.log('No tweets to analyze');
-        return;
-      }
-
-      // Apply limit per job
-      const tweetsToProcess =
-        tweetsToAnalyze.length > this.LIMIT_PER_JOB
-          ? tweetsToAnalyze.slice(0, this.LIMIT_PER_JOB)
-          : tweetsToAnalyze;
-
-      if (tweetsToAnalyze.length > this.LIMIT_PER_JOB) {
-        this.logger.log(
-          `Retrieved ${tweetsToAnalyze.length} tweets, but limiting to ${this.LIMIT_PER_JOB} for this job run`,
-        );
-      } else {
-        this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
-      }
+      let totalProcessedAcrossRuns = 0;
+      let runNumber = 0;
+
+      // Keep processing until no more tweets remain
+      while (true) {
+        runNumber++;
+        const tweetsToAnalyze = await this.getTweetsToAnalyze();
+
+        if (tweetsToAnalyze.length === 0) {
+          if (runNumber === 1) {
+            this.logger.log('No tweets to analyze');
+          } else {
+            this.logger.log(
+              `All tweets processed across ${runNumber - 1} run(s) (${totalProcessedAcrossRuns} total tweets)`,
+            );
+          }
+          break;
+        }
 
-      const batches = this.splitIntoBatches(tweetsToProcess, this.requestLimit);
+        // Apply limit per job run
+        const tweetsToProcess =
+          tweetsToAnalyze.length > this.LIMIT_PER_JOB
+            ? tweetsToAnalyze.slice(0, this.LIMIT_PER_JOB)
+            : tweetsToAnalyze;
 
-      this.logger.log(
-        `Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`,
-      );
+        const hasMoreTweets = tweetsToAnalyze.length > this.LIMIT_PER_JOB;
 
-      let totalAnalyzedTweets = 0;
-      let allBatchesSucceeded = true;
+        this.logger.log(`--- Starting run ${runNumber} ---`);
 
-      // Accumulate trending data across all batches
-      const accumulatedTrendingKeywords: TrendingKeyword[] = [];
-      let totalTweetsInAllBatches = 0;
+        if (hasMoreTweets) {
+          this.logger.log(
+            `Retrieved ${tweetsToAnalyze.length} tweets, processing ${this.LIMIT_PER_JOB} in this run`,
+          );
+        } else {
+          this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
+        }
 
-      for (let i = 0; i < batches.length; i++) {
-        const batch = batches[i];
-        this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`);
+        const batches = this.splitIntoBatches(tweetsToProcess, this.requestLimit);
 
-        try {
-          const batchResult = await this.processBatch(batch);
-          totalAnalyzedTweets += batchResult.analyzedTweetsCount;
+        this.logger.log(
+          `Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`,
+        );
 
-          // Accumulate trending keywords from this batch
-          if (batchResult.trending_keywords) {
-            accumulatedTrendingKeywords.push(...batchResult.trending_keywords);
-          }
-          if (batchResult.batch_meta) {
-            totalTweetsInAllBatches += batchResult.batch_meta.total_tweets;
+        let runAnalyzedTweets = 0;
+        let allBatchesSucceeded = true;
+
+        // Accumulate trending data across all batches in this run
+        const accumulatedTrendingKeywords: TrendingKeyword[] = [];
+        let totalTweetsInRun = 0;
+
+        for (let i = 0; i < batches.length; i++) {
+          const batch = batches[i];
+          this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`);
+
+          try {
+            const batchResult = await this.processBatch(batch);
+            runAnalyzedTweets += batchResult.analyzedTweetsCount;
+
+            // Accumulate trending keywords from this batch
+            if (batchResult.trending_keywords) {
+              accumulatedTrendingKeywords.push(...batchResult.trending_keywords);
+            }
+            if (batchResult.batch_meta) {
+              totalTweetsInRun += batchResult.batch_meta.total_tweets;
+            }
+
+            this.logger.log(
+              `Batch ${i + 1}/${batches.length} completed ` +
+                `(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`,
+            );
+          } catch (error) {
+            this.logger.error(
+              `Batch ${i + 1}/${batches.length} failed`,
+              error instanceof Error ? error.stack : String(error),
+            );
+            this.logger.warn(
+              `Stopping after batch ${i + 1} failure. ` +
+                `${batches.length - i - 1} batch(es) will retry in next job`,
+            );
+            allBatchesSucceeded = false;
+            break;
+          }
+        }
+
+        // Update trending scores for this run
+        if (accumulatedTrendingKeywords.length > 0) {
           this.logger.log(
-            `Batch ${i + 1}/${batches.length} completed ` +
-              `(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`,
-          );
-        } catch (error) {
-          this.logger.error(
-            `Batch ${i + 1}/${batches.length} failed`,
-            error instanceof Error ? error.stack : String(error),
+            `Updating trending scores with ${accumulatedTrendingKeywords.length} keywords from run ${runNumber}`,
           );
+          await this.trendingService.updateTrendScores({
+            batch_meta: { total_tweets: totalTweetsInRun },
+            trending_keywords: accumulatedTrendingKeywords,
+          });
+          this.logger.log('Trending scores updated successfully');
+        }
+
+        totalProcessedAcrossRuns += runAnalyzedTweets;
+
+        if (!allBatchesSucceeded) {
           this.logger.warn(
-            `Stopping after batch ${i + 1} failure. ` +
-              `${batches.length - i - 1} batch(es) will retry in next run`,
+            `Run ${runNumber} stopped due to failure (${runAnalyzedTweets} tweets processed in this run)`,
           );
-          allBatchesSucceeded = false;
           break;
         }
-      }
 
-      // Update trending scores once with accumulated data from all batches
-      if (accumulatedTrendingKeywords.length > 0) {
-        this.logger.log(
-          `Updating trending scores with ${accumulatedTrendingKeywords.length} accumulated keywords`,
-        );
-        await this.trendingService.updateTrendScores({
-          batch_meta: { total_tweets: totalTweetsInAllBatches },
-          trending_keywords: accumulatedTrendingKeywords,
-        });
-        this.logger.log('Trending scores updated successfully');
-      }
+        this.logger.log(`Run ${runNumber} completed successfully (${runAnalyzedTweets} tweets)`);
 
-      if (allBatchesSucceeded) {
-        this.logger.log(
-          `=== Tweet Analysis Job Completed Successfully (${totalAnalyzedTweets} tweets) ===`,
-        );
-      } else {
-        this.logger.warn(
-          `=== Tweet Analysis Job Stopped (${totalAnalyzedTweets} tweets processed) ===`,
+        // If no more tweets remain, exit the loop
+        if (!hasMoreTweets) {
+          this.logger.log(
+            `=== Tweet Analysis Job Completed Successfully (${totalProcessedAcrossRuns} total tweets across ${runNumber} run(s)) ===`,
+          );
+          break;
+        }
+
+        this.logger.log('More tweets remain, continuing to next run immediately...');
       }
     } catch (error) {
       this.logger.error(

From 9aa3fd8967c2cfd3eb3f004b6d705dd33c9b51db Mon Sep 17 00:00:00 2001
From: anas-ibrahem
Date: Mon, 15 Dec 2025 20:22:45 +0200
Subject: [PATCH 3/3] fix: linter

---
 src/tweet-analyze/tweet-analyze.service.ts | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/tweet-analyze/tweet-analyze.service.ts b/src/tweet-analyze/tweet-analyze.service.ts
index 1f6c8bf8..e971afbc 100644
--- a/src/tweet-analyze/tweet-analyze.service.ts
+++ b/src/tweet-analyze/tweet-analyze.service.ts
@@ -256,17 +256,19 @@
   }
 
   private startLockExtension(): void {
-    this.lockExtensionInterval = setInterval(async () => {
-      try {
-        const redis = this.redisService.getClient();
-        await redis.expire(this.LOCK_KEY, this.LOCK_TTL_SECONDS);
-        this.logger.debug(`Extended distributed lock TTL to ${this.LOCK_TTL_SECONDS}s`);
-      } catch (error) {
-        this.logger.error(
-          'Failed to extend distributed lock TTL',
-          error instanceof Error ? error.stack : String(error),
-        );
-      }
+    this.lockExtensionInterval = setInterval(() => {
+      void (async () => {
+        try {
+          const redis = this.redisService.getClient();
+          await redis.expire(this.LOCK_KEY, this.LOCK_TTL_SECONDS);
+          this.logger.debug(`Extended distributed lock TTL to ${this.LOCK_TTL_SECONDS}s`);
+        } catch (error) {
+          this.logger.error(
+            'Failed to extend distributed lock TTL',
+            error instanceof Error ? error.stack : String(error),
+          );
+        }
+      })();
     }, this.LOCK_EXTENSION_INTERVAL * 1000);
     this.logger.debug(`Started lock extension (every ${this.LOCK_EXTENSION_INTERVAL}s)`);
   }
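
PATCH 3/3 restructures startLockExtension because setInterval expects a callback that returns void: passing an async function directly leaves a floating promise, which rules such as typescript-eslint's no-misused-promises typically flag (the commit message only says "fix: linter", so the exact rule is an assumption). Wrapping the awaited work in an immediately invoked async function marked with void keeps the timer callback synchronous while errors are still caught inside. Below is a minimal standalone sketch of the same pattern; the runPeriodically helper and its parameters are illustrative only and not part of TweetAnalyzeService.

// Sketch of the setInterval + void async IIFE pattern applied in PATCH 3/3.
// `task` stands in for the lock-extension work; the helper name and signature
// are hypothetical and exist only for this example.
function runPeriodically(task: () => Promise<void>, intervalMs: number): NodeJS.Timeout {
  return setInterval(() => {
    // The callback handed to setInterval returns void; the promise is created
    // and fully handled inside the IIFE, so no floating promise escapes.
    void (async () => {
      try {
        await task();
      } catch (error) {
        // Mirror the service's error handling: log and keep the timer alive.
        console.error('Periodic task failed', error instanceof Error ? error.stack : String(error));
      }
    })();
  }, intervalMs);
}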