Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Sensor Flow API

**Sensor Flow API** é uma API desenvolvida para receber e gerenciar registros de sensores.
**Sensor Flow API** é uma API desenvolvida para receber, processar e gerenciar registros de sensores.

---

Expand All @@ -12,6 +12,7 @@ A API conta com um sistema de autenticação baseado em **Cognito**, com suporte
- **Login**: Autenticação de usuários utilizando `username` e `password`.
- Gera um token JWT com validade de 1 hora.
- O token é armazenado em um cookie seguro `httpOnly`.

#### **Endpoints**

- **POST `/auth/login`**
Expand Down Expand Up @@ -59,6 +60,43 @@ A API conta com um sistema de autenticação baseado em **Cognito**, com suporte

---

### **Listagem de Dados Agregados**

#### **Endpoints**

- **GET `/sensor-data/aggregated`**
- Consulta dados agregados da tabela `aggregated` no DynamoDB.
- Suporta filtros dinâmicos:
- `24h`: últimas 24 horas
- `48h`: últimas 48 horas
- `1w`: última semana
- `1m`: último mês
- Parâmetros de consulta:
- `interval`: Especifica o intervalo de tempo (`24h`, `48h`, `1w`, `1m`).

- Exemplo de requisição:
```http
GET /sensor-data/aggregated?interval=24h
```

- Exemplo de resposta:
```json
{
"average": 78.42,
"totalCount": 20,
"items": [
{
"equipmentId": "EQ-12495",
"intervalStart": 1693468800,
"totalValue": 1500.75,
"sampleCount": 20
}
]
}
```

---

### **Processamento de Arquivos com Lambda**

Uma função Lambda em Python é responsável por processar os arquivos CSV enviados para o S3. Esta função:
Expand All @@ -70,7 +108,26 @@ Uma função Lambda em Python é responsável por processar os arquivos CSV envi
- `value`
- `register_time`

### **Estrutura Modular**
---

### **Popular Dados Agregados**

Uma função Lambda em python é acionada via **DynamoDB Streams** para popular a tabela de agregados com base nos dados da tabela de escrita. Esta função:

1. Processa eventos do **DynamoDB Streams**.
2. Para cada evento:
- Extrai as informações do equipamento, timestamp e valor.
- Determina o intervalo de tempo (`intervalStart` arredondado para a hora cheia).
- Atualiza ou cria o registro correspondente na tabela de agregados.
3. A tabela de agregados tem as seguintes colunas:
- `partitionKey`: Valor fixo (`GLOBAL`) para consultas eficientes.
- `equipmentId`: Identificação do equipamento.
- `intervalStart`: Timestamp do início do intervalo.
- `totalValue`: Soma dos valores agregados.
- `sampleCount`: Número de amostras no intervalo.

## **Estrutura Modular**

A aplicação foi projetada com uma estrutura modular utilizando o framework **NestJS**.

