Skip to content

Commit

Permalink
add scheduler for monitors and placeholder for functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
sahasourav123 committed Sep 7, 2024
1 parent e81dda1 commit f2e93ae
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/sandbox-deploy-frontend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
workflow_dispatch:

concurrency:
group: deploy-backend
group: deploy-frontend
cancel-in-progress: true

jobs:
Expand Down
17 changes: 16 additions & 1 deletion backend/scripts/schema.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
-- table to store monitors
CREATE TABLE monitors (
monitor_id SERIAL PRIMARY KEY,
org_id INT NOT NULL,
monitor_name TEXT NOT NULL,
monitor_type TEXT NOT NULL,
monitor_body JSONB NOT NULL,
is_active boolean DEFAULT TRUE,
frequency INT DEFAULT 300,
interval INT DEFAULT 300,
timeout INT DEFAULT 5,
expectation JSONB,
alerts TEXT[],
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- monitor run history
create table run_history (
run_id SERIAL PRIMARY KEY,
org_id INT NOT NULL,
monitor_id INT NOT NULL,
outcome boolean NOT NULL,
response_time INT NOT NULL,
response text,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


select * from monitors;
16 changes: 16 additions & 0 deletions backend/src/commons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Created On: July 2024
Created By: Sourav Saha
"""
import os
import logging
from rich.logging import RichHandler

# load environment variables
from dotenv import load_dotenv
base_dir = os.path.dirname(os.getcwd())
load_dotenv(f"{base_dir}/.env")
load_dotenv(f"{base_dir}/.env.local", override=True)

logging.basicConfig(level='INFO', format='%(message)s', datefmt="[%X]", handlers=[RichHandler()])
logger = logging.getLogger()
77 changes: 73 additions & 4 deletions backend/src/controller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,82 @@
from utils import logger
from utils import DatabaseManager
import time
import requests
from commons import logger
from db_util import DatabaseManager

db = DatabaseManager()

def create_monitor(data: dict):
def get_monitor_by_id(monitor_id: int):
sql = f"select * from monitors where monitor_id={monitor_id}"
_df = db.query(sql)
monitor = _df.to_dict('records')[0]
return monitor

def get_monitor_by_orgid(org_id: int):
sql = f"select * from monitors where org_id={org_id}"
return db.query(sql)

def get_all_monitors():
sql = "select * from monitors where is_active"
df = db.query(sql)
return df

def insert_monitor(data: dict):
logger.info(f"Creating monitor: {data}")
sql = """insert into monitors (monitor_type, monitor_name, monitor_body, timeout, frequency, expectation, alerts)
sql = """insert into monitors (monitor_type, monitor_name, monitor_body, timeout, interval, expectation, alerts)
values (%(monitor_type)s, %(monitor_name)s, %(monitor_body)s, %(timeout)s, %(frequency)s, %(expectation)s, %(alerts)s)
returning monitor_id
"""
monitor_id = db.insert(sql, data)
logger.info(f"Inserted Monitor with id {monitor_id}")
return monitor_id

def update_monitor(monitor_id: int, data: dict):
logger.info(f"Updating monitor: {data}")
logger.info(f"Updated Monitor with id {monitor_id}")
pass

def delete_monitor(monitor_id: int):
logger.info(f"Deleting monitor: #{monitor_id}")
sql = f"delete from monitors where monitor_id = {monitor_id}"
db.query(sql)
logger.info(f"Deleted Monitor with id {monitor_id}")

def run_monitor_by_id(monitor_id):
monitor = get_monitor_by_id(monitor_id)
start_time = time.time()
try:
if monitor['monitor_type'] == 'api':
outcome, response = run_api_monitor(monitor['monitor_body'], monitor.get('expectation'))
else:
outcome, response = False

except Exception as e:
logger.error(f"Error running monitor: {e}")
outcome, response = False

# store run history
response_time = time.time() - start_time
sql = f"""insert into run_history (org_id, monitor_id, outcome, response_time, response) values
({monitor['org_id']}, {monitor_id}, {outcome}, {response_time}, '{response}')
"""
db.insert(sql)
return outcome

def run_api_monitor(monitor_body: dict, expectation: dict):
res = requests.request(
monitor_body.get('method'), monitor_body.get('url'),
headers=monitor_body.get('headers'),
params=monitor_body.get('params'),
data=monitor_body.get('body')
)
logger.info(f"Response: {res.status_code} | {res.reason}")

if expectation:
response_code_list = expectation.get('response_codes')
is_allow_list = expectation.get('is_allow_list')
outcome = (is_allow_list and res.status_code in response_code_list) or (not is_allow_list and res.status_code not in response_code_list)

else:
outcome = 200 <= res.status_code < 300

return outcome, res.status_code
13 changes: 13 additions & 0 deletions backend/src/data_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Literal, Optional
from pydantic import BaseModel, Extra, PositiveInt, Field

class MonitorModel(BaseModel):
org_id: PositiveInt | None = None
# monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]
monitor_name: str | None = None
monitor_body: dict | None = None
timeout: int | None = None
interval: int | None = None
expectation: dict | None = None
alerts: list[int] | None = None
is_active: bool | None = None
20 changes: 2 additions & 18 deletions backend/src/utils.py → backend/src/db_util.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
"""
Created On: July 2024
Created By: Sourav Saha
"""
import os
import logging
from rich.logging import RichHandler

# load environment variables
from dotenv import load_dotenv
base_dir = os.path.dirname(os.getcwd())
load_dotenv(f"{base_dir}/.env")
load_dotenv(f"{base_dir}/.env.local", override=True)

logging.basicConfig(level='INFO', format='%(message)s', datefmt="[%X]", handlers=[RichHandler()])
logger = logging.getLogger()

import re
import json
import psycopg2
import socket
import logging
import threading
import pandas as pd
from contextlib import contextmanager
Expand All @@ -40,7 +25,6 @@ def _get_connection(self):
try:
_this_ = f"wt-{os.getenv('ENV')}-{socket.gethostname()}"
conn_str = f"dbname=postgres user=postgres password={os.getenv('TIMESCALEDB_PASS')} host={os.getenv('TIMESCALEDB_HOST')} port={os.getenv('TIMESCALEDB_PORT')} application_name={_this_}"
print(conn_str)
conn = psycopg2.connect(conn_str)
self.logger.info(f'database connection created...')
return conn
Expand Down Expand Up @@ -75,7 +59,7 @@ def cursor_context(self, query, data):
c.close()
self.lock.release()

def insert(self, sql: str, data: tuple | dict) -> int:
def insert(self, sql: str, data: tuple | dict = ()) -> int:
_data = self._pre_process(data) if isinstance(data, dict) else data
with self.cursor_context(sql, _data) as c:
c.execute(sql, _data)
Expand Down
71 changes: 48 additions & 23 deletions backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Created On: July 2024
Created By: Sourav Saha
"""
from utils import logger
from commons import logger
import os
import logging
from datetime import datetime
Expand All @@ -14,10 +14,12 @@
from fastapi import FastAPI, Request, Response, APIRouter, Body
from fastapi_redis_cache import FastApiRedisCache, cache

import controller
import data_model as dm
import scheduler as sch
import controller as ct

# Initialize app
__service__ = 'the-towerhouse-backend'
__service__ = 'the-watchtower-backend'
tags_metadata = []

@asynccontextmanager
Expand Down Expand Up @@ -47,43 +49,66 @@ async def root():
dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return {"service": __service__, 'version': __version__, 'server-time': dt}

# create api monitor
@route.post("/create/monitor")
def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"], monitor_data=Body(...)):
monitor_id = controller.create_monitor({'monitor_type': monitor_type, **monitor_data})
return {"message": "Monitor created successfully", "monitor_id": monitor_id}

@route.post("/import/monitor")
def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]):
def import_monitor(org_id: int):
return {"message": "Monitor imported successfully"}

@route.post("/export/monitor")
def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]):
return {"message": "Monitor exported successfully"}
def export_monitor(org_id: int):
pass

# create api monitor
@route.post("/create/monitor")
def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"], monitor_data=dm.MonitorModel):
# insert into database
monitor_id = ct.insert_monitor({'monitor_type': monitor_type, **monitor_data})
# schedule monitoring
sch.create_job(monitor_id, monitor_data.interval)
return {"message": "Monitor created successfully", "monitor_id": monitor_id}

# update monitor
@route.put("/update/monitor/{monitor_id}")
def update_monitor(monitor_id: int):
def update_monitor(monitor_id: int, monitor_data=dm.MonitorModel):
if monitor_data.interval:
sch.create_job(monitor_id, monitor_data.interval)
return {"message": "Monitor updated successfully"}

# delete monitor
@route.delete("/delete/monitor/{monitor_id}")
def delete_monitor(monitor_id: int):
sch.scheduler.remove_job(f"monitor#{monitor_id}")
return {"message": "Monitor deleted successfully"}

# get monitor(s)
@route.get("/fetch/monitors/{monitor_id}")
def get_monitors(monitor_id: int):
return {"message": "Monitor fetched successfully"}
@route.get("/fetch/monitor")
@cache(expire=30)
def get_monitors(response: Response, org_id: int):
df = ct.get_monitor_by_orgid(org_id)
return {"message": "Monitor fetched successfully", "data": df.to_dict('records')}

# run monitor
@route.get("/run/monitor/{monitor_id}")
def run_monitor(monitor_id: int):
outcome = ct.run_monitor_by_id(monitor_id)
return {'is_success': outcome}

# refresh monitor
@route.get("/refresh/monitor")
def refresh_monitor():
df = ct.get_all_monitors()
for idx, row in df.iterrows():
sch.create_job(row['monitor_id'], row['interval'])

return {"message": "Monitor refreshed successfully"}

# get monitoring history
@route.get("/fetch/history/monitors/{monitor_id}")
@route.get("/fetch/history/monitor/{monitor_id}")
def get_monitor_history(monitor_id: int):
return {"message": "Monitor history fetched successfully"}

# get status page
@route.get("/fetch/statuspage/{statuspage_id}")
def get_statuspage(statuspage_id: int):
@route.get("/fetch/statuspage/{page_id}")
def get_statuspage(page_id: int):
return {"message": "Status page fetched successfully"}

# create status page
Expand All @@ -92,13 +117,13 @@ def create_statuspage():
return {"message": "Status page created successfully"}

# update status page
@route.put("/update/statuspage/{statuspage_id}")
def update_statuspage(statuspage_id: int):
@route.put("/update/statuspage/{page_id}")
def update_statuspage(page_id: int):
return {"message": "Status page updated successfully"}

# delete status page
@route.delete("/delete/statuspage/{statuspage_id}")
def delete_statuspage(statuspage_id: int):
@route.delete("/delete/statuspage/{page_id}")
def delete_statuspage(page_id: int):
return {"message": "Status page deleted successfully"}

# create alert group
Expand Down
Loading

0 comments on commit f2e93ae

Please sign in to comment.