Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/aggregator/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ PORT=3001
INGESTOR_WS_URL=ws://localhost:3000
INGESTOR_HTTP_URL=http://localhost:3000

# Redis (optional; used for caching/session when set; health check verifies connectivity)
# REDIS_URL=redis://localhost:6379

# Signer Service (for publishing aggregated data)
SIGNER_URL=http://localhost:3002
24 changes: 24 additions & 0 deletions apps/aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ The Aggregator service is responsible for calculating a single consensus price p
- Per-source weight configuration
- Custom aggregation parameters

## API Endpoints (Health, Metrics & Debug)

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/health` | GET | Full health check. Returns **200** if all configured dependencies (Redis, Ingestor) are healthy, **503** otherwise. Used for overall service health. |
| `/ready` | GET | Readiness probe for Kubernetes. Same checks as `/health`; returns 200 when the service can accept traffic. |
| `/live` | GET | Liveness probe for Kubernetes. Returns 200 when the process is alive (no dependency checks). |
| `/status` | GET | Detailed system information: uptime, memory usage, dependency check results, and version. |
| `/metrics` | GET | Prometheus metrics in [exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Scrape this endpoint for aggregation count, latency, errors, and default Node.js metrics. |
| `/debug/prices` | GET | Last aggregated and normalized prices held in memory. Useful for debugging without hitting external systems. |

**Health checks**: When `REDIS_URL` or `INGESTOR_URL` are set, the health check verifies connectivity. If a configured dependency is unreachable, `/health` and `/ready` return 503. If not set, that dependency is skipped (not included in the check).

## Architecture

```
Expand All @@ -43,6 +56,17 @@ aggregator/
│ │ └── trimmed-mean.aggregator.ts
│ ├── services/
│ │ └── aggregation.service.ts # Main aggregation service
│ ├── health/ # Health checks (Terminus)
│ │ ├── health.controller.ts
│ │ └── indicators/
│ │ ├── redis.health.ts
│ │ └── ingestor.health.ts
│ ├── metrics/ # Prometheus metrics
│ │ ├── metrics.controller.ts
│ │ └── metrics.service.ts
│ ├── debug/ # Debug endpoints
│ │ ├── debug.controller.ts
│ │ └── debug.service.ts
│ ├── config/
│ │ └── source-weights.config.ts # Weight configuration
│ └── app.module.ts
Expand Down
4 changes: 4 additions & 0 deletions apps/aggregator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
"@nestjs/core": "^10.0.0",
"@nestjs/event-emitter": "^3.0.1",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/terminus": "^10.0.0",
"axios": "^1.6.0",
"ioredis": "^5.3.2",
"prom-client": "^15.1.0",
"axios": "^1.13.4",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.3",
Expand Down
10 changes: 7 additions & 3 deletions apps/aggregator/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { HttpModule } from '@nestjs/axios';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { DataReceptionService } from './services/data-reception.service';
import { AggregationService } from './services/aggregation.service';
import { WeightedAverageAggregator } from './strategies/aggregators/weighted-average.aggregator';
import { MedianAggregator } from './strategies/aggregators/median.aggregator';
import { TrimmedMeanAggregator } from './strategies/aggregators/trimmed-mean.aggregator';
import { HealthModule } from './health/health.module';
import { MetricsModule } from './metrics/metrics.module';
import { DebugModule } from './debug/debug.module';

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true, envFilePath: '.env' }),
HealthModule,
MetricsModule,
DebugModule,
ConfigModule.forRoot({
isGlobal: true,
}),
Expand Down
57 changes: 57 additions & 0 deletions apps/aggregator/src/debug/debug.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Test, TestingModule } from '@nestjs/testing';
import { DebugController } from './debug.controller';
import { DebugService } from './debug.service';

describe('DebugController', () => {
let controller: DebugController;
let debugService: DebugService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [DebugController],
providers: [DebugService],
}).compile();

controller = module.get<DebugController>(DebugController);
debugService = module.get<DebugService>(DebugService);
});

it('should be defined', () => {
expect(controller).toBeDefined();
});

describe('GET /debug/prices', () => {
it('should return last prices from DebugService', () => {
const result = controller.getLastPrices();
expect(result).toMatchObject({
aggregated: expect.any(Object),
normalized: expect.any(Object),
updatedAt: expect.any(Number),
});
expect(result.aggregated).toEqual({});
expect(result.normalized).toEqual({});
});

it('should return stored prices after they are set', () => {
debugService.setLastAggregated('AAPL', {
symbol: 'AAPL',
price: 150.25,
method: 'weighted-average',
confidence: 95,
metrics: {
standardDeviation: 0.05,
spread: 0.1,
sourceCount: 3,
variance: 0.0025,
},
startTimestamp: 0,
endTimestamp: 0,
sources: ['S1', 'S2', 'S3'],
computedAt: Date.now(),
});
const result = controller.getLastPrices();
expect(Object.keys(result.aggregated)).toContain('AAPL');
expect(result.aggregated['AAPL'].price).toBe(150.25);
});
});
});
22 changes: 22 additions & 0 deletions apps/aggregator/src/debug/debug.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Controller, Get, HttpCode, HttpStatus } from '@nestjs/common';
import { DebugService } from './debug.service';

/**
* Debug controller for development and troubleshooting.
*
* - GET /debug/prices - Returns last aggregated and normalized prices held in memory.
*/
@Controller('debug')
export class DebugController {
constructor(private readonly debugService: DebugService) {}

/**
* Returns the last aggregated prices and last normalized prices per symbol.
* Useful for verifying recent aggregation results without hitting external systems.
*/
@Get('prices')
@HttpCode(HttpStatus.OK)
getLastPrices() {
return this.debugService.getLastPrices();
}
}
10 changes: 10 additions & 0 deletions apps/aggregator/src/debug/debug.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { DebugController } from './debug.controller';
import { DebugService } from './debug.service';