#### **Módulos**
Expand Down Expand Up @@ -130,6 +187,7 @@ A aplicação foi projetada com uma estrutura modular utilizando o framework **N
COGNITO_CLIENT_SECRET=seu-client-secret
COGNITO_AUTH_URI=seu-auth-uri
DYNAMODB_TABLE_NAME=sensor-data-table
DYNAMODB_AGGREGATE_TABLE_NAME=aggregated-data-table
S3_BUCKET_NAME=sensor-data-bucket
```

Expand Down
File renamed without changes.
File renamed without changes.
69 changes: 69 additions & 0 deletions lambdas/data_aggregator/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import boto3
import os
from datetime import datetime, timezone
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")

def lambda_handler(event, context):

for record in event["Records"]:
if record["eventName"] == "INSERT" or record["eventName"] == "MODIFY":
new_image = record["dynamodb"]["NewImage"]

equipment_id = new_image["equipmentId"]["S"]
timestamp_str = new_image["timestamp"]["S"]
value = Decimal(new_image["value"]["N"])

timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
interval_start = timestamp.replace(
minute=0, second=0, microsecond=0, tzinfo=timezone.utc
)
# converting to unix timestamp for more efficient storage and querying
interval_start_unix = int(interval_start.timestamp())

update_aggregate_table(equipment_id, interval_start_unix, value)


def update_aggregate_table(equipment_id, interval_start_unix, value):
try:
aggregate_table = dynamodb.Table(os.environ["AGGREGATE_TABLE_NAME"])
aggregate_table.update_item(
Key={"equipmentId": equipment_id, "intervalStartTime": interval_start_unix},
UpdateExpression="SET totalValue = if_not_exists(totalValue, :zero) + :val, sampleCount = if_not_exists(sampleCount, :zero) + :one, partitionKey = :global",
ExpressionAttributeValues={
":val": value,
":one": Decimal(1),
":zero": Decimal(0),
":global": "GLOBAL",
},
ReturnValues="UPDATED_NEW",
)
except Exception as e:
print(f"Erro ao atualizar a tabela de agregados: {e}")


if __name__ == "__main__":
os.environ["AGGREGATE_TABLE_NAME"] = "sensor-flow-aggregates"
payload = {
"equipmentId": "EQ-12495",
"timestamp": "2023-02-15T01:30:00.000-05:00",
"value": 78.42,
}

event = {
"Records": [
{
"eventName": "INSERT",
"dynamodb": {
"NewImage": {
"equipmentId": {"S": payload["equipmentId"]},
"timestamp": {"S": payload["timestamp"]},
"value": {"N": str(payload["value"])},
}
},
}
]
}

lambda_handler(event, None)
2 changes: 2 additions & 0 deletions lambdas/data_aggregator/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3
pandas
3 changes: 2 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import { HealthModule } from './modules/health/health.module';
import { ConfigModule } from '@nestjs/config';
import { AuthModule } from './modules/auth/auth.module';
import { SensorDataModule } from './modules/sensor-data/sensor-data.module';

import { SensorAggregatedModule } from './modules/sensor-aggregated/sensor-aggregated.module';
@Module({
imports: [
AuthModule,
SensorDataModule,
SensorAggregatedModule,
HealthModule,
ConfigModule.forRoot({
isGlobal: true,
Expand Down
7 changes: 7 additions & 0 deletions src/modules/sensor-aggregated/dto/create-aggregated.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { IsString, IsISO8601, IsNumber } from 'class-validator';
import { IsOptional } from 'class-validator';

export class GetSensorAggregatedDto {
@IsString()
interval?: '24h' | '48h' | '1w' | '1m';
}
51 changes: 51 additions & 0 deletions src/modules/sensor-aggregated/sensor-aggragated.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { SensorAggregatedService } from './sensor-aggragated.service';
import { ConfigService } from '@nestjs/config';
import { DynamoDB } from 'aws-sdk';

jest.mock('@nestjs/config');
jest.mock('aws-sdk');

describe('SensorAggregatedService', () => {
let service: SensorAggregatedService;
let configService: ConfigService;
let dynamoDb: DynamoDB.DocumentClient;

beforeEach(() => {
configService = new ConfigService();
dynamoDb = new DynamoDB.DocumentClient();

(configService.get as jest.Mock).mockReturnValue('TestTable');

service = new SensorAggregatedService(configService, dynamoDb);
});

it('should be defined', () => {
expect(service).toBeDefined();
});

it('should return correct average for valid interval "24h"', async () => {
const mockItems = [
{ totalValue: 100, sampleCount: 10 },
{ totalValue: 200, sampleCount: 20 },
];

(dynamoDb.query as jest.Mock).mockReturnValue({
promise: jest.fn().mockResolvedValue({
Items: mockItems,
LastEvaluatedKey: null,
}),
});

const result = await service.get({ interval: '24h' });

expect(result.average).toBe(300 / 30);
expect(result.totalCount).toBe(30);
expect(result.items).toEqual(mockItems);
});

it('should throw an error for invalid interval', async () => {
await expect(service.get({ interval: 'invalid' as any })).rejects.toThrow(
'Invalid interval provided. Use "24h", "48h", "1w", or "1m".',
);
});
});
91 changes: 91 additions & 0 deletions src/modules/sensor-aggregated/sensor-aggragated.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { Injectable, Inject } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { DynamoDB } from 'aws-sdk';
import { GetSensorAggregatedDto } from './dto/create-aggregated.dto';

@Injectable()
export class SensorAggregatedService {
private tableName: string;

constructor(
private configService: ConfigService,
@Inject('DynamoDBDocumentClient') private dynamoDb: DynamoDB.DocumentClient,
) {
this.tableName = this.configService.get<string>(
'DYNAMODB_AGGREGATE_TABLE_NAME',
);
}

async get(query: GetSensorAggregatedDto): Promise<any> {
const { interval } = query;

const intervalMap: Record<string, number> = {
'24h': 24,
'48h': 48,
'1w': 168,
'1m': 720,
};

const hours = intervalMap[interval];
if (!hours) {
throw new Error(
'Invalid interval provided. Use "24h", "48h", "1w", or "1m".',
);
}

const now = Math.floor(Date.now() / 1000);
const startTime = now - hours * 3600;

try {
let items: DynamoDB.DocumentClient.ItemList = [];
let lastEvaluatedKey: DynamoDB.DocumentClient.Key | undefined = undefined;
let params: DynamoDB.DocumentClient.QueryInput;

params = {
TableName: this.tableName,
IndexName: 'partitionKey-intervalStartTime-index',
KeyConditionExpression:
'#partitionKey = :partitionKey AND #intervalStartTime >= :startTime',
ExpressionAttributeNames: {
'#partitionKey': 'partitionKey',
'#intervalStartTime': 'intervalStartTime',
},
ExpressionAttributeValues: {
':partitionKey': 'GLOBAL',
':startTime': startTime,
},
};

do {
if (lastEvaluatedKey) {
params.ExclusiveStartKey = lastEvaluatedKey;
}

const result = await this.dynamoDb.query(params).promise();

items = items.concat(result.Items || []);
lastEvaluatedKey = result.LastEvaluatedKey;
} while (lastEvaluatedKey);

const totalValue = items.reduce(
(sum, item) => sum + (item.totalValue || 0),
0,
);
const totalCount = items.reduce(
(sum, item) => sum + (item.sampleCount || 0),
0,
);
const average = totalCount > 0 ? totalValue / totalCount : 0;

return {
average,
totalCount,
items,
};
} catch (error) {
throw new Error(
`Error retrieving aggregated sensor data in DynamoDB: ${error.message}`,
);
}
}
}
16 changes: 16 additions & 0 deletions src/modules/sensor-aggregated/sensor-aggregated.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Controller, Get, Query, UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { SensorAggregatedService } from './sensor-aggragated.service';
import { GetSensorAggregatedDto } from './dto/create-aggregated.dto';

@Controller('sensor-data-aggregated')
export class SensorAggragatedController {
constructor(
private readonly sensorAggregatedService: SensorAggregatedService,
) {}
@UseGuards(AuthGuard('jwt'))
@Get()
async create(@Query() query: GetSensorAggregatedDto) {
return this.sensorAggregatedService.get(query);
}
}
29 changes: 29 additions & 0 deletions src/modules/sensor-aggregated/sensor-aggregated.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Module } from '@nestjs/common';
import { SensorAggragatedController } from './sensor-aggregated.controller';
import { SensorAggregatedService } from './sensor-aggragated.service';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { DynamoDB } from 'aws-sdk';
import * as https from 'https';

@Module({
imports: [ConfigModule],
controllers: [SensorAggragatedController],
providers: [
SensorAggregatedService,
{
provide: 'DynamoDBDocumentClient',
useFactory: (configService: ConfigService) => {
return new DynamoDB.DocumentClient({
region: configService.get<string>('AWS_REGION'),
maxRetries: 3,
httpOptions: {
timeout: 5000,
agent: new https.Agent({ maxSockets: 100 }),
},
});
},
inject: [ConfigService],
},
],
})
export class SensorAggregatedModule {}