Skip to content

Commit

Permalink
feat: migrate aspects dictionaries to dbt
Browse files Browse the repository at this point in the history
feat: migrate aspects dictionaries to dbt
  • Loading branch information
Ian2012 committed Apr 8, 2024
1 parent 99564a7 commit 0b56a57
Show file tree
Hide file tree
Showing 3 changed files with 374 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tutoraspects/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@
# For now we are pulling this from github, which should allow maximum
# flexibility for forking, running branches, specific versions, etc.
("DBT_REPOSITORY", "https://github.com/openedx/aspects-dbt"),
("DBT_BRANCH", "v3.12.0"),
("DBT_BRANCH", "cag/migrate-dictionaries"),
("DBT_SSH_KEY", ""),
("DBT_STATE_DIR", "/app/aspects/dbt_state/"),
# This is a pip compliant list of Python packages to install to run dbt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ aspects: # this needs to match the profile in your dbt_project.yml file
type: clickhouse
schema: {{ DBT_PROFILE_TARGET_DATABASE }}
host: {{ CLICKHOUSE_HOST }}
port: {{ CLICKHOUSE_INTERNAL_HTTP_PORT }}
port: {{ CLICKHOUSE_INTERNAL_NATIVE_PORT }}
user: {{ CLICKHOUSE_ADMIN_USER }}
password: '{{ CLICKHOUSE_ADMIN_PASSWORD }}'
cluster: {{ CLICKHOUSE_CLUSTER_NAME }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,372 @@
"""
Migrate dictionaries to DBT.
"""
from alembic import op


# Alembic revision identifiers: this migration and the one it follows.
revision = "0035"
down_revision = "0034"
branch_labels = None
depends_on = None

# The cluster name is a template placeholder. When it is a non-empty string
# every DDL statement is suffixed with an ON CLUSTER clause and the replicated
# engine variant is selected; otherwise the clause is empty and the plain
# engine is used.
# NOTE(review): presumably the placeholder is substituted by the template
# renderer before this module is imported -- confirm.
if "{{CLICKHOUSE_CLUSTER_NAME}}":
    on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' "
    engine = "ReplicatedReplacingMergeTree"
else:
    on_cluster = ""
    engine = "ReplacingMergeTree"

def upgrade():
    """
    Drop the dictionary objects previously created by these migrations.

    One ``DROP DICTIONARY IF EXISTS`` statement is issued per object, in a
    fixed order, each followed by the module-level ``on_cluster`` clause.
    ``IF EXISTS`` makes every statement a no-op when the object is absent.
    """
    doomed = (
        "user_pii",
        "user_pii_dict",
        "course_block_names",
        "course_block_names_dict",
        "course_names",
        "course_names_dict",
    )
    for name in doomed:
        # The statement text is assembled line by line so that no Python
        # indentation leaks into the SQL sent to the server.
        op.execute(
            f"\nDROP DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.{name}\n"
            f"{on_cluster}\n"
        )





def downgrade():
    """
    Recreate the ClickHouse dictionaries that ``upgrade`` drops.

    Steps, in order:

    1. Drop any existing objects carrying the six dictionary / table names so
       the ``CREATE`` statements below start from a clean slate.
    2. Recreate ``course_names_dict`` and the ``course_names`` table over it.
    3. Make sure the recent-blocks table and its materialized view exist,
       backfill the table from the nodes table, then recreate
       ``course_block_names_dict`` and the ``course_block_names`` table.
    4. Recreate ``user_pii_dict`` and the ``user_pii`` table over it.

    The SQL literals are kept flush-left on purpose: the text sent to the
    server is then free of Python indentation.
    """
    ## Clean slate: remove whatever currently holds these names
    for name in (
        "user_pii",
        "user_pii_dict",
        "course_block_names",
        "course_block_names_dict",
        "course_names",
        "course_names_dict",
    ):
        op.execute(
            f"\nDROP DICTIONARY IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.{name}\n"
            f"{on_cluster}\n"
        )

    ## Course Names
    # Dictionary keyed by course_key; its source query keeps, per course, the
    # overview row with the greatest ``modified`` value.
    op.execute(
        f"""
CREATE DICTIONARY {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names_dict
{on_cluster}
(
course_key String,
course_name String,
course_run String,
org String
)
PRIMARY KEY course_key
SOURCE(CLICKHOUSE(
user '{{ CLICKHOUSE_ADMIN_USER }}'
password '{{ CLICKHOUSE_ADMIN_PASSWORD }}'
db '{{ ASPECTS_EVENT_SINK_DATABASE }}'
query 'with most_recent_overviews as (
select org, course_key, max(modified) as last_modified
from {{ ASPECTS_EVENT_SINK_DATABASE }}.course_overviews
group by org, course_key
)
select
course_key,
display_name,
splitByString(\\'+\\', course_key)[-1] as course_run,
org
from {{ ASPECTS_EVENT_SINK_DATABASE }}.course_overviews co
inner join most_recent_overviews mro on
co.org = mro.org and
co.course_key = mro.course_key and
co.modified = mro.last_modified
'
))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(120);
"""
    )
    # Table with the Dictionary engine, exposing the dictionary to queries.
    op.execute(
        f"""
CREATE TABLE {{ ASPECTS_EVENT_SINK_DATABASE }}.course_names
{on_cluster}
(
course_key String,
course_name String,
course_run String,
org String
) engine = Dictionary({{ ASPECTS_EVENT_SINK_DATABASE }}.course_names_dict);
"""
    )

    ## Course Block Names
    # Backing table for the block dictionary; ``engine`` is the replicated
    # variant when a cluster name is configured.
    op.execute(
        f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }}
{on_cluster}
(
location String NOT NULL,
display_name String NOT NULL,
display_name_with_location String NOT NULL,
section Int32,
subsection Int32,
unit Int32,
graded Bool,
course_key String,
dump_id UUID NOT NULL,
time_last_dumped String NOT NULL
) engine = {engine}
PRIMARY KEY (location);
"""
    )

    # Materialized view that copies rows from the nodes table into the
    # recent-blocks table, deriving the location-prefixed display name.
    op.execute(
        f"""