@Module({
controllers: [DebugController],
providers: [DebugService],
exports: [DebugService],
})
export class DebugModule {}
54 changes: 54 additions & 0 deletions apps/aggregator/src/debug/debug.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Injectable } from '@nestjs/common';
import { AggregatedPrice } from '../interfaces/aggregated-price.interface';
import { NormalizedPrice } from '../interfaces/normalized-price.interface';

export interface LastPricesDto {
aggregated: Record<string, AggregatedPrice>;
normalized: Record<string, NormalizedPrice[]>;
updatedAt: number;
}

/**
* In-memory store for last aggregated and normalized prices, used by the debug endpoint.
*/
@Injectable()
export class DebugService {
private lastAggregated: Map<string, AggregatedPrice> = new Map();
private lastNormalized: Map<string, NormalizedPrice[]> = new Map();
private updatedAt = 0;

/**
* Record an aggregated result for a symbol (called by aggregation flow).
*/
setLastAggregated(symbol: string, result: AggregatedPrice): void {
this.lastAggregated.set(symbol, result);
this.updatedAt = Date.now();
}

/**
* Record normalized prices for a symbol (called before aggregation).
*/
setLastNormalized(symbol: string, prices: NormalizedPrice[]): void {
this.lastNormalized.set(symbol, [...prices]);
this.updatedAt = Date.now();
}

/**
* Get last aggregated and normalized prices for the debug endpoint.
*/
getLastPrices(): LastPricesDto {
const aggregated: Record<string, AggregatedPrice> = {};
for (const [symbol, value] of this.lastAggregated) {
aggregated[symbol] = value;
}
const normalized: Record<string, NormalizedPrice[]> = {};
for (const [symbol, value] of this.lastNormalized) {
normalized[symbol] = value;
}
return {
aggregated,
normalized,
updatedAt: this.updatedAt,
};
}
}
125 changes: 125 additions & 0 deletions apps/aggregator/src/health/health.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ServiceUnavailableException } from '@nestjs/common';
import { HealthCheckService, HealthCheckResult } from '@nestjs/terminus';
import { HealthController } from './health.controller';
import { RedisHealthIndicator } from './indicators/redis.health';
import { IngestorHealthIndicator } from './indicators/ingestor.health';

describe('HealthController', () => {
let controller: HealthController;
let healthCheckService: HealthCheckService;

const mockRedisHealthy = {
redis: { status: 'up' as const, message: 'Redis is reachable' },
};
const mockIngestorHealthy = {
ingestor: { status: 'up' as const, message: 'Ingestor is reachable' },
};
const healthyResult: HealthCheckResult = {
status: 'ok',
info: { ...mockRedisHealthy, ...mockIngestorHealthy },
error: {},
details: { ...mockRedisHealthy, ...mockIngestorHealthy },
};
const unhealthyResult: HealthCheckResult = {
status: 'error',
info: {},
error: mockRedisHealthy,
details: { ...mockRedisHealthy },
};

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [HealthController],
providers: [
{
provide: HealthCheckService,
useValue: {
check: jest.fn(),
},
},
{
provide: RedisHealthIndicator,
useValue: { isHealthy: jest.fn().mockResolvedValue(mockRedisHealthy) },
},
{
provide: IngestorHealthIndicator,
useValue: {
isHealthy: jest.fn().mockResolvedValue(mockIngestorHealthy),
},
},
],
}).compile();

controller = module.get<HealthController>(HealthController);
healthCheckService = module.get<HealthCheckService>(HealthCheckService);
jest.mocked(healthCheckService.check).mockResolvedValue(healthyResult);
});

it('should be defined', () => {
expect(controller).toBeDefined();
});

describe('GET /health', () => {
it('should return 200 and health result when all checks pass', async () => {
const result = await controller.check();
expect(result).toEqual(healthyResult);
expect(result.status).toBe('ok');
expect(healthCheckService.check).toHaveBeenCalledWith([
expect.any(Function),
expect.any(Function),
]);
});

it('should throw ServiceUnavailableException when a check fails', async () => {
jest.mocked(healthCheckService.check).mockResolvedValue(unhealthyResult);
await expect(controller.check()).rejects.toThrow(ServiceUnavailableException);
});
});

describe('GET /ready', () => {
it('should return 200 and health result when ready', async () => {
const result = await controller.ready();
expect(result).toEqual(healthyResult);
expect(result.status).toBe('ok');
});

it('should throw ServiceUnavailableException when not ready', async () => {
jest.mocked(healthCheckService.check).mockResolvedValue(unhealthyResult);
await expect(controller.ready()).rejects.toThrow(ServiceUnavailableException);
});
});

describe('GET /live', () => {
it('should return 200 with status ok', () => {
const result = controller.live();
expect(result).toEqual({ status: 'ok' });
});

it('should not call any health indicators', () => {
controller.live();
expect(healthCheckService.check).not.toHaveBeenCalled();
});
});

describe('GET /status', () => {
it('should return detailed status with uptime, memory, and checks', async () => {
const result = await controller.status();
expect(result).toMatchObject({
status: 'ok',
checks: healthyResult,
});
expect(typeof result.uptimeSeconds).toBe('number');
expect(result.uptimeSeconds).toBeGreaterThanOrEqual(0);
expect(typeof result.timestamp).toBe('number');
expect(result.version).toBeDefined();
expect(result.memory).toMatchObject({
rss: expect.any(Number),
heapTotal: expect.any(Number),
heapUsed: expect.any(Number),
external: expect.any(Number),
arrayBuffers: expect.any(Number),
});
});
});
});
Loading
Loading