diff --git a/src/tweet-analyze/tweet-analyze.service.ts b/src/tweet-analyze/tweet-analyze.service.ts index f37f02ef..e971afbc 100644 --- a/src/tweet-analyze/tweet-analyze.service.ts +++ b/src/tweet-analyze/tweet-analyze.service.ts @@ -22,6 +22,9 @@ export class TweetAnalyzeService implements OnModuleInit { private readonly analyzeApiUrl: string; private readonly LOCK_KEY = 'tweet-analyze:lock'; private readonly LOCK_TTL_SECONDS = 300; // 5 minutes + private readonly LOCK_EXTENSION_INTERVAL = 60; // Extend lock every 1 minute + private readonly LIMIT_PER_JOB = 100; // Max tweets to process per job run + private lockExtensionInterval: NodeJS.Timeout | null = null; constructor( private readonly configService: ConfigService, @@ -48,6 +51,15 @@ export class TweetAnalyzeService implements OnModuleInit { const intervalMs = this.intervalMinutes * 60 * 1000; this.logger.log(`Starting tweet analysis cron job (interval: ${this.intervalMinutes} min)`); + // Run first job immediately + this.analyzeTweets().catch((error) => { + this.logger.error( + 'Initial tweet analysis failed', + error instanceof Error ? error.stack : String(error), + ); + }); + + // Schedule periodic jobs setInterval(() => { this.analyzeTweets().catch((error) => { this.logger.error( @@ -76,82 +88,125 @@ export class TweetAnalyzeService implements OnModuleInit { this.logger.log('=== Starting Tweet Analysis Job ==='); try { - const tweetsToAnalyze = await this.getTweetsToAnalyze(); - - if (tweetsToAnalyze.length === 0) { - this.logger.log('No tweets to analyze'); - return; - } - - this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`); + // Start lock extension mechanism + this.startLockExtension(); + + let totalProcessedAcrossRuns = 0; + let runNumber = 0; + + // Keep processing until no more tweets remain + while (true) { + runNumber++; + const tweetsToAnalyze = await this.getTweetsToAnalyze(); + + if (tweetsToAnalyze.length === 0) { + if (runNumber === 1) { + this.logger.log('No tweets to analyze'); + } else { + this.logger.log( + `All tweets processed across ${runNumber - 1} run(s) (${totalProcessedAcrossRuns} total tweets)`, + ); + } + break; + } - const batches = this.splitIntoBatches(tweetsToAnalyze, this.requestLimit); + // Apply limit per job run + const tweetsToProcess = + tweetsToAnalyze.length > this.LIMIT_PER_JOB + ? tweetsToAnalyze.slice(0, this.LIMIT_PER_JOB) + : tweetsToAnalyze; - this.logger.log( - `Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`, - ); + const hasMoreTweets = tweetsToAnalyze.length > this.LIMIT_PER_JOB; - let totalAnalyzedTweets = 0; - let allBatchesSucceeded = true; + this.logger.log(`--- Starting run ${runNumber} ---`); - // Accumulate trending data across all batches - const accumulatedTrendingKeywords: TrendingKeyword[] = []; - let totalTweetsInAllBatches = 0; + if (hasMoreTweets) { + this.logger.log( + `Retrieved ${tweetsToAnalyze.length} tweets, processing ${this.LIMIT_PER_JOB} in this run`, + ); + } else { + this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`); + } - for (let i = 0; i < batches.length; i++) { - const batch = batches[i]; - this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`); + const batches = this.splitIntoBatches(tweetsToProcess, this.requestLimit); - try { - const batchResult = await this.processBatch(batch); - totalAnalyzedTweets += batchResult.analyzedTweetsCount; + this.logger.log( + `Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`, + ); - // Accumulate trending keywords from this batch - if (batchResult.trending_keywords) { - accumulatedTrendingKeywords.push(...batchResult.trending_keywords); - } - if (batchResult.batch_meta) { - totalTweetsInAllBatches += batchResult.batch_meta.total_tweets; + let runAnalyzedTweets = 0; + let allBatchesSucceeded = true; + + // Accumulate trending data across all batches in this run + const accumulatedTrendingKeywords: TrendingKeyword[] = []; + let totalTweetsInRun = 0; + + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`); + + try { + const batchResult = await this.processBatch(batch); + runAnalyzedTweets += batchResult.analyzedTweetsCount; + + // Accumulate trending keywords from this batch + if (batchResult.trending_keywords) { + accumulatedTrendingKeywords.push(...batchResult.trending_keywords); + } + if (batchResult.batch_meta) { + totalTweetsInRun += batchResult.batch_meta.total_tweets; + } + + this.logger.log( + `Batch ${i + 1}/${batches.length} completed ` + + `(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`, + ); + } catch (error) { + this.logger.error( + `Batch ${i + 1}/${batches.length} failed`, + error instanceof Error ? error.stack : String(error), + ); + this.logger.warn( + `Stopping after batch ${i + 1} failure. ` + + `${batches.length - i - 1} batch(es) will retry in next job`, + ); + allBatchesSucceeded = false; + break; } + } + // Update trending scores for this run + if (accumulatedTrendingKeywords.length > 0) { this.logger.log( - `Batch ${i + 1}/${batches.length} completed ` + - `(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`, - ); - } catch (error) { - this.logger.error( - `Batch ${i + 1}/${batches.length} failed`, - error instanceof Error ? error.stack : String(error), + `Updating trending scores with ${accumulatedTrendingKeywords.length} keywords from run ${runNumber}`, ); + await this.trendingService.updateTrendScores({ + batch_meta: { total_tweets: totalTweetsInRun }, + trending_keywords: accumulatedTrendingKeywords, + }); + this.logger.log('Trending scores updated successfully'); + } + + totalProcessedAcrossRuns += runAnalyzedTweets; + + if (!allBatchesSucceeded) { this.logger.warn( - `Stopping after batch ${i + 1} failure. ` + - `${batches.length - i - 1} batch(es) will retry in next run`, + `Run ${runNumber} stopped due to failure (${runAnalyzedTweets} tweets processed in this run)`, ); - allBatchesSucceeded = false; break; } - } - // Update trending scores once with accumulated data from all batches - if (accumulatedTrendingKeywords.length > 0) { - this.logger.log( - `Updating trending scores with ${accumulatedTrendingKeywords.length} accumulated keywords`, - ); - await this.trendingService.updateTrendScores({ - batch_meta: { total_tweets: totalTweetsInAllBatches }, - trending_keywords: accumulatedTrendingKeywords, - }); - this.logger.log('Trending scores updated successfully'); - } + this.logger.log(`Run ${runNumber} completed successfully (${runAnalyzedTweets} tweets)`); - if (allBatchesSucceeded) { - this.logger.log( - `=== Tweet Analysis Job Completed Successfully (${totalAnalyzedTweets} tweets) ===`, - ); - } else { - this.logger.warn( - `=== Tweet Analysis Job Stopped (${totalAnalyzedTweets} tweets processed) ===`, - ); + // If no more tweets remain, exit the loop + if (!hasMoreTweets) { + this.logger.log( + `=== Tweet Analysis Job Completed Successfully (${totalProcessedAcrossRuns} total tweets across ${runNumber} run(s)) ===`, + ); + break; + } + + this.logger.log('More tweets remain, continuing to next run immediately...'); } } catch (error) { this.logger.error( @@ -159,6 +214,7 @@ export class TweetAnalyzeService implements OnModuleInit { error instanceof Error ? error.stack : String(error), ); } finally { + this.stopLockExtension(); await this.releaseLock(); } } @@ -199,6 +255,32 @@ export class TweetAnalyzeService implements OnModuleInit { } } + private startLockExtension(): void { + this.lockExtensionInterval = setInterval(() => { + void (async () => { + try { + const redis = this.redisService.getClient(); + await redis.expire(this.LOCK_KEY, this.LOCK_TTL_SECONDS); + this.logger.debug(`Extended distributed lock TTL to ${this.LOCK_TTL_SECONDS}s`); + } catch (error) { + this.logger.error( + 'Failed to extend distributed lock TTL', + error instanceof Error ? error.stack : String(error), + ); + } + })(); + }, this.LOCK_EXTENSION_INTERVAL * 1000); + this.logger.debug(`Started lock extension (every ${this.LOCK_EXTENSION_INTERVAL}s)`); + } + + private stopLockExtension(): void { + if (this.lockExtensionInterval) { + clearInterval(this.lockExtensionInterval); + this.lockExtensionInterval = null; + this.logger.debug('Stopped lock extension'); + } + } + private async getTweetsToAnalyze(): Promise> { this.logger.debug('Fetching tweets to analyze from repository'); const tweets = await this.repository.findTweetsToClassify();