Skip to content

Commit

Permalink
Added big data functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
denysgerasymuk799 committed Jun 11, 2022
1 parent 9956428 commit da9ba3a
Show file tree
Hide file tree
Showing 36 changed files with 1,009 additions and 56 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ transaction-service-data
wallet-service-data
card-service-data
.env
*.env
*.pem
confluent_credentials.config
*.crt
20K.txt
secrets
env.js
secrets
report.html

doc/build/*
doc/_build/*
Expand Down
31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

## Features

- Functionality: `Deposit money` `Send money` `List transactions` `Login/Sign Up with JWT Token`
- Technologies: `Kafka` `AWS` `React` `Python` `Docker` `Grafana` `Prometheus`
- Functionality: `Deposit money` `Send money` `List transactions` `Login/Sign Up with JWT Token` `User/General Bank Analytics`
- Technologies: `Kafka` `AWS` `React` `Python` `Docker` `Grafana` `Prometheus` `Databricks`
- Frameworks: `FastAPI` `Faust`
- Databases: `AWS Keyspaces` `MongoDB`
- AWS Resources: `EKS` `ELB` `CloudWatch` `API Gateway` `S3` `Amplify` `IAM` `KMS` `VPC`
Expand All @@ -20,7 +20,7 @@ The high-level diagram of our services from the infrastructure side looks like t


<p align="center">
<img src="https://user-images.githubusercontent.com/42843889/171746032-9b8625da-eac6-404a-8a6c-3190639e0181.png" alt="SA_project_architecture_v5"/>
<img src="https://user-images.githubusercontent.com/42843889/173257312-ad4db461-2367-423c-8aed-4f4e464500ee.png" alt="SA_project_architecture_v6"/>
</p>


Expand All @@ -37,15 +37,15 @@ The high-level diagram of our services from the interaction side looks like this
</p>


**Cassandra interaction**
**Cassandra Interaction**

Card Manager — Cassandra(cards)

Card Service — Cassandra(cards, reserved_transactions)

Transaction Service — Cassandra(transactions, transactions_by_card, transactions_preaggregated_daily, transactions_preaggregated_monthly)

Orchestrator Service — Cassandra(cards, transactions_by_card, reserved_transactions)
| Microservice | Cassandra Tables |
| --- | --- |
| CardManager | [cards, unique_users_daily] |
| OrchestratorService | [cards, transactions_by_card, reserved_transactions] |
| CardService | [cards, reserved_transactions] |
| TransactionService | [transactions, transactions_by_card, successful_transactions_daily, transactions_preaggregated_daily, transactions_preaggregated_monthly] |
| AnalyticsService | [bank_statistics_daily, transactions_preaggregated_daily, transactions_preaggregated_monthly] |

<pre>

Expand Down Expand Up @@ -116,6 +116,11 @@ pip install -r requirements.txt
python3.8 -m venv orchestrator_service_venv
source orchestrator_service_venv/bin/activate
pip install -r requirements.txt

# Prepare the Analytics Service
python3.8 -m venv analytics_service_venv
source analytics_service_venv/bin/activate
pip install -r requirements.txt
```

### Start the Microservices
Expand Down Expand Up @@ -189,10 +194,12 @@ aws eks --region eu-central-1 update-kubeconfig --name web-banking
# Deploy Kafka
# See "Configure Confluent for Kubernetes" section

# Create topics in control-center: TransactionService,
# Create topics in control-center: TransactionService, CardService, ResultsTopic.
# To connect to it use the next command
kubectl port-forward controlcenter-0 9021:9021

# Change log level from DEBUG to INFO in all microservices

# [If needed] Deploy services interacted with kafka
docker build . -t transaction_service:0.1
docker tag transaction_service:0.1 denys8herasymuk/web-banking-transaction-service:0.1
Expand Down
14 changes: 14 additions & 0 deletions analytics_service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM python:3.8-slim

RUN apt-get update

ADD ./requirements.txt /opt/app/requirements.txt
WORKDIR /opt/app/

RUN pip install --upgrade pip

RUN pip install -r ./requirements.txt

COPY ./ /opt/app/

CMD uvicorn --host 0.0.0.0 --port 8080 --workers 4 app:app
91 changes: 81 additions & 10 deletions analytics_service/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from fastapi import FastAPI, status, Request
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, HTMLResponse

from config import logger
from database.__db import AnalyticsServiceOperator
from domain_logic.__constants import *
from domain_logic.__utils import validate_token, create_bank_stats_report


# Create app object.
Expand All @@ -20,7 +23,7 @@
}


def get_user_spendings(table: str, request: Request):
async def get_user_spendings(table: str, request: Request):
"""
Get user spendings from the database for specified date range.
Expand All @@ -30,6 +33,23 @@ def get_user_spendings(table: str, request: Request):
"""
# Get request parameters.
request_params = request.query_params

try:
is_valid_token, msg, auth_card_id = await validate_token(request_params, request)
except Exception as err:
logger.error(f'Validate token error: {err}')
return JSONResponse(content={'content': 'unauthorized'},
status_code=status.HTTP_401_UNAUTHORIZED,
headers=cors)

# Check authorization of the request.
if not is_valid_token:
return JSONResponse(content={'content': msg}, status_code=status.HTTP_401_UNAUTHORIZED, headers=cors)

if str(auth_card_id) != request_params["card_id"]:
return JSONResponse(content={'content': 'user can view only data related to him, not ot other users'},
status_code=status.HTTP_401_UNAUTHORIZED, headers=cors)

try:
card_id, from_date, to_date = request_params["card_id"], request_params["from_date"], request_params["to_date"]
except (ValueError, KeyError):
Expand All @@ -39,6 +59,7 @@ def get_user_spendings(table: str, request: Request):

# Get user spendings.
spendings = operator.get_user_spendings(table=table, card_id=card_id, from_date=from_date, to_date=to_date)
print('spendings -- ', spendings)

# In case there is no such record.
if not spendings:
Expand All @@ -48,32 +69,37 @@ def get_user_spendings(table: str, request: Request):
return JSONResponse(content={"spendings": spendings}, status_code=status.HTTP_200_OK, headers=cors)


@app.options("/{full_path:path}")
async def options():
return JSONResponse(status_code=status.HTTP_200_OK, content={"ok": "true"}, headers=cors)


@app.get("/get_spendings_by_days")
def get_spendings_by_days(request: Request):
async def get_spendings_by_days(request: Request):
"""
Get user spending for specified in the request day dates.
Request date has to have the following format: YYYY-MM-DD.
:param request: (fastapi.Request) - request.
:return: (JSONResponse) - response with the returned record.
"""
return get_user_spendings(table=TR_PREAGGREGATED_DAILY_TABLE, request=request)
return await get_user_spendings(table=TR_PREAGGREGATED_DAILY_TABLE, request=request)


@app.get("/get_spendings_by_months")
def get_spendings_by_months(request: Request):
async def get_spendings_by_months(request: Request):
"""
Get user spending for specified in the request day dates.
Request date has to have the following format: YYYY-MM-DD.
:param request: (fastapi.Request) - request.
:return: (JSONResponse) - response with the returned record.
"""
return get_user_spendings(table=TR_PREAGGREGATED_MONTHLY_TABLE, request=request)
return await get_user_spendings(table=TR_PREAGGREGATED_MONTHLY_TABLE, request=request)


@app.get("/get_bank_statistics")
def get_spendings_by_months(request: Request):
@app.post("/get_bank_statistics")
async def get_bank_statistics(request: Request):
"""
Get bank statistics for the specified date.
Request date has to have the following format: YYYY-MM-DD.
Expand All @@ -82,14 +108,22 @@ def get_spendings_by_months(request: Request):
:return: (JSONResponse) - response with the returned record.
"""
# Get request parameters.
request_params = request.query_params
request_params = await request.json()
logger.info(f'request_params: {request_params}')

try:
token = request_params["token"]
date = request_params["date"]
except (ValueError, KeyError):
return JSONResponse(
content={"content": "Invalid request parameters."}, status_code=status.HTTP_401_UNAUTHORIZED, headers=cors
content={"content": "Invalid request parameters"}, status_code=status.HTTP_401_UNAUTHORIZED, headers=cors
)

if token != os.getenv("SECRET_TOKEN"):
return JSONResponse(content={'errors': 'Wrong token'},
status_code=status.HTTP_401_UNAUTHORIZED,
headers=cors)

# Get bank statistics.
stats = operator.get_bank_statistics(date=date)

Expand All @@ -107,6 +141,43 @@ def get_spendings_by_months(request: Request):
return JSONResponse(content=content, status_code=status.HTTP_200_OK, headers=cors)


@app.get("/get_bank_statistics_report")
async def get_bank_statistics_report(request: Request):
# Get request parameters.
request_params = request.query_params
logger.info(f'request_params: {request_params}')

try:
is_valid_token, msg, auth_card_id = await validate_token(request_params,
request=None, token=request_params.get('token'))
except Exception as err:
logger.error(f'Validate token error: {err}')
return JSONResponse(content={'content': 'unauthorized'},
status_code=status.HTTP_401_UNAUTHORIZED,
headers=cors)

# Check authorization of the request.
if not is_valid_token:
return JSONResponse(content={'content': msg}, status_code=status.HTTP_401_UNAUTHORIZED, headers=cors)

# check admin rights
if str(auth_card_id) != ADMIN_CARD_ID:
return JSONResponse(content={'content': 'Only admins have access to this endpoint'},
status_code=status.HTTP_401_UNAUTHORIZED, headers=cors)

# If all checks are passed, generate a report and return it
result = create_bank_stats_report(operator)
if result == -1:
return JSONResponse(
content={"content": "There is no such record."}, status_code=status.HTTP_200_OK, headers=cors
)

with open('./report.html', 'r') as html_file:
content = html_file.read()

return HTMLResponse(content=content, status_code=200)


@app.on_event("shutdown")
def shutdown():
print("Closing connection to AnalyticsServiceOperator database...")
Expand Down
32 changes: 32 additions & 0 deletions analytics_service/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging
import aiohttp
import asyncio

from domain_logic.__custom_logger import CustomHandler
from domain_logic.cryptographer import Cryptographer
from domain_logic.__constants import *

cors = {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Authorization, Content-Type',
'Access-Control-Allow-Methods': 'GET, PUT, POST, DELETE, HEAD, OPTIONS',
'Allow': 'POST, OPTIONS'
}

# Add logic for asynchronous requests
loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)

# Prepare own helper class objects
cryptographer = Cryptographer(public_key_location=os.getenv('PUBLIC_KEY_LOCATION'),
private_key_location=os.getenv('PRIVATE_KEY_LOCATION'))

# Prepare own helper class objects
logger = logging.getLogger('root')
if DEBUG_MODE:
logger.setLevel('DEBUG')
else:
logger.setLevel('INFO')
logging.disable(logging.DEBUG)
logger.addHandler(CustomHandler())
14 changes: 14 additions & 0 deletions analytics_service/database/__db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,19 @@ def get_bank_statistics(self, date: str):
except IndexError:
return None, None, None

def get_all_bank_statistics(self):
# Get statistics for a certain date.
query = f"""
SELECT "date", number_transactions, number_unique_users, capital_turnover
FROM {BANK_STATISTICS_DAILY_TABLE}
LIMIT 92;
"""
# In case there are no records.
try:
statistics = list(self.__client.execute_read_query(query))
return statistics
except IndexError:
return None, None, None

def shutdown(self):
self.__client.close()
Loading

0 comments on commit da9ba3a

Please sign in to comment.