Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 141 additions & 59 deletions src/tweet-analyze/tweet-analyze.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export class TweetAnalyzeService implements OnModuleInit {
// NOTE(review): presumably the endpoint of the external analysis API; its use is not visible in this section — confirm.
private readonly analyzeApiUrl: string;
// Redis key of the distributed lock; its TTL is refreshed by startLockExtension while a job runs.
private readonly LOCK_KEY = 'tweet-analyze:lock';
// TTL (in seconds) re-applied to LOCK_KEY on every extension tick.
private readonly LOCK_TTL_SECONDS = 300; // 5 minutes
// Period of the extension timer, in seconds (multiplied by 1000 when passed to setInterval).
private readonly LOCK_EXTENSION_INTERVAL = 60; // Extend lock every 1 minute
// Upper bound on how many fetched tweets are sliced off and processed in a single run of the job loop.
private readonly LIMIT_PER_JOB = 100; // Max tweets to process per job run
// Handle of the active lock-extension timer; null whenever no timer is running.
private lockExtensionInterval: NodeJS.Timeout | null = null;

constructor(
private readonly configService: ConfigService,
Expand All @@ -48,6 +51,15 @@ export class TweetAnalyzeService implements OnModuleInit {
const intervalMs = this.intervalMinutes * 60 * 1000;
this.logger.log(`Starting tweet analysis cron job (interval: ${this.intervalMinutes} min)`);

// Run first job immediately
this.analyzeTweets().catch((error) => {
this.logger.error(
'Initial tweet analysis failed',
error instanceof Error ? error.stack : String(error),
);
});

// Schedule periodic jobs
setInterval(() => {
this.analyzeTweets().catch((error) => {
this.logger.error(
Expand Down Expand Up @@ -76,89 +88,133 @@ export class TweetAnalyzeService implements OnModuleInit {
this.logger.log('=== Starting Tweet Analysis Job ===');

try {
const tweetsToAnalyze = await this.getTweetsToAnalyze();

if (tweetsToAnalyze.length === 0) {
this.logger.log('No tweets to analyze');
return;
}

this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
// Start lock extension mechanism
this.startLockExtension();

let totalProcessedAcrossRuns = 0;
let runNumber = 0;

// Keep processing until no more tweets remain
while (true) {
runNumber++;
const tweetsToAnalyze = await this.getTweetsToAnalyze();

if (tweetsToAnalyze.length === 0) {
if (runNumber === 1) {
this.logger.log('No tweets to analyze');
} else {
this.logger.log(
`All tweets processed across ${runNumber - 1} run(s) (${totalProcessedAcrossRuns} total tweets)`,
);
}
break;
}

const batches = this.splitIntoBatches(tweetsToAnalyze, this.requestLimit);
// Apply limit per job run
const tweetsToProcess =
tweetsToAnalyze.length > this.LIMIT_PER_JOB
? tweetsToAnalyze.slice(0, this.LIMIT_PER_JOB)
: tweetsToAnalyze;

this.logger.log(
`Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`,
);
const hasMoreTweets = tweetsToAnalyze.length > this.LIMIT_PER_JOB;

let totalAnalyzedTweets = 0;
let allBatchesSucceeded = true;
this.logger.log(`--- Starting run ${runNumber} ---`);

// Accumulate trending data across all batches
const accumulatedTrendingKeywords: TrendingKeyword[] = [];
let totalTweetsInAllBatches = 0;
if (hasMoreTweets) {
this.logger.log(
`Retrieved ${tweetsToAnalyze.length} tweets, processing ${this.LIMIT_PER_JOB} in this run`,
);
} else {
this.logger.log(`Retrieved ${tweetsToAnalyze.length} tweets for analysis`);
}

for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`);
const batches = this.splitIntoBatches(tweetsToProcess, this.requestLimit);

try {
const batchResult = await this.processBatch(batch);
totalAnalyzedTweets += batchResult.analyzedTweetsCount;
this.logger.log(
`Split into ${batches.length} batch(es) (limit: ${this.requestLimit} tweets/batch)`,
);

// Accumulate trending keywords from this batch
if (batchResult.trending_keywords) {
accumulatedTrendingKeywords.push(...batchResult.trending_keywords);
}
if (batchResult.batch_meta) {
totalTweetsInAllBatches += batchResult.batch_meta.total_tweets;
let runAnalyzedTweets = 0;
let allBatchesSucceeded = true;

// Accumulate trending data across all batches in this run
const accumulatedTrendingKeywords: TrendingKeyword[] = [];
let totalTweetsInRun = 0;

for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
this.logger.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} tweets)`);

try {
const batchResult = await this.processBatch(batch);
runAnalyzedTweets += batchResult.analyzedTweetsCount;

// Accumulate trending keywords from this batch
if (batchResult.trending_keywords) {
accumulatedTrendingKeywords.push(...batchResult.trending_keywords);
}
if (batchResult.batch_meta) {
totalTweetsInRun += batchResult.batch_meta.total_tweets;
}

this.logger.log(
`Batch ${i + 1}/${batches.length} completed ` +
`(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`,
);
} catch (error) {
this.logger.error(
`Batch ${i + 1}/${batches.length} failed`,
error instanceof Error ? error.stack : String(error),
);
this.logger.warn(
`Stopping after batch ${i + 1} failure. ` +
`${batches.length - i - 1} batch(es) will retry in next job`,
);
allBatchesSucceeded = false;
break;
}
}

// Update trending scores for this run
if (accumulatedTrendingKeywords.length > 0) {
this.logger.log(
`Batch ${i + 1}/${batches.length} completed ` +
`(analyzed: ${batchResult.analyzedTweetsCount}, keywords: ${batchResult.trending_keywords?.length || 0})`,
);
} catch (error) {
this.logger.error(
`Batch ${i + 1}/${batches.length} failed`,
error instanceof Error ? error.stack : String(error),
`Updating trending scores with ${accumulatedTrendingKeywords.length} keywords from run ${runNumber}`,
);
await this.trendingService.updateTrendScores({
batch_meta: { total_tweets: totalTweetsInRun },
trending_keywords: accumulatedTrendingKeywords,
});
this.logger.log('Trending scores updated successfully');
}

totalProcessedAcrossRuns += runAnalyzedTweets;

if (!allBatchesSucceeded) {
this.logger.warn(
`Stopping after batch ${i + 1} failure. ` +
`${batches.length - i - 1} batch(es) will retry in next run`,
`Run ${runNumber} stopped due to failure (${runAnalyzedTweets} tweets processed in this run)`,
);
allBatchesSucceeded = false;
break;
}
}

// Update trending scores once with accumulated data from all batches
if (accumulatedTrendingKeywords.length > 0) {
this.logger.log(
`Updating trending scores with ${accumulatedTrendingKeywords.length} accumulated keywords`,
);
await this.trendingService.updateTrendScores({
batch_meta: { total_tweets: totalTweetsInAllBatches },
trending_keywords: accumulatedTrendingKeywords,
});
this.logger.log('Trending scores updated successfully');
}
this.logger.log(`Run ${runNumber} completed successfully (${runAnalyzedTweets} tweets)`);

if (allBatchesSucceeded) {
this.logger.log(
`=== Tweet Analysis Job Completed Successfully (${totalAnalyzedTweets} tweets) ===`,
);
} else {
this.logger.warn(
`=== Tweet Analysis Job Stopped (${totalAnalyzedTweets} tweets processed) ===`,
);
// If no more tweets remain, exit the loop
if (!hasMoreTweets) {
this.logger.log(
`=== Tweet Analysis Job Completed Successfully (${totalProcessedAcrossRuns} total tweets across ${runNumber} run(s)) ===`,
);
break;
}

this.logger.log('More tweets remain, continuing to next run immediately...');
}
} catch (error) {
this.logger.error(
'Tweet analysis job failed',
error instanceof Error ? error.stack : String(error),
);
} finally {
this.stopLockExtension();
await this.releaseLock();
}
}
Expand Down Expand Up @@ -199,6 +255,32 @@ export class TweetAnalyzeService implements OnModuleInit {
}
}

/**
 * Starts a periodic timer that re-applies `LOCK_TTL_SECONDS` to `LOCK_KEY`
 * every `LOCK_EXTENSION_INTERVAL` seconds, so the distributed lock does not
 * expire while a long-running job is still in progress.
 *
 * Safe to call repeatedly: a timer that is already running is cleared first,
 * so at most one extension timer exists per service instance.
 *
 * Errors raised while extending the lock are logged and never thrown, because
 * the work runs inside a timer callback with no caller to receive them.
 */
private startLockExtension(): void {
  // Never orphan a previous timer: overwriting the handle without clearing it
  // would leave an interval running that stopLockExtension can no longer reach.
  if (this.lockExtensionInterval) {
    clearInterval(this.lockExtensionInterval);
    this.lockExtensionInterval = null;
  }

  const timer: NodeJS.Timeout = setInterval(() => {
    // `void` marks the async IIFE as intentionally fire-and-forget; all
    // failures are handled inside it.
    void (async () => {
      try {
        const redis = this.redisService.getClient();
        const applied = await redis.expire(this.LOCK_KEY, this.LOCK_TTL_SECONDS);

        // EXPIRE reports 0 (or false, depending on the client library) when
        // the key does not exist, i.e. the lock has already expired. There is
        // nothing left to extend, and continuing could later refresh a lock
        // that a different holder has acquired, so stop ticking.
        if (Number(applied) === 0) {
          this.logger.warn(
            'Distributed lock key no longer exists; lock was lost. Stopping lock extension',
          );
          clearInterval(timer);
          if (this.lockExtensionInterval === timer) {
            this.lockExtensionInterval = null;
          }
          return;
        }

        this.logger.debug(`Extended distributed lock TTL to ${this.LOCK_TTL_SECONDS}s`);
      } catch (error) {
        this.logger.error(
          'Failed to extend distributed lock TTL',
          error instanceof Error ? error.stack : String(error),
        );
      }
    })();
  }, this.LOCK_EXTENSION_INTERVAL * 1000);

  this.lockExtensionInterval = timer;
  this.logger.debug(`Started lock extension (every ${this.LOCK_EXTENSION_INTERVAL}s)`);
}

/**
 * Cancels the lock-extension timer if one is currently running.
 *
 * Does nothing (and logs nothing) when no timer is active, so it can be
 * called unconditionally from cleanup paths.
 */
private stopLockExtension(): void {
  const activeTimer = this.lockExtensionInterval;

  // Nothing to cancel.
  if (!activeTimer) {
    return;
  }

  clearInterval(activeTimer);
  this.lockExtensionInterval = null;
  this.logger.debug('Stopped lock extension');
}

private async getTweetsToAnalyze(): Promise<Array<{ id: bigint; content: string }>> {
this.logger.debug('Fetching tweets to analyze from repository');
const tweets = await this.repository.findTweetsToClassify();
Expand Down
Loading