diff --git a/README.md b/README.md index 94ed1e9..7c31a26 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Sensor Flow API -**Sensor Flow API** é uma API desenvolvida para receber e gerenciar registros de sensores. +**Sensor Flow API** é uma API desenvolvida para receber, processar e gerenciar registros de sensores. --- @@ -12,6 +12,7 @@ A API conta com um sistema de autenticação baseado em **Cognito**, com suporte - **Login**: Autenticação de usuários utilizando `username` e `password`. - Gera um token JWT com validade de 1 hora. - O token é armazenado em um cookie seguro `httpOnly`. + #### **Endpoints** - **POST `/auth/login`** @@ -59,6 +60,43 @@ A API conta com um sistema de autenticação baseado em **Cognito**, com suporte --- +### **Listagem de Dados Agregados** + +#### **Endpoints** + +- **GET `/sensor-data/aggregated`** + - Consulta dados agregados da tabela `aggregated` no DynamoDB. + - Suporta filtros dinâmicos: + - `24h`: últimas 24 horas + - `48h`: últimas 48 horas + - `1w`: última semana + - `1m`: último mês + - Parâmetros de consulta: + - `interval`: Especifica o intervalo de tempo (`24h`, `48h`, `1w`, `1m`). + + - Exemplo de requisição: + ```http + GET /sensor-data/aggregated?interval=24h + ``` + + - Exemplo de resposta: + ```json + { + "average": 78.42, + "totalCount": 20, + "items": [ + { + "equipmentId": "EQ-12495", + "intervalStart": 1693468800, + "totalValue": 1500.75, + "sampleCount": 20 + } + ] + } + ``` + +--- + ### **Processamento de Arquivos com Lambda** Uma função Lambda em Python é responsável por processar os arquivos CSV enviados para o S3. Esta função: @@ -70,7 +108,26 @@ Uma função Lambda em Python é responsável por processar os arquivos CSV envi - `value` - `register_time` -### **Estrutura Modular** +--- + +### **Popular Dados Agregados** + +Uma função Lambda em python é acionada via **DynamoDB Streams** para popular a tabela de agregados com base nos dados da tabela de escrita. Esta função: + +1. Processa eventos do **DynamoDB Streams**. +2. Para cada evento: + - Extrai as informações do equipamento, timestamp e valor. + - Determina o intervalo de tempo (`intervalStart` arredondado para a hora cheia). + - Atualiza ou cria o registro correspondente na tabela de agregados. +3. A tabela de agregados tem as seguintes colunas: + - `partitionKey`: Valor fixo (`GLOBAL`) para consultas eficientes. + - `equipmentId`: Identificação do equipamento. + - `intervalStart`: Timestamp do início do intervalo. + - `totalValue`: Soma dos valores agregados. + - `sampleCount`: Número de amostras no intervalo. + +## **Estrutura Modular** + A aplicação foi projetada com uma estrutura modular utilizando o framework **NestJS**. #### **Módulos** @@ -130,6 +187,7 @@ A aplicação foi projetada com uma estrutura modular utilizando o framework **N COGNITO_CLIENT_SECRET=seu-client-secret COGNITO_AUTH_URI=seu-auth-uri DYNAMODB_TABLE_NAME=sensor-data-table + DYNAMODB_AGGREGATE_TABLE_NAME=aggregated-data-table S3_BUCKET_NAME=sensor-data-bucket ``` diff --git a/etl_lambda/app.py b/lambdas/csv_etl/app.py similarity index 100% rename from etl_lambda/app.py rename to lambdas/csv_etl/app.py diff --git a/etl_lambda/requirements.txt b/lambdas/csv_etl/requirements.txt similarity index 100% rename from etl_lambda/requirements.txt rename to lambdas/csv_etl/requirements.txt diff --git a/lambdas/data_aggregator/app.py b/lambdas/data_aggregator/app.py new file mode 100644 index 0000000..8e656ff --- /dev/null +++ b/lambdas/data_aggregator/app.py @@ -0,0 +1,69 @@ +import boto3 +import os +from datetime import datetime, timezone +from decimal import Decimal + +dynamodb = boto3.resource("dynamodb") + +def lambda_handler(event, context): + + for record in event["Records"]: + if record["eventName"] == "INSERT" or record["eventName"] == "MODIFY": + new_image = record["dynamodb"]["NewImage"] + + equipment_id = new_image["equipmentId"]["S"] + timestamp_str = new_image["timestamp"]["S"] + value = Decimal(new_image["value"]["N"]) + + timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + interval_start = timestamp.replace( + minute=0, second=0, microsecond=0, tzinfo=timezone.utc + ) + # converting to unix timestamp for more efficient storage and querying + interval_start_unix = int(interval_start.timestamp()) + + update_aggregate_table(equipment_id, interval_start_unix, value) + + +def update_aggregate_table(equipment_id, interval_start_unix, value): + try: + aggregate_table = dynamodb.Table(os.environ["AGGREGATE_TABLE_NAME"]) + aggregate_table.update_item( + Key={"equipmentId": equipment_id, "intervalStartTime": interval_start_unix}, + UpdateExpression="SET totalValue = if_not_exists(totalValue, :zero) + :val, sampleCount = if_not_exists(sampleCount, :zero) + :one, partitionKey = :global", + ExpressionAttributeValues={ + ":val": value, + ":one": Decimal(1), + ":zero": Decimal(0), + ":global": "GLOBAL", + }, + ReturnValues="UPDATED_NEW", + ) + except Exception as e: + print(f"Erro ao atualizar a tabela de agregados: {e}") + + +if __name__ == "__main__": + os.environ["AGGREGATE_TABLE_NAME"] = "sensor-flow-aggregates" + payload = { + "equipmentId": "EQ-12495", + "timestamp": "2023-02-15T01:30:00.000-05:00", + "value": 78.42, + } + + event = { + "Records": [ + { + "eventName": "INSERT", + "dynamodb": { + "NewImage": { + "equipmentId": {"S": payload["equipmentId"]}, + "timestamp": {"S": payload["timestamp"]}, + "value": {"N": str(payload["value"])}, + } + }, + } + ] + } + + lambda_handler(event, None) diff --git a/lambdas/data_aggregator/requirements.txt b/lambdas/data_aggregator/requirements.txt new file mode 100644 index 0000000..4dbe667 --- /dev/null +++ b/lambdas/data_aggregator/requirements.txt @@ -0,0 +1,2 @@ +boto3 +pandas \ No newline at end of file diff --git a/src/app.module.ts b/src/app.module.ts index 910699d..dff0ce7 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -3,11 +3,12 @@ import { HealthModule } from './modules/health/health.module'; import { ConfigModule } from '@nestjs/config'; import { AuthModule } from './modules/auth/auth.module'; import { SensorDataModule } from './modules/sensor-data/sensor-data.module'; - +import { SensorAggregatedModule } from './modules/sensor-aggregated/sensor-aggregated.module'; @Module({ imports: [ AuthModule, SensorDataModule, + SensorAggregatedModule, HealthModule, ConfigModule.forRoot({ isGlobal: true, diff --git a/src/modules/sensor-aggregated/dto/create-aggregated.dto.ts b/src/modules/sensor-aggregated/dto/create-aggregated.dto.ts new file mode 100644 index 0000000..0bd5b0d --- /dev/null +++ b/src/modules/sensor-aggregated/dto/create-aggregated.dto.ts @@ -0,0 +1,7 @@ +import { IsString, IsISO8601, IsNumber } from 'class-validator'; +import { IsOptional } from 'class-validator'; + +export class GetSensorAggregatedDto { + @IsString() + interval?: '24h' | '48h' | '1w' | '1m'; +} diff --git a/src/modules/sensor-aggregated/sensor-aggragated.service.spec.ts b/src/modules/sensor-aggregated/sensor-aggragated.service.spec.ts new file mode 100644 index 0000000..65c3527 --- /dev/null +++ b/src/modules/sensor-aggregated/sensor-aggragated.service.spec.ts @@ -0,0 +1,51 @@ +import { SensorAggregatedService } from './sensor-aggragated.service'; +import { ConfigService } from '@nestjs/config'; +import { DynamoDB } from 'aws-sdk'; + +jest.mock('@nestjs/config'); +jest.mock('aws-sdk'); + +describe('SensorAggregatedService', () => { + let service: SensorAggregatedService; + let configService: ConfigService; + let dynamoDb: DynamoDB.DocumentClient; + + beforeEach(() => { + configService = new ConfigService(); + dynamoDb = new DynamoDB.DocumentClient(); + + (configService.get as jest.Mock).mockReturnValue('TestTable'); + + service = new SensorAggregatedService(configService, dynamoDb); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + it('should return correct average for valid interval "24h"', async () => { + const mockItems = [ + { totalValue: 100, sampleCount: 10 }, + { totalValue: 200, sampleCount: 20 }, + ]; + + (dynamoDb.query as jest.Mock).mockReturnValue({ + promise: jest.fn().mockResolvedValue({ + Items: mockItems, + LastEvaluatedKey: null, + }), + }); + + const result = await service.get({ interval: '24h' }); + + expect(result.average).toBe(300 / 30); + expect(result.totalCount).toBe(30); + expect(result.items).toEqual(mockItems); + }); + + it('should throw an error for invalid interval', async () => { + await expect(service.get({ interval: 'invalid' as any })).rejects.toThrow( + 'Invalid interval provided. Use "24h", "48h", "1w", or "1m".', + ); + }); +}); diff --git a/src/modules/sensor-aggregated/sensor-aggragated.service.ts b/src/modules/sensor-aggregated/sensor-aggragated.service.ts new file mode 100644 index 0000000..0e6f36b --- /dev/null +++ b/src/modules/sensor-aggregated/sensor-aggragated.service.ts @@ -0,0 +1,91 @@ +import { Injectable, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { DynamoDB } from 'aws-sdk'; +import { GetSensorAggregatedDto } from './dto/create-aggregated.dto'; + +@Injectable() +export class SensorAggregatedService { + private tableName: string; + + constructor( + private configService: ConfigService, + @Inject('DynamoDBDocumentClient') private dynamoDb: DynamoDB.DocumentClient, + ) { + this.tableName = this.configService.get( + 'DYNAMODB_AGGREGATE_TABLE_NAME', + ); + } + + async get(query: GetSensorAggregatedDto): Promise { + const { interval } = query; + + const intervalMap: Record = { + '24h': 24, + '48h': 48, + '1w': 168, + '1m': 720, + }; + + const hours = intervalMap[interval]; + if (!hours) { + throw new Error( + 'Invalid interval provided. Use "24h", "48h", "1w", or "1m".', + ); + } + + const now = Math.floor(Date.now() / 1000); + const startTime = now - hours * 3600; + + try { + let items: DynamoDB.DocumentClient.ItemList = []; + let lastEvaluatedKey: DynamoDB.DocumentClient.Key | undefined = undefined; + let params: DynamoDB.DocumentClient.QueryInput; + + params = { + TableName: this.tableName, + IndexName: 'partitionKey-intervalStartTime-index', + KeyConditionExpression: + '#partitionKey = :partitionKey AND #intervalStartTime >= :startTime', + ExpressionAttributeNames: { + '#partitionKey': 'partitionKey', + '#intervalStartTime': 'intervalStartTime', + }, + ExpressionAttributeValues: { + ':partitionKey': 'GLOBAL', + ':startTime': startTime, + }, + }; + + do { + if (lastEvaluatedKey) { + params.ExclusiveStartKey = lastEvaluatedKey; + } + + const result = await this.dynamoDb.query(params).promise(); + + items = items.concat(result.Items || []); + lastEvaluatedKey = result.LastEvaluatedKey; + } while (lastEvaluatedKey); + + const totalValue = items.reduce( + (sum, item) => sum + (item.totalValue || 0), + 0, + ); + const totalCount = items.reduce( + (sum, item) => sum + (item.sampleCount || 0), + 0, + ); + const average = totalCount > 0 ? totalValue / totalCount : 0; + + return { + average, + totalCount, + items, + }; + } catch (error) { + throw new Error( + `Error retrieving aggregated sensor data in DynamoDB: ${error.message}`, + ); + } + } +} diff --git a/src/modules/sensor-aggregated/sensor-aggregated.controller.ts b/src/modules/sensor-aggregated/sensor-aggregated.controller.ts new file mode 100644 index 0000000..2f057ab --- /dev/null +++ b/src/modules/sensor-aggregated/sensor-aggregated.controller.ts @@ -0,0 +1,16 @@ +import { Controller, Get, Query, UseGuards } from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; +import { SensorAggregatedService } from './sensor-aggragated.service'; +import { GetSensorAggregatedDto } from './dto/create-aggregated.dto'; + +@Controller('sensor-data-aggregated') +export class SensorAggragatedController { + constructor( + private readonly sensorAggregatedService: SensorAggregatedService, + ) {} + @UseGuards(AuthGuard('jwt')) + @Get() + async create(@Query() query: GetSensorAggregatedDto) { + return this.sensorAggregatedService.get(query); + } +} diff --git a/src/modules/sensor-aggregated/sensor-aggregated.module.ts b/src/modules/sensor-aggregated/sensor-aggregated.module.ts new file mode 100644 index 0000000..a9aebe1 --- /dev/null +++ b/src/modules/sensor-aggregated/sensor-aggregated.module.ts @@ -0,0 +1,29 @@ +import { Module } from '@nestjs/common'; +import { SensorAggragatedController } from './sensor-aggregated.controller'; +import { SensorAggregatedService } from './sensor-aggragated.service'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { DynamoDB } from 'aws-sdk'; +import * as https from 'https'; + +@Module({ + imports: [ConfigModule], + controllers: [SensorAggragatedController], + providers: [ + SensorAggregatedService, + { + provide: 'DynamoDBDocumentClient', + useFactory: (configService: ConfigService) => { + return new DynamoDB.DocumentClient({ + region: configService.get('AWS_REGION'), + maxRetries: 3, + httpOptions: { + timeout: 5000, + agent: new https.Agent({ maxSockets: 100 }), + }, + }); + }, + inject: [ConfigService], + }, + ], +}) +export class SensorAggregatedModule {}