create materialized view if not exists {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_MV }}
{on_cluster}
to {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }} as
select
location,
display_name,
toString(section) || ':' || toString(subsection) || ':' || toString(unit) || ' - ' || display_name as display_name_with_location,
JSONExtractInt(xblock_data_json, 'section') as section,
JSONExtractInt(xblock_data_json, 'subsection') as subsection,
JSONExtractInt(xblock_data_json, 'unit') as unit,
JSONExtractBool(xblock_data_json, 'graded') as graded,
course_key,
dump_id,
time_last_dumped
from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_NODES_TABLE }}
"""
    )

    # One-off backfill of the rows already present in the nodes table.
    # The separator in display_name_with_location must match the one used by
    # the materialized view above (' - '); it previously read '- ', which
    # made backfilled rows render differently from rows arriving via the view.
    # This literal is deliberately not an f-string: it has no Python
    # substitutions and carries no ON CLUSTER clause.
    op.execute(
        """
insert into {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }} (
location, display_name, display_name_with_location, section, subsection, unit, graded, course_key, dump_id, time_last_dumped
)
select
location,
display_name,
toString(section) || ':' || toString(subsection) || ':' || toString(unit) || ' - ' || display_name as display_name_with_location,
JSONExtractInt(xblock_data_json, 'section') as section,
JSONExtractInt(xblock_data_json, 'subsection') as subsection,
JSONExtractInt(xblock_data_json, 'unit') as unit,
JSONExtractBool(xblock_data_json, 'graded') as graded,
course_key,
dump_id,
time_last_dumped
from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }}
""".replace(
            "from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }}\n",
            "from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_NODES_TABLE }};\n",
        )
        if False
        else """
insert into {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }} (
location, display_name, display_name_with_location, section, subsection, unit, graded, course_key, dump_id, time_last_dumped
)
select
location,
display_name,
toString(section) || ':' || toString(subsection) || ':' || toString(unit) || ' - ' || display_name as display_name_with_location,
JSONExtractInt(xblock_data_json, 'section') as section,
JSONExtractInt(xblock_data_json, 'subsection') as subsection,
JSONExtractInt(xblock_data_json, 'unit') as unit,
JSONExtractBool(xblock_data_json, 'graded') as graded,
course_key,
dump_id,
time_last_dumped
from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_NODES_TABLE }};
"""
    )

    # Dictionary keyed by block location, read from the recent-blocks table
    # with FINAL.
    op.execute(
        f"""
