Merge pull request #737 from openedx/bmtcril/pop_query_context
fix: Remove query context after asset import
Cristhian Garcia authored Apr 22, 2024
2 parents f7cdf70 + ca4ee49 · commit 79d05b6
Showing 3 changed files with 76 additions and 27 deletions.
@@ -1,4 +1,4 @@
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "platform-plugin-aspects==v0.7.0"
pip install "platform-plugin-aspects==v0.7.2"
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \
pip install "edx-event-routing-backends==v9.0.0"
@@ -1,25 +1,30 @@
"""Import a list of assets from a yaml file and create them in the superset assets folder."""
import os
import uuid

import yaml
from superset.app import create_app

app = create_app()
app.app_context().push()

import logging
import uuid
import yaml
from copy import deepcopy
from pathlib import Path

from superset import security_manager
from superset.examples.utils import load_configs_from_directory
from superset.extensions import db
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.connectors.sqla.models import SqlaTable
from superset.utils.database import get_or_create_db
from superset.models.embedded_dashboard import EmbeddedDashboard
from pythonpath.localization import get_translation
from pythonpath.create_row_level_security import create_rls_filters


logger = logging.getLogger("create_assets")

BASE_DIR = "/app/assets/superset"

ASSET_FOLDER_MAPPING = {
@@ -235,6 +240,16 @@ def import_assets():
force_data=False,
)

# Query contexts use slice IDs instead of UUIDs, which breaks for us
# especially in translated datasets. We do need them for the
performance_metrics script, however, so we keep them in the assets.
# This just blanks them in the database after import, which forces a
# query to get the assets instead of using the query context.
for o in db.session.query(Slice).all():
if o.query_context:
o.query_context = None
db.session.commit()
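
For illustration (not part of this diff), the slice-ID problem the comment above describes is easiest to see in a concrete query_context. The field names below follow Superset's chart-data request format; the values are invented.

example_query_context = {
    "datasource": {"id": 42, "type": "table"},  # primary key local to the exporting instance
    "form_data": {"slice_id": 137},             # slice ID, also instance-local
    "queries": [{"filters": [], "columns": [], "metrics": []}],
}
# After import on another Superset instance, IDs 42 and 137 point at the
# wrong rows (or none at all), so blanking the stored copy and letting
# Superset rebuild the context from the chart definition is the safer default.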


def update_dashboard_roles(roles):
"""Update the roles of the dashboards"""
@@ -249,7 +264,7 @@

for dashboard_uuid, role_ids in roles.items():
dashboard = db.session.query(Dashboard).filter_by(uuid=dashboard_uuid).one()
print("Importing dashboard roles", dashboard_uuid, role_ids)
logger.info(f"Importing dashboard roles: {dashboard_uuid} - {role_ids}")
dashboard.roles = role_ids
if owners:
dashboard.owners = owners
@@ -270,10 +285,10 @@ def update_embeddable_uuids():

def create_embeddable_dashboard_by_slug(dashboard_slug, embeddable_uuid):
"""Create an embeddable dashboard by slug"""
print(f"Creating embeddable dashboard {dashboard_slug}, {embeddable_uuid}")
logger.info(f"Creating embeddable dashboard {dashboard_slug}, {embeddable_uuid}")
dashboard = db.session.query(Dashboard).filter_by(slug=dashboard_slug).first()
if dashboard is None:
print(f"WARNING: Dashboard {dashboard_slug} not found")
logger.info(f"WARNING: Dashboard {dashboard_slug} not found")
return

embedded_dashboard = db.session.query(EmbeddedDashboard).filter_by(dashboard_id=dashboard.id).first()
@@ -287,13 +302,13 @@ def create_embeddable_dashboard_by_slug(dashboard_slug, embeddable_uuid):

def update_datasets():
"""Update the datasets"""
print("Refreshing datasets")
logger.info("Refreshing datasets")
if {{SUPERSET_REFRESH_DATASETS}}:
datasets = (
db.session.query(SqlaTable).all()
)
for dataset in datasets:
print(f"Refreshing dataset {dataset.table_name}")
logger.info(f"Refreshing dataset {dataset.table_name}")
dataset.fetch_metadata(commit=True)


@@ -1,19 +1,25 @@
import sys
"""
Gather performance metrics on Superset chart queries.
from superset.app import create_app
Reads the queries from the superset database and enriches them with the
query_context from the asset files. The query_context cannot be stored in the
database on import because it uses database primary keys, which do not match
across Superset installations.
"""

app = create_app()
app.app_context().push()
from create_assets import BASE_DIR, ASSET_FOLDER_MAPPING, app

import json
import logging
import os
import time
import uuid
from datetime import datetime
from unittest.mock import patch

import click
import sqlparse
import yaml
from flask import g
from superset import security_manager
from superset.commands.chart.data.get_data_command import ChartDataCommand
@@ -27,6 +33,7 @@
ASPECTS_VERSION = "{{ASPECTS_VERSION}}"
UUID = str(uuid.uuid4())[0:6]
RUN_ID = f"aspects-{ASPECTS_VERSION}-{UUID}"
CHART_PATH = "/app/openedx-assets/assets/charts/"

report_format = "{i}. {slice}\n" "Superset time: {superset_time} (s).\n"

@@ -64,10 +71,12 @@ def performance_metrics(course_key, print_sql):
.all()
)
report = []
query_contexts = get_query_contexts_from_assets()
for dashboard in dashboards:
logger.info(f"Dashboard: {dashboard.slug}")
for slice in dashboard.slices:
result = measure_chart(slice, extra_filters)
query_context = get_slice_query_context(slice, query_contexts)
result = measure_chart(slice, query_context)
if not result:
continue
for query in result["queries"]:
@@ -78,16 +87,33 @@

logger.info("Waiting for clickhouse log...")
time.sleep(20)
get_query_log_from_clickhouse(report, print_sql)
get_query_log_from_clickhouse(report, query_contexts, print_sql)
return report


def measure_chart(slice, extra_filters=[]):
"""
Measure the performance of a chart and return the results.
"""
logger.info(f"Fetching slice data: {slice}")
query_context = json.loads(slice.query_context)
def get_query_contexts_from_assets():
query_contexts = {}

for root, dirs, files in os.walk(CHART_PATH):
for file in files:
if not file.endswith(".yaml"):
continue

path = os.path.join(root, file)
with open(path, "r") as file:
asset = yaml.safe_load(file)
if "query_context" in asset and asset["query_context"]:
query_contexts[asset["uuid"]] = json.loads(asset["query_context"])

logger.info(f"Found {len(query_contexts)} query contexts")
return query_contexts
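
For illustration (not part of this diff), get_query_contexts_from_assets() expects each chart asset file, once parsed with yaml.safe_load(), to be shaped roughly like the dict below. Field names follow Superset's chart export format; the chart name, UUID, and values are invented.

example_parsed_chart_yaml = {
    "slice_name": "Example chart",                       # invented name
    "uuid": "3f9c2a51-8b7e-4a31-9c64-1f2e3d4c5b6a",      # stable across installations
    "query_context": '{"queries": [{"filters": []}]}',  # JSON string, may be null
}
# Keying the collected contexts by asset["uuid"] works across installs
# precisely because UUIDs, unlike numeric slice IDs, survive import.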

def get_slice_query_context(slice, query_contexts, extra_filters=[]):
query_context = query_contexts.get(str(slice.uuid), {})
if not query_context:
logger.info(f"SLICE {slice} has no query context! {slice.uuid}")
logger.info(query_contexts.keys())

query_context.update(
{
"result_format": "json",
@@ -104,6 +130,15 @@ def measure_chart(slice, extra_filters=[]):
for query in query_context["queries"]:
query["filters"]+=extra_filters

return query_context


def measure_chart(slice, query_context):
"""
Measure the performance of a chart and return the results.
"""
logger.info(f"Fetching slice data: {slice}")

g.user = security_manager.find_user(username="{{SUPERSET_ADMIN_USERNAME}}")
query_context = ChartDataQueryContextSchema().load(query_context)
command = ChartDataCommand(query_context)
@@ -121,7 +156,7 @@ def measure_chart(slice, extra_filters=[]):
return result


def get_query_log_from_clickhouse(report, print_sql):
def get_query_log_from_clickhouse(report, query_contexts, print_sql):
"""
Get the query log from clickhouse and print the results.
"""
@@ -130,13 +165,12 @@ def get_query_log_from_clickhouse(report, print_sql):

slice = db.session.query(Slice).filter(Slice.uuid == chart_uuid).one()

query_context = json.loads(slice.query_context)
query_context = get_slice_query_context(slice, query_contexts)
query_context["queries"][0]["filters"].append(
{"col": "http_user_agent", "op": "==", "val": RUN_ID}
)
slice.query_context = json.dumps(query_context)

ch_chart_result = measure_chart(slice)
ch_chart_result = measure_chart(slice, query_context)
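
For illustration (not part of this diff), this step recovers the run's queries by filtering the query-log chart on the run's unique user agent, which only works because the measured queries carry RUN_ID as their HTTP user agent. In raw ClickHouse terms the lookup would be roughly the sketch below; system.query_log and its columns are standard ClickHouse, but this SQL itself is not in the diff.

QUERY_LOG_SQL = """
SELECT query, query_duration_ms, read_rows
FROM system.query_log
WHERE http_user_agent = {run_id:String}
  AND type = 'QueryFinish'
ORDER BY query_start_time
"""
# The script fetches the same rows through a Superset chart (measure_chart)
# rather than querying ClickHouse directly.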

clickhouse_queries = {}
for query in ch_chart_result["queries"]:
@@ -145,7 +179,7 @@
clickhouse_queries[parsed_sql] = row

if print_sql:
print("ClickHouse SQL: ")
logger.info("ClickHouse SQL: ")
logger.info(parsed_sql)

# Sort report by slowest queries
@@ -167,7 +201,7 @@
)

if print_sql:
print("Superset SQL: ")
logger.info("Superset SQL: ")
logger.info(parsed_sql)

clickhouse_report = clickhouse_queries.get(parsed_sql, {})
