Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt the platform for the new api data model #178

Merged
merged 13 commits into from
Jan 29, 2025
167 changes: 109 additions & 58 deletions app/callbacks/data_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,26 @@
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

import json
from io import StringIO

import dash
import logging_config
import pandas as pd
from dash import dcc, html
from dash import callback_context, dcc, html
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
from main import app
from pyroclient import Client

import config as cfg
from services import api_client, call_api
from utils.data import (
convert_time,
past_ndays_api_events,
process_bbox,
read_stored_DataFrame,
)
from services import api_client, get_token
from utils.data import process_bbox

logger = logging_config.configure_logging(cfg.DEBUG, cfg.SENTRY_DSN)


@app.callback(
[
Output("user_credentials", "data"),
Output("user_headers", "data"),
Output("user_token", "data"),
Output("form_feedback_area", "children"),
Output("username_input", "style"),
Output("password_input", "style"),
Expand All @@ -41,19 +35,19 @@
[
State("username_input", "value"),
State("password_input", "value"),
State("user_headers", "data"),
State("user_token", "data"),
State("language", "data"),
],
)
def login_callback(n_clicks, username, password, user_headers, lang):
def login_callback(n_clicks, username, password, user_token, lang):
"""
Callback to handle user login.

Parameters:
n_clicks (int): Number of times the login button has been clicked.
username (str or None): The value entered in the username input field.
password (str or None): The value entered in the password input field.
user_headers (dict or None): Existing user headers, if any, containing authentication details.
user_token (dict or None): Existing user headers, if any, containing authentication details.

This function is triggered when the login button is clicked. It verifies the provided username and password,
attempts to authenticate the user via the API, and updates the user credentials and headers.
Expand All @@ -80,9 +74,8 @@ def login_callback(n_clicks, username, password, user_headers, lang):
},
}

if user_headers is not None:
if user_token is not None:
return (
dash.no_update,
dash.no_update,
dash.no_update,
input_style_unchanged,
Expand All @@ -104,7 +97,6 @@ def login_callback(n_clicks, username, password, user_headers, lang):

# The login modal remains open; other outputs are updated with arbitrary values
return (
dash.no_update,
dash.no_update,
form_feedback,
input_style_unchanged,
Expand All @@ -116,11 +108,10 @@ def login_callback(n_clicks, username, password, user_headers, lang):
else:
# This is the route of the API that we are going to use for the credential check
try:
client = Client(cfg.API_URL, username, password)
user_token = get_token(username, password)

return (
{"username": username, "password": password},
client.headers,
user_token,
dash.no_update,
hide_element_style,
hide_element_style,
Expand All @@ -133,7 +124,6 @@ def login_callback(n_clicks, username, password, user_headers, lang):
form_feedback.append(html.P(translate[lang]["wrong_credentials"]))

return (
dash.no_update,
dash.no_update,
form_feedback,
input_style_unchanged,
Expand All @@ -147,25 +137,36 @@ def login_callback(n_clicks, username, password, user_headers, lang):


@app.callback(
Output("api_cameras", "data"),
Input("user_token", "data"),
prevent_initial_call=True,
)
def get_cameras(user_token):
logger.info("Get cameras data")
if user_token is not None:
api_client.token = user_token
cameras = pd.DataFrame(api_client.fetch_cameras().json())

return cameras.to_json(orient="split")


@app.callback(
Output("api_sequences", "data"),
[Input("main_api_fetch_interval", "n_intervals"), Input("api_cameras", "data")],
[
Output("store_api_alerts_data", "data"),
],
[Input("main_api_fetch_interval", "n_intervals"), Input("user_credentials", "data")],
[
State("store_api_alerts_data", "data"),
State("user_headers", "data"),
State("api_sequences", "data"),
State("user_token", "data"),
],
prevent_initial_call=True,
)
def api_watcher(n_intervals, user_credentials, local_alerts, user_headers):
def api_watcher(n_intervals, api_cameras, local_sequences, user_token):
"""
Callback to periodically fetch alerts data from the API.

Parameters:
n_intervals (int): Number of times the interval has been triggered.
user_credentials (dict or None): Current user credentials for API authentication.
local_alerts (dict or None): Locally stored alerts data, serialized as JSON.
user_headers (dict or None): Current user headers containing authentication details.
user_token (dict or None): Current user headers containing authentication details.

This function is triggered at specified intervals and when user credentials are updated.
It retrieves unacknowledged events from the API, processes the data, and stores it locally.
Expand All @@ -174,35 +175,85 @@ def api_watcher(n_intervals, user_credentials, local_alerts, user_headers):
Returns:
dash.dependencies.Output: Serialized JSON data of alerts and a flag indicating if data is loaded.
"""
if user_headers is None:
if user_token is None:
raise PreventUpdate
user_token = user_headers["Authorization"].split(" ")[1]
api_client.token = user_token

# Read local data
local_alerts, alerts_data_loaded = read_stored_DataFrame(local_alerts)
logger.info("Start Fetching the events")

# Fetch events
api_alerts = pd.DataFrame(call_api(api_client.get_unacknowledged_events, user_credentials)())
api_alerts["created_at"] = convert_time(api_alerts)
api_alerts = past_ndays_api_events(api_alerts, n_days=0)

if len(api_alerts) == 0:
return [
json.dumps(
{
"data": pd.DataFrame().to_json(orient="split"),
"data_loaded": True,
}
)
]

logger.info("Start Fetching Sequences")
# Fetch Sequences
response = api_client.fetch_latest_sequences()
api_sequences = pd.DataFrame(response.json())

local_sequences = pd.read_json(StringIO(local_sequences), orient="split")
if len(api_sequences) == 0:
return pd.DataFrame().to_json(orient="split")

else:
if not local_sequences.empty:
aligned_api_sequences, aligned_local_sequences = api_sequences["id"].align(local_sequences["id"])
if all(aligned_api_sequences == aligned_local_sequences):
return dash.no_update

return api_sequences.to_json(orient="split")


@app.callback(
[Output("are_detections_loaded", "data"), Output("sequence_on_display", "data"), Output("api_detections", "data")],
[Input("api_sequences", "data"), Input("sequence_id_on_display", "data"), Input("api_detections", "data")],
State("are_detections_loaded", "data"),
prevent_initial_call=True,
)
def load_detections(api_sequences, sequence_id_on_display, api_detections, are_detections_loaded):
# Deserialize data
api_sequences = pd.read_json(StringIO(api_sequences), orient="split")
sequence_id_on_display = str(sequence_id_on_display)
are_detections_loaded = json.loads(are_detections_loaded)
api_detections = json.loads(api_detections)

# Initialize sequence_on_display
sequence_on_display = pd.DataFrame().to_json(orient="split")

# Identify which input triggered the callback
ctx = callback_context
if not ctx.triggered:
raise PreventUpdate

triggered_input = ctx.triggered[0]["prop_id"].split(".")[0]

if triggered_input == "sequence_id_on_display":
# If the displayed sequence changes, load its detections if not already loaded
if sequence_id_on_display not in api_detections:
response = api_client.fetch_sequences_detections(sequence_id_on_display)
detections = pd.DataFrame(response.json())
detections["processed_bboxes"] = detections["bboxes"].apply(process_bbox)
api_detections[sequence_id_on_display] = detections.to_json(orient="split")

sequence_on_display = api_detections[sequence_id_on_display]
last_seen_at = api_sequences.loc[
api_sequences["id"].astype("str") == sequence_id_on_display, "last_seen_at"
].iloc[0]

# Ensure last_seen_at is stored as a string
are_detections_loaded[sequence_id_on_display] = str(last_seen_at)

else:
api_alerts["processed_loc"] = api_alerts["localization"].apply(process_bbox)
if alerts_data_loaded and not local_alerts.empty:
aligned_api_alerts, aligned_local_alerts = api_alerts["alert_id"].align(local_alerts["alert_id"])
if all(aligned_api_alerts == aligned_local_alerts):
return [dash.no_update]
# If no specific sequence is triggered, load detections for the first missing sequence
for _, row in api_sequences.iterrows():
sequence_id = str(row["id"])
last_seen_at = row["last_seen_at"]

if sequence_id not in are_detections_loaded or are_detections_loaded[sequence_id] != str(last_seen_at):
response = api_client.fetch_sequences_detections(sequence_id)
detections = pd.DataFrame(response.json())
detections["processed_bboxes"] = detections["bboxes"].apply(process_bbox)
api_detections[sequence_id] = detections.to_json(orient="split")
are_detections_loaded[sequence_id] = str(last_seen_at)
break

# Clean up old sequences that are no longer in api_sequences
sequences_in_api = api_sequences["id"].astype("str").values
to_drop = [key for key in are_detections_loaded if key not in sequences_in_api]
for key in to_drop:
are_detections_loaded.pop(key, None)

return [json.dumps({"data": api_alerts.to_json(orient="split"), "data_loaded": True})]
# Serialize and return data
return json.dumps(are_detections_loaded), sequence_on_display, json.dumps(api_detections)
Loading
Loading