CREATE DICTIONARY {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names_dict
{on_cluster}
(
location String,
block_name String,
course_key String,
graded Bool,
display_name_with_location String
)
PRIMARY KEY location
SOURCE(CLICKHOUSE(
user '{{ CLICKHOUSE_ADMIN_USER }}'
password '{{ CLICKHOUSE_ADMIN_PASSWORD }}'
db '{{ ASPECTS_EVENT_SINK_DATABASE }}'
query "
select
location,
display_name,
course_key,
graded,
display_name_with_location
from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_RECENT_BLOCKS_TABLE }}
final
"
))
LAYOUT(COMPLEX_KEY_SPARSE_HASHED())
LIFETIME(120);
"""
    )

    # Table with the Dictionary engine, exposing the block dictionary.
    op.execute(
        f"""
CREATE OR REPLACE TABLE {{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names
{on_cluster}
(
location String,
block_name String,
course_key String,
graded Bool,
display_name_with_location String
) engine = Dictionary({{ ASPECTS_EVENT_SINK_DATABASE }}.course_block_names_dict)
;
"""
    )

    ## User PII
    # Dictionary keyed by (user_id, external_user_id). The source query keeps
    # one profile row per user (row number 1 when ordered by id and
    # time_last_dumped, descending) and right-joins external ids onto it, so
    # profiles without an external id are still returned.
    op.execute(
        f"""
CREATE DICTIONARY {{ ASPECTS_EVENT_SINK_DATABASE }}.user_pii_dict
{on_cluster}
(
user_id Int32,
external_user_id UUID,
external_id_type String,
username String,
name String,
meta String,
courseware String,
language String,
location String,
year_of_birth String,
gender String,
level_of_education String,
mailing_address String,
city String,
country String,
state String,
goals String,
bio String,
profile_image_uploaded_at String,
phone_number String
)
PRIMARY KEY (user_id, external_user_id)
SOURCE(CLICKHOUSE(
user '{{ CLICKHOUSE_ADMIN_USER }}'
password '{{ CLICKHOUSE_ADMIN_PASSWORD }}'
db '{{ ASPECTS_EVENT_SINK_DATABASE }}'
query "
with most_recent_user_profile as (
select
user_id,
name,
meta,
courseware,
language,
location,
year_of_birth,
gender,
level_of_education,
mailing_address,
city,
country,
state,
goals,
bio,
profile_image_uploaded_at,
phone_number,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY (id, time_last_dumped) DESC) as rn
from {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_USER_PROFILE_TABLE }}
)
select
mrup.user_id as user_id,
external_user_id,
external_id_type,
username,
name,
meta,
courseware,
language,
location,
year_of_birth,
gender,
level_of_education,
mailing_address,
city,
country,
state,
goals,
bio,
profile_image_uploaded_at,
phone_number
FROM {{ ASPECTS_EVENT_SINK_DATABASE }}.{{ ASPECTS_EVENT_SINK_EXTERNAL_ID_TABLE }} ex
RIGHT OUTER JOIN most_recent_user_profile mrup ON
mrup.user_id = ex.user_id and (
ex.external_id_type = 'xapi' OR
ex.external_id_type is NULL
)
WHERE mrup.rn = 1
"
))
LAYOUT(COMPLEX_KEY_SPARSE_HASHED())
LIFETIME({{ ASPECTS_PII_CACHE_LIFETIME }})
"""
    )
    # Table with the Dictionary engine, exposing the PII dictionary.
    op.execute(
        f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.user_pii
{on_cluster}
(
user_id Int32,
external_user_id UUID,
external_id_type String,
username String,
name String,
meta String,
courseware String,
language String,
location String,
year_of_birth String,
gender String,
level_of_education String,
mailing_address String,
city String,
country String,
state String,
goals String,
bio String,
profile_image_uploaded_at String,
phone_number String
) engine = Dictionary({{ ASPECTS_EVENT_SINK_DATABASE }}.user_pii_dict);
"""
    )

0 comments on commit 0b56a57

Please sign in to comment.