From f2e93ae156b4f2f8131e77c357982e292eb323c9 Mon Sep 17 00:00:00 2001
From: Sourav Saha
Date: Sat, 7 Sep 2024 23:27:05 +0530
Subject: [PATCH] add scheduler for monitors and placeholder for functionalities

---
 .github/workflows/sandbox-deploy-frontend.yml |  2 +-
 backend/scripts/schema.sql                    | 17 +++-
 backend/src/commons.py                        | 16 ++++
 backend/src/controller.py                     | 77 ++++++++++++++++-
 backend/src/data_model.py                     | 13 +++
 backend/src/{utils.py => db_util.py}          | 20 +----
 backend/src/main.py                           | 71 +++++++++++-----
 backend/src/scheduler.py                      | 83 +++++++++++++++++++
 frontend/src/Dashboard.py                     | 49 +----------
 frontend/src/pages/1_Monitors.py              | 60 ++++++++++++++
 frontend/src/pages/Alert_Groups.py            |  4 +
 frontend/src/pages/Status_Pages.py            |  4 +
 frontend/src/svc/svc_backend.py               | 16 ++++
 frontend/src/utils.py                         | 10 +++
 14 files changed, 348 insertions(+), 94 deletions(-)
 create mode 100644 backend/src/commons.py
 create mode 100644 backend/src/data_model.py
 rename backend/src/{utils.py => db_util.py} (87%)
 create mode 100644 backend/src/scheduler.py
 create mode 100644 frontend/src/pages/1_Monitors.py
 create mode 100644 frontend/src/pages/Alert_Groups.py
 create mode 100644 frontend/src/pages/Status_Pages.py

diff --git a/.github/workflows/sandbox-deploy-frontend.yml b/.github/workflows/sandbox-deploy-frontend.yml
index 9d7645a..5580db8 100644
--- a/.github/workflows/sandbox-deploy-frontend.yml
+++ b/.github/workflows/sandbox-deploy-frontend.yml
@@ -9,7 +9,7 @@ on:
   workflow_dispatch:
 
 concurrency:
-  group: deploy-backend
+  group: deploy-frontend
   cancel-in-progress: true
 
 jobs:
diff --git a/backend/scripts/schema.sql b/backend/scripts/schema.sql
index 305976d..7254d96 100644
--- a/backend/scripts/schema.sql
+++ b/backend/scripts/schema.sql
@@ -1,13 +1,28 @@
 -- table to store monitors
 CREATE TABLE monitors (
     monitor_id SERIAL PRIMARY KEY,
+    org_id INT NOT NULL,
    monitor_name TEXT NOT NULL,
    monitor_type TEXT NOT NULL,
    monitor_body JSONB NOT NULL,
    is_active boolean DEFAULT TRUE,
-    frequency INT DEFAULT 300,
+    interval INT DEFAULT 300,
    timeout INT DEFAULT 5,
    expectation JSONB,
    alerts TEXT[],
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 );
+
+-- monitor run history
+create table run_history (
+    run_id SERIAL PRIMARY KEY,
+    org_id INT NOT NULL,
+    monitor_id INT NOT NULL,
+    outcome boolean NOT NULL,
+    response_time INT NOT NULL,
+    response text,
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+
+select * from monitors;
diff --git a/backend/src/commons.py b/backend/src/commons.py
new file mode 100644
index 0000000..8730e9d
--- /dev/null
+++ b/backend/src/commons.py
@@ -0,0 +1,16 @@
+"""
+Created On: July 2024
+Created By: Sourav Saha
+"""
+import os
+import logging
+from rich.logging import RichHandler
+
+# load environment variables
+from dotenv import load_dotenv
+base_dir = os.path.dirname(os.getcwd())
+load_dotenv(f"{base_dir}/.env")
+load_dotenv(f"{base_dir}/.env.local", override=True)
+
+logging.basicConfig(level='INFO', format='%(message)s', datefmt="[%X]", handlers=[RichHandler()])
+logger = logging.getLogger()
diff --git a/backend/src/controller.py b/backend/src/controller.py
index d5e6149..3fb3b85 100644
--- a/backend/src/controller.py
+++ b/backend/src/controller.py
@@ -1,13 +1,82 @@
-from utils import logger
-from utils import DatabaseManager
+import time
+import requests
+from commons import logger
+from db_util import DatabaseManager
 
 db = DatabaseManager()
 
-def create_monitor(data: dict):
+def get_monitor_by_id(monitor_id: int):
+    sql = f"select * from monitors where monitor_id={monitor_id}"
+    _df = db.query(sql)
+    monitor = _df.to_dict('records')[0]
+    return monitor
+
+def get_monitor_by_orgid(org_id: int):
+    sql = f"select * from monitors where org_id={org_id}"
+    return db.query(sql)
+
+def get_all_monitors():
+    sql = "select * from monitors where is_active"
+    df = db.query(sql)
+    return df
+
+def insert_monitor(data: dict):
     logger.info(f"Creating monitor: {data}")
-    sql = """insert into monitors (monitor_type, monitor_name, monitor_body, timeout, frequency, expectation, alerts)
-    values (%(monitor_type)s, %(monitor_name)s, %(monitor_body)s, %(timeout)s, %(frequency)s, %(expectation)s, %(alerts)s)
+    sql = """insert into monitors (monitor_type, monitor_name, monitor_body, timeout, interval, expectation, alerts)
+    values (%(monitor_type)s, %(monitor_name)s, %(monitor_body)s, %(timeout)s, %(interval)s, %(expectation)s, %(alerts)s)
     returning monitor_id
     """
     monitor_id = db.insert(sql, data)
+    logger.info(f"Inserted Monitor with id {monitor_id}")
     return monitor_id
+
+def update_monitor(monitor_id: int, data: dict):
+    logger.info(f"Updating monitor: {data}")
+    logger.info(f"Updated Monitor with id {monitor_id}")
+    pass
+
+def delete_monitor(monitor_id: int):
+    logger.info(f"Deleting monitor: #{monitor_id}")
+    sql = f"delete from monitors where monitor_id = {monitor_id}"
+    db.query(sql)
+    logger.info(f"Deleted Monitor with id {monitor_id}")
+
+def run_monitor_by_id(monitor_id):
+    monitor = get_monitor_by_id(monitor_id)
+    start_time = time.time()
+    try:
+        if monitor['monitor_type'] == 'api':
+            outcome, response = run_api_monitor(monitor['monitor_body'], monitor.get('expectation'))
+        else:
+            outcome, response = False, None
+
+    except Exception as e:
+        logger.error(f"Error running monitor: {e}")
+        outcome, response = False, None
+
+    # store run history
+    response_time = time.time() - start_time
+    sql = f"""insert into run_history (org_id, monitor_id, outcome, response_time, response) values
+    ({monitor['org_id']}, {monitor_id}, {outcome}, {response_time}, '{response}')
+    """
+    db.insert(sql)
+    return outcome
+
+def run_api_monitor(monitor_body: dict, expectation: dict):
+    res = requests.request(
+        monitor_body.get('method'), monitor_body.get('url'),
+        headers=monitor_body.get('headers'),
+        params=monitor_body.get('params'),
+        data=monitor_body.get('body')
+    )
+    logger.info(f"Response: {res.status_code} | {res.reason}")
+
+    if expectation:
+        response_code_list = expectation.get('response_codes')
+        is_allow_list = expectation.get('is_allow_list')
+        outcome = (is_allow_list and res.status_code in response_code_list) or (not is_allow_list and res.status_code not in response_code_list)
+
+    else:
+        outcome = 200 <= res.status_code < 300
+
+    return outcome, res.status_code
diff --git a/backend/src/data_model.py b/backend/src/data_model.py
new file mode 100644
index 0000000..4b34fe3
--- /dev/null
+++ b/backend/src/data_model.py
@@ -0,0 +1,13 @@
+from typing import Literal, Optional
+from pydantic import BaseModel, Extra, PositiveInt, Field
+
+class MonitorModel(BaseModel):
+    org_id: PositiveInt | None = None
+    # monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]
+    monitor_name: str | None = None
+    monitor_body: dict | None = None
+    timeout: int | None = None
+    interval: int | None = None
+    expectation: dict | None = None
+    alerts: list[int] | None = None
+    is_active: bool | None = None
diff --git a/backend/src/utils.py b/backend/src/db_util.py
similarity index 87%
rename from backend/src/utils.py
rename to backend/src/db_util.py
index 97df486..be9feef 100644
--- a/backend/src/utils.py
+++ b/backend/src/db_util.py
@@ -1,24 +1,9 @@
-"""
-Created On: July 2024
-Created By: Sourav Saha
-"""
 import os
-import logging
-from rich.logging import RichHandler
-
-# load environment variables
-from dotenv import load_dotenv
-base_dir = os.path.dirname(os.getcwd())
-load_dotenv(f"{base_dir}/.env")
-load_dotenv(f"{base_dir}/.env.local", override=True)
-
-logging.basicConfig(level='INFO', format='%(message)s', datefmt="[%X]", handlers=[RichHandler()])
-logger = logging.getLogger()
-
 import re
 import json
 import psycopg2
 import socket
+import logging
 import threading
 import pandas as pd
 from contextlib import contextmanager
@@ -40,7 +25,6 @@ def _get_connection(self):
         try:
             _this_ = f"wt-{os.getenv('ENV')}-{socket.gethostname()}"
             conn_str = f"dbname=postgres user=postgres password={os.getenv('TIMESCALEDB_PASS')} host={os.getenv('TIMESCALEDB_HOST')} port={os.getenv('TIMESCALEDB_PORT')} application_name={_this_}"
-            print(conn_str)
             conn = psycopg2.connect(conn_str)
             self.logger.info(f'database connection created...')
             return conn
@@ -75,7 +59,7 @@ def cursor_context(self, query, data):
            c.close()
            self.lock.release()
 
-    def insert(self, sql: str, data: tuple | dict) -> int:
+    def insert(self, sql: str, data: tuple | dict = ()) -> int:
        _data = self._pre_process(data) if isinstance(data, dict) else data
        with self.cursor_context(sql, _data) as c:
            c.execute(sql, _data)
diff --git a/backend/src/main.py b/backend/src/main.py
index 5f8961a..03726f7 100644
--- a/backend/src/main.py
+++ b/backend/src/main.py
@@ -2,7 +2,7 @@
 Created On: July 2024
 Created By: Sourav Saha
 """
-from utils import logger
+from commons import logger
 import os
 import logging
 from datetime import datetime
@@ -14,10 +14,12 @@
 from fastapi import FastAPI, Request, Response, APIRouter, Body
 from fastapi_redis_cache import FastApiRedisCache, cache
 
-import controller
+import data_model as dm
+import scheduler as sch
+import controller as ct
 
 # Initialize app
-__service__ = 'the-towerhouse-backend'
+__service__ = 'the-watchtower-backend'
 tags_metadata = []
 
 @asynccontextmanager
@@ -47,43 +49,66 @@ async def root():
     dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     return {"service": __service__, 'version': __version__, 'server-time': dt}
 
-# create api monitor
-@route.post("/create/monitor")
-def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"], monitor_data=Body(...)):
-    monitor_id = controller.create_monitor({'monitor_type': monitor_type, **monitor_data})
-    return {"message": "Monitor created successfully", "monitor_id": monitor_id}
-
 @route.post("/import/monitor")
-def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]):
+def import_monitor(org_id: int):
     return {"message": "Monitor imported successfully"}
 
 @route.post("/export/monitor")
-def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"]):
-    return {"message": "Monitor exported successfully"}
+def export_monitor(org_id: int):
+    pass
+
+# create api monitor
+@route.post("/create/monitor")
+def create_monitor(monitor_type: Literal["api", "website", "database", "server", "ssl", "mq"], monitor_data: dm.MonitorModel):
+    # insert into database
+    monitor_id = ct.insert_monitor({'monitor_type': monitor_type, **monitor_data.dict()})
+    # schedule monitoring
+    sch.create_job(monitor_id, monitor_data.interval)
+    return {"message": "Monitor created successfully", "monitor_id": monitor_id}
 
 # update monitor
 @route.put("/update/monitor/{monitor_id}")
-def update_monitor(monitor_id: int):
+def update_monitor(monitor_id: int, monitor_data: dm.MonitorModel):
+    if monitor_data.interval:
+        sch.create_job(monitor_id, monitor_data.interval)
     return {"message": "Monitor updated successfully"}
 
 # delete monitor
 @route.delete("/delete/monitor/{monitor_id}")
 def delete_monitor(monitor_id: int):
+    sch.scheduler.remove_job(f"monitor#{monitor_id}")
     return {"message": "Monitor deleted successfully"}
 
 # get monitor(s)
-@route.get("/fetch/monitors/{monitor_id}")
-def get_monitors(monitor_id: int):
-    return {"message": "Monitor fetched successfully"}
+@route.get("/fetch/monitor")
+@cache(expire=30)
+def get_monitors(response: Response, org_id: int):
+    df = ct.get_monitor_by_orgid(org_id)
+    return {"message": "Monitor fetched successfully", "data": df.to_dict('records')}
+
+# run monitor
+@route.get("/run/monitor/{monitor_id}")
+def run_monitor(monitor_id: int):
+    outcome = ct.run_monitor_by_id(monitor_id)
+    return {'is_success': outcome}
+
+# refresh monitor
+@route.get("/refresh/monitor")
+def refresh_monitor():
+    df = ct.get_all_monitors()
+    for idx, row in df.iterrows():
+        sch.create_job(row['monitor_id'], row['interval'])
+
+    return {"message": "Monitor refreshed successfully"}
 
 # get monitoring history
-@route.get("/fetch/history/monitors/{monitor_id}")
+@route.get("/fetch/history/monitor/{monitor_id}")
 def get_monitor_history(monitor_id: int):
     return {"message": "Monitor history fetched successfully"}
 
 # get status page
-@route.get("/fetch/statuspage/{statuspage_id}")
-def get_statuspage(statuspage_id: int):
+@route.get("/fetch/statuspage/{page_id}")
+def get_statuspage(page_id: int):
     return {"message": "Status page fetched successfully"}
 
 # create status page
@@ -92,13 +117,13 @@ def create_statuspage():
     return {"message": "Status page created successfully"}
 
 # update status page
-@route.put("/update/statuspage/{statuspage_id}")
-def update_statuspage(statuspage_id: int):
+@route.put("/update/statuspage/{page_id}")
+def update_statuspage(page_id: int):
     return {"message": "Status page updated successfully"}
 
 # delete status page
-@route.delete("/delete/statuspage/{statuspage_id}")
-def delete_statuspage(statuspage_id: int):
+@route.delete("/delete/statuspage/{page_id}")
+def delete_statuspage(page_id: int):
     return {"message": "Status page deleted successfully"}
 
 # create alert group
diff --git a/backend/src/scheduler.py b/backend/src/scheduler.py
new file mode 100644
index 0000000..2278c4a
--- /dev/null
+++ b/backend/src/scheduler.py
@@ -0,0 +1,83 @@
+import os
+import pytz
+import logging
+from typing import Literal
+from commons import logger
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
+
+from apscheduler.executors.pool import ThreadPoolExecutor
+import controller as ct
+
+class Scheduler:
+    __instance = None
+    scheduler = None
+
+    def __new__(cls, *args, **kwargs):
+        if not Scheduler.__instance:
+            Scheduler.__instance = object.__new__(cls)
+
+        return Scheduler.__instance
+
+    def __init__(self):
+        self.logger = logging.getLogger()
+        pass
+
+    def get_scheduler(self, max_instances=3, grace_time=10):
+
+        # return scheduler if already exists
+        if self.scheduler:
+            return self.scheduler
+
+        # create new scheduler
+        tz = pytz.timezone('Asia/Kolkata')
+        self.scheduler = BackgroundScheduler(timezone=tz)
+
+        job_defaults = {
+            'coalesce': True,
+            'max_instances': max_instances,
+            'misfire_grace_time': grace_time
+        }
+
+        job_stores = {
+            'default': SQLAlchemyJobStore(
url=f"postgresql://postgres:{os.getenv('TIMESCALEDB_PASS')}@{os.getenv('TIMESCALEDB_HOST')}:{os.getenv('TIMESCALEDB_PORT')}/postgres?application_name=scheduler", + tablename="monitor_jobs" + ) + } + + job_executors = { + 'default': ThreadPoolExecutor(3), + } + + self.scheduler.configure(jobstores=job_stores, executors=job_executors, job_defaults=job_defaults) + self.scheduler.start() + return self.scheduler + + +scheduler = Scheduler().get_scheduler() + +def create_job(monitor_id, interval: int): + job_id = f"monitor#{monitor_id}" + scheduler.add_job(ct.run_monitor_by_id, trigger='interval', args=[monitor_id], id=job_id, seconds=interval, replace_existing=True, jitter=30) + logger.info(f"{job_id} scheduled with interval {interval} sec") + +def manage_job(action: Literal["pause", "resume", "delete"], monitor_id: int): + job_id = f"monitor#{monitor_id}" + logger.info(f"Job Manager: {action} - {job_id}") + # retrieve job if exists + job = scheduler.get_job(job_id) + + match action.lower(): + case "pause": + if job: scheduler.pause_job(job_id) + + case "resume": + if job: scheduler.resume_job(job_id) + + case "remove": + if job: scheduler.remove_job(job_id) + + logger.info(f"{action} job: {job_id}") + return job_id diff --git a/frontend/src/Dashboard.py b/frontend/src/Dashboard.py index 9cef7b5..7338b84 100644 --- a/frontend/src/Dashboard.py +++ b/frontend/src/Dashboard.py @@ -24,50 +24,5 @@ """ st.markdown(f"", unsafe_allow_html=True) -monitor_type = st.selectbox('Select Monitor Type', ['api', 'website', 'server'], index=0) - -sample_curl = """curl --location 'https://api.example.com/path?foo=bar&hello=world' \ ---header 'Authorization:Bearer XXXXX==' \ ---header 'version: 2' -""" -cc = st.columns(2) -query = cc[0].text_area('Insert cURL', value=sample_curl, height=150) -monitor_body = utils.parse_curl_command(query) -cc[1].write('Parsed cURL') -cc[1].code(f"Method:\t {monitor_body.get('method')} \nURL:\t {monitor_body.get('url')} \nHeaders: {monitor_body.get('headers')} \nParams:\t {monitor_body.get('params')} \nBody:\t {monitor_body.get('body')}", language='bash') - -monitor_name = st.text_input('Monitor Name', value='Monitor Name') -monitor_timeout = st.number_input('Monitor Timeout', value=5) -monitor_frequency = st.number_input('Monitor Frequency (Minutes)', value=5) - -# Expectation -response_code_type = st.radio('Expect Response Codes', ['Response Code - Allowed', 'Response Code - Not Allowed'], index=0, horizontal=True) -response_codes = [200, 201, 202, 204, 301, 302, 304, 400, 401, 403, 404, 500, 502, 503, 504] -if response_code_type == 'Response Code - Allowed': - default_response_codes = [200, 201, 202, 204] -else: - default_response_codes = [400, 401, 403, 404, 500, 502, 503, 504] -selected_response_codes = st.multiselect('Response Codes', response_codes, default=default_response_codes) -monitor_expectation = { - 'response_code_type': response_code_type, - 'response_codes': selected_response_codes -} - -alerts = st.multiselect('Alerts', ['slack', 'email', 'telegram']) - -cc = st.columns(2) -if cc[0].button('Test Monitor'): - import requests - res = requests.request(monitor_body.get('method'), monitor_body.get('url'), headers=monitor_body.get('headers'), params=monitor_body.get('params'), data=monitor_body.get('body')) - st.write(f"Response: {res.status_code} | {res.reason}") - st.write(res.json()) - -if cc[1].button('Create Monitor', type='primary'): - _url = monitor_body.get('url') - if _url == 'https://api.example.com/path': - st.warning('Default URL, change the URL') - elif 
-        st.warning('Enter valid cURL command')
-    else:
-        res = backend.create_monitor(monitor_type, monitor_name, monitor_body, monitor_timeout, monitor_frequency * 60, monitor_expectation, alerts)
-        st.json(res)
+monitor_df = backend.fetch_monitors(1)
+st.dataframe(monitor_df)
diff --git a/frontend/src/pages/1_Monitors.py b/frontend/src/pages/1_Monitors.py
new file mode 100644
index 0000000..dff02ae
--- /dev/null
+++ b/frontend/src/pages/1_Monitors.py
@@ -0,0 +1,60 @@
+import utils
+from utils import logger
+import streamlit as st
+from svc import svc_backend as backend
+
+st.header("Monitors")
+
+monitor_type = st.selectbox('Select Monitor Type', ['api', 'website', 'server'], index=0)
+
+sample_curl = """curl --location 'https://watchtower.finanssure.com/api/v1' \
+--header 'Authorization:Bearer XXXXX==' \
+--header 'version: 1'
+"""
+cc = st.columns(2)
+query = cc[0].text_area('Insert cURL', value=sample_curl, height=150)
+monitor_body = utils.parse_curl_command(query)
+cc[1].write('Parsed cURL')
+cc[1].code(f"Method:\t {monitor_body.get('method')} \nURL:\t {monitor_body.get('url')} \nHeaders: {monitor_body.get('headers')} \nParams:\t {monitor_body.get('params')} \nBody:\t {monitor_body.get('body')}", language='bash')
+
+monitor_name = st.text_input('Monitor Name', placeholder='Enter Monitor Name')
+
+cc = st.columns(2)
+monitor_timeout = cc[0].number_input('Request Timeout', value=5)
+monitor_frequency = cc[1].number_input('Monitor Frequency (Minutes)', value=5)
+
+# Expectation
+test_response = st.radio('Test Response Codes', ['Response Codes - Success', 'Response Codes - Failure'], index=0, horizontal=True)
+all_response_codes = [200, 201, 202, 204, 301, 302, 304, 400, 401, 403, 404, 500, 502, 503, 504]
+if test_response == 'Response Codes - Success':
+    is_allow_list = True
+    default_response_codes = [200, 201, 202, 204]
+else:
+    is_allow_list = False
+    default_response_codes = [400, 401, 403, 404, 500, 502, 503, 504]
+
+selected_response_codes = st.multiselect('Response Codes', all_response_codes, default=default_response_codes)
+monitor_expectation = {
+    'is_allow_list': is_allow_list,
+    'response_codes': selected_response_codes
+}
+
+alerts = st.multiselect('Alerts', ['slack', 'email', 'telegram'])
+
+cc = st.columns([1, 1, 5])
+if cc[0].button('Test Monitor'):
+    _url = monitor_body.get('url')
+    if _url in [None, '']:
+        st.warning('Enter valid cURL command')
+    else:
+        utils.test_monitor_config(monitor_body)
+
+if cc[1].button('Create Monitor', type='primary'):
+    _url = monitor_body.get('url')
+    if _url == 'https://watchtower.finanssure.com/api/v1':
+        st.warning('Default URL, change the URL')
+    elif _url in [None, '']:
+        st.warning('Enter valid cURL command')
+    else:
+        res = backend.create_monitor(monitor_type, monitor_name, monitor_body, monitor_timeout, monitor_frequency * 60, monitor_expectation, alerts)
+        st.json(res)
diff --git a/frontend/src/pages/Alert_Groups.py b/frontend/src/pages/Alert_Groups.py
new file mode 100644
index 0000000..430b6bc
--- /dev/null
+++ b/frontend/src/pages/Alert_Groups.py
@@ -0,0 +1,4 @@
+import streamlit as st
+
+st.header("Alert Groups")
+st.info("Page Under Development...")
diff --git a/frontend/src/pages/Status_Pages.py b/frontend/src/pages/Status_Pages.py
new file mode 100644
index 0000000..c24e720
--- /dev/null
+++ b/frontend/src/pages/Status_Pages.py
@@ -0,0 +1,4 @@
+import streamlit as st
+
+st.header("Status Pages")
+st.info("Page Under Development...")
\ No newline at end of file
diff --git a/frontend/src/svc/svc_backend.py b/frontend/src/svc/svc_backend.py
index b2b68b4..0976f54 100644
--- a/frontend/src/svc/svc_backend.py
+++ b/frontend/src/svc/svc_backend.py
@@ -1,6 +1,7 @@
 import os
 import json
 import requests
+import streamlit as st
 from utils import logger
 
 BACKEND_SERVICE = os.getenv('BACKEND_SERVICE', 'http://backend:8000/api/v1')
@@ -21,4 +22,19 @@ def create_monitor(monitor_type, monitor_name, monitor_body, timeout, frequency,
         'alerts': alerts
     }
     res = requests.post(url, data=json.dumps(monitor_data), headers={'Content-Type': 'application/json'})
+
+    # clear cache if successful
+    if res.status_code == 200:
+        fetch_monitors.clear()
+
     return res.json()
+
+@st.cache_data(ttl=300)
+def fetch_monitors(org_id):
+    url = f'{BACKEND_SERVICE}/fetch/monitor?org_id={org_id}'
+    res = requests.get(url)
+    if res.status_code != 200:
+        return []
+
+    data = res.json()['data']
+    return data
diff --git a/frontend/src/utils.py b/frontend/src/utils.py
index 87188cc..95243c9 100644
--- a/frontend/src/utils.py
+++ b/frontend/src/utils.py
@@ -7,6 +7,8 @@
 from rich.logging import RichHandler
 
 import shlex
+import requests
+import streamlit as st
 from urllib.parse import urlparse, parse_qs
 
 # load environment variables
@@ -94,3 +95,12 @@ def parse_curl_command(curl_command):
        'params': params,
        'body': body
    }
+
+
+def test_monitor_config(monitor_body: dict):
+    try:
+        res = requests.request(monitor_body.get('method'), monitor_body.get('url'), headers=monitor_body.get('headers'), params=monitor_body.get('params'), data=monitor_body.get('body'))
+        st.write(f"Response: {res.status_code} | {res.reason}")
+        st.write(res.json())
+    except Exception as e:
+        st.error(f"Invalid Monitor config \n\n {e.args}")
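
A minimal sketch of how the new endpoints could be exercised once this patch is applied. The base URL and payload values below are assumptions for illustration (the backend's in-cluster default is http://backend:8000/api/v1; the body fields mirror data_model.MonitorModel):

import requests

BASE = "http://localhost:8000/api/v1"  # assumed local URL; adjust to your deployment

# create an API monitor; monitor_type goes in the query string, the body follows MonitorModel
payload = {
    "org_id": 1,
    "monitor_name": "watchtower-health",
    "monitor_body": {"method": "GET", "url": "https://watchtower.finanssure.com/api/v1"},
    "timeout": 5,
    "interval": 300,
    "expectation": {"is_allow_list": True, "response_codes": [200, 201, 202, 204]},
    "alerts": [],
}
res = requests.post(f"{BASE}/create/monitor", params={"monitor_type": "api"}, json=payload)
monitor_id = res.json()["monitor_id"]

# trigger an immediate run, then list monitors for the org
print(requests.get(f"{BASE}/run/monitor/{monitor_id}").json())
print(requests.get(f"{BASE}/fetch/monitor", params={"org_id": 1}).json())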