Skip to content

Commit

Permalink
Add a demo CLI to run during live demos (#5)
Browse files Browse the repository at this point in the history
* rename files

* add typer

* typer first draft

* command to append more raw data

* describe args

* print schemas

* add error handling

* color code

* first draft of tests

* fix patch errors

* fix imports

* ruff format

* passing tests

* couple more tests

* add clear instructions
  • Loading branch information
sungchun12 authored Jul 26, 2024
1 parent 28ed116 commit 668e9f3
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 185 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"macros"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"editor.wordWrap": "on"
}
73 changes: 67 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,76 @@ Test connection succeeded

## Story #1

How do I run SQLMesh as fast as possible and get the general look and feel?
Run a SQLMesh error simulation for raw data schema evolution.

Run this command!
<details>

```bash
sqlmesh plan # follow the instructions in the CLI prompt
```
```python
# run all these from the root directory

# rename a raw table column, the table should already exist
python demo_scripts/main.py rename-column --old event_name --new named_events

# expected output
# Original Schema:
# event_id: STRING
# event_name: STRING
# event_timestamp: TIMESTAMP
# user_id: STRING

# Column 'event_name' has been renamed to 'named_events' in table sqlmesh-public-demo.tcloud_raw_data.raw_events

# Updated Schema:
# event_id: STRING
# named_events: STRING
# event_timestamp: TIMESTAMP
# user_id: STRING

# run sqlmesh for the incremental_events.sql model
sqlmesh run --ignore-cron

# expected output
# google.api_core.exceptions.BadRequest: 400 GET https://bigquery.googleapis.com/bigquery/v2/projects/sqlmesh-public-demo/queries/0af1142b-cf71-4dc2-aa56-f60b09af777b?maxResults=0&location=US&prettyPrint=false: Unrecognized name: event_name; Did you mean event_id? at
# [1:254]

# Location: US
# Job ID: 0af1142b-cf71-4dc2-aa56-f60b09af777b

# More details on the demo video will be added soon!
# look at the error within the Tobiko Cloud UI
# https://sqlmesh-prod-enterprise-public-demo-sefz6ezt4q-uc.a.run.app/observer/environments/prod/runs/8ee6076967e342409d321f8c644282fd

# fix the error
# this command defaults to fixing the error, so no need to add options
python demo_scripts/main.py rename-column

# expected output
# Original Schema:
# event_id: STRING
# named_events: STRING
# event_timestamp: TIMESTAMP
# user_id: STRING

# Column 'named_events' has been renamed to 'event_name' in table sqlmesh-public-demo.tcloud_raw_data.raw_events

# Updated Schema:
# event_id: STRING
# event_name: STRING
# event_timestamp: TIMESTAMP
# user_id: STRING

# rerun sqlmesh for the incremental_events.sql model
sqlmesh run --ignore-cron

# expected output
# [1/1] tcloud_demo.incremental_events_allow_partials evaluated in 7.76s
# [1/1] tcloud_demo.incremental_events evaluated in 13.76s
# Evaluating models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 2/2 • 0:00:13

# All model batches have been executed successfully

# Run finished for environment 'prod'
```
</details>


### Credits
Expand Down
Empty file added demo_scripts/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions demo_scripts/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import os
import json


def get_service_account_info():
    """Load Google service-account credentials from the environment.

    Reads the ``GOOGLE_SQLMESH_CREDENTIALS`` environment variable and parses
    it as JSON.

    Returns:
        dict: The parsed service-account credentials.

    Raises:
        KeyError: If ``GOOGLE_SQLMESH_CREDENTIALS`` is not set.
        json.JSONDecodeError: If the variable's value is not valid JSON.
    """
    raw_credentials = os.environ["GOOGLE_SQLMESH_CREDENTIALS"]
    return json.loads(raw_credentials)
64 changes: 64 additions & 0 deletions demo_scripts/load_raw_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""This is a public demo script to generate demo data"""

import pandas as pd
import uuid
from datetime import datetime
import random
from google.cloud import bigquery

# Define the list of possible event names for generated fake rows.
event_names = ["page_view", "product_view", "ad_view", "video_view", "blog_view"]


class RawEventLoader:
    """Generates fake raw event rows and appends them to a BigQuery table.

    Used by the demo CLI to feed the ``incremental_events.sql`` model with
    fresh raw data.
    """

    def __init__(self, credentials, project_id):
        # BigQuery client bound to the demo project; all table operations go
        # through this client.
        self.client = bigquery.Client(credentials=credentials, project=project_id)

    def generate_fake_data(self, num_rows: int, end_date: str):
        """Build ``num_rows`` fake event dicts, all timestamped at ``end_date``.

        Args:
            num_rows: Number of rows to generate.
            end_date: Date string in ``YYYY-MM-DD`` format.

        Returns:
            list[dict]: Rows with keys ``event_id``, ``event_name``,
            ``event_timestamp``, and ``user_id``.

        Raises:
            ValueError: If ``end_date`` does not match ``YYYY-MM-DD``.
        """
        end_date_parsed = datetime.strptime(end_date, "%Y-%m-%d")
        # Every row shares the same timestamp on purpose: the demo targets a
        # single partition/day of the incremental model.
        return [
            {
                "event_id": str(uuid.uuid4()),
                "event_name": random.choice(event_names),
                "event_timestamp": end_date_parsed,
                "user_id": str(uuid.uuid4()),
            }
            for _ in range(num_rows)
        ]

    def create_table_if_not_exists(self, dataset_name: str, table_name: str):
        """Create the raw events table with the expected schema if missing."""
        dataset_ref = self.client.dataset(dataset_name)
        table_ref = dataset_ref.table(table_name)

        try:
            self.client.get_table(table_ref)
        # BUGFIX: was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit. Exception still covers the expected NotFound error
        # from get_table while letting interrupts propagate.
        except Exception:
            schema = [
                bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("event_name", "STRING", mode="NULLABLE"),
                bigquery.SchemaField("event_timestamp", "TIMESTAMP", mode="NULLABLE"),
                bigquery.SchemaField("user_id", "STRING", mode="NULLABLE"),
            ]
            table = bigquery.Table(table_ref, schema=schema)
            table = self.client.create_table(table)
            print(f"Created table {table.table_id}")

    def append_to_bigquery_table(self, table_name: str, num_rows: int, end_date: str):
        """Generate fake rows and load them into ``dataset.table`` via a load job.

        Args:
            table_name: Fully qualified ``dataset.table`` identifier.
            num_rows: Number of fake rows to append.
            end_date: Date string in ``YYYY-MM-DD`` format used for all rows.
        """
        dataset_name, table_name = table_name.split(".")

        fake_data = self.generate_fake_data(num_rows, end_date)
        df = pd.DataFrame(fake_data)

        # Ensure the destination exists before the load job runs.
        self.create_table_if_not_exists(dataset_name, table_name)

        job = self.client.load_table_from_dataframe(df, f"{dataset_name}.{table_name}")
        job.result()

        print(
            f"{num_rows} rows of raw events demo data with date [{end_date}] appended to {dataset_name}.{table_name}"
        )
90 changes: 90 additions & 0 deletions demo_scripts/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
This CLI tool is designed to interact with Google BigQuery
to manage and manipulate raw event data for live demos.
It provides commands to append generated fake data to a specified BigQuery table
and to rename columns in an existing BigQuery table.
The tool uses service account credentials saved in an environment variable for authentication with Google Cloud services.
"""

import sys
from pathlib import Path

# Add the project root to the Python path
project_root = str(Path(__file__).resolve().parent.parent)
sys.path.insert(0, project_root)

import typer
from google.oauth2 import service_account
from demo_scripts.rename_column_util import rename_column_util
from demo_scripts.load_raw_events import RawEventLoader
from datetime import datetime
from demo_scripts.config import get_service_account_info

app = typer.Typer()


@app.command()
def rename_column(
    project_name: str = typer.Option(
        "sqlmesh-public-demo", help="The Google Cloud project name."
    ),
    dataset_name: str = typer.Option(
        "tcloud_raw_data", help="The BigQuery dataset name."
    ),
    table_name: str = typer.Option("raw_events", help="The BigQuery table name."),
    old: str = typer.Option(
        "named_events", help="The name of the column to be renamed."
    ),
    new: str = typer.Option("event_name", help="The new name for the column."),
):
    """
    Rename a column in a BigQuery table to create an error OR fix for a raw table
    """
    # Build BigQuery credentials from the service-account JSON held in the
    # GOOGLE_SQLMESH_CREDENTIALS environment variable.
    creds = service_account.Credentials.from_service_account_info(
        get_service_account_info()
    )

    # Delegate the ALTER TABLE ... RENAME COLUMN work (and schema echoing)
    # to the shared utility.
    rename_column_util(
        credentials=creds,
        project_name=project_name,
        dataset_name=dataset_name,
        table_name=table_name,
        column_to_rename=old,
        new_column_name=new,
    )


@app.command()
def append_rawdata(
    table_name: str = typer.Option(
        "tcloud_raw_data.raw_events",
        help="The fully qualified BigQuery table name (dataset.table).",
    ),
    num_rows: int = typer.Option(20, help="The number of rows to append.", min=1),
    end_date: str = typer.Option(
        None,
        help="End date in YYYY-MM-DD format. Defaults to today's date.",
    ),
    project_id: str = typer.Option(
        "sqlmesh-public-demo", help="The Google Cloud project ID."
    ),
):
    """
    Append raw data to a BigQuery table intended to impact the incremental_events.sql model
    """
    # BUGFIX: the default was previously `datetime.today().strftime(...)`
    # evaluated at import time, so any long-lived process (or a module
    # imported well before the command runs) would bake in a stale date.
    # Resolve "today" at invocation time instead.
    if end_date is None:
        end_date = datetime.today().strftime("%Y-%m-%d")

    service_account_info = get_service_account_info()
    credentials = service_account.Credentials.from_service_account_info(
        service_account_info
    )

    loader = RawEventLoader(credentials, project_id)
    loader.append_to_bigquery_table(table_name, num_rows, end_date)


if __name__ == "__main__":
    # Entry point: dispatch to the Typer CLI (commands: rename-column,
    # append-rawdata).
    app()
53 changes: 53 additions & 0 deletions demo_scripts/rename_column_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from google.cloud import bigquery
from google.oauth2 import service_account
import typer


def _format_schema(table) -> str:
    """Render a BigQuery table's schema as indented ``name: TYPE`` lines."""
    return "\n".join(
        f"  {field.name}: {field.field_type}" for field in table.schema
    )


def rename_column_util(
    credentials,
    project_name: str,
    dataset_name: str,
    table_name: str,
    column_to_rename: str,
    new_column_name: str,
):
    """Rename a column on a BigQuery table and echo the before/after schemas.

    Args:
        credentials: Google Cloud credentials object for the BigQuery client.
        project_name: Google Cloud project containing the table.
        dataset_name: BigQuery dataset name.
        table_name: BigQuery table name.
        column_to_rename: Existing column name.
        new_column_name: Desired column name.

    Raises:
        Exception: Re-raised from the query job for any error other than the
            target column name already existing.
    """
    # Use the provided credentials
    client = bigquery.Client(credentials=credentials, project=project_name)

    # Define the table
    table_id = f"{client.project}.{dataset_name}.{table_name}"

    # Get the original schema (duplicated formatting now lives in _format_schema)
    table = client.get_table(table_id)
    typer.echo("\nOriginal Schema:")
    typer.echo(_format_schema(table))

    # SQL to rename the column
    sql = f"ALTER TABLE `{table_id}` RENAME COLUMN {column_to_rename} TO {new_column_name};"

    try:
        # Execute the SQL and wait for the job to complete
        job = client.query(sql)
        job.result()

        typer.echo(
            f"\nColumn '{typer.style(column_to_rename, fg=typer.colors.RED)}' "
            f"has been renamed to '{typer.style(new_column_name, fg=typer.colors.GREEN)}' "
            f"in table {table_id}"
        )

        # Verify the change by re-reading the schema
        table = client.get_table(table_id)
        typer.echo("\nUpdated Schema:")
        typer.echo(_format_schema(table))
    except Exception as e:
        if "Column already exists" in str(e):
            # CONSISTENCY FIX: was `print(...)`; every other message in this
            # module goes through typer.echo.
            typer.echo(f"\nError: {e}")
        else:
            raise
Empty file added demo_scripts/tests/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions demo_scripts/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import json
import pytest
from demo_scripts.config import get_service_account_info


def test_get_service_account_info():
    """get_service_account_info parses the credentials env var into a dict."""
    mock_credentials = {
        "type": "service_account",
        "project_id": "test-project",
        "private_key_id": "test-key-id",
        "private_key": "test-private-key",
        "client_email": "test@example.com",
        "client_id": "test-client-id",
    }
    # BUGFIX: the env var was previously set and never restored, leaking
    # state into any test that runs afterwards (including the missing-var
    # test below, which then depends on execution order).
    original = os.environ.get("GOOGLE_SQLMESH_CREDENTIALS")
    os.environ["GOOGLE_SQLMESH_CREDENTIALS"] = json.dumps(mock_credentials)
    try:
        # Call the function
        result = get_service_account_info()

        # Assert the result
        assert result == mock_credentials
        assert isinstance(result, dict)
        assert "type" in result
        assert result["type"] == "service_account"
    finally:
        # Restore the pre-test environment.
        if original is None:
            os.environ.pop("GOOGLE_SQLMESH_CREDENTIALS", None)
        else:
            os.environ["GOOGLE_SQLMESH_CREDENTIALS"] = original


def test_get_service_account_info_missing_env_var():
    """A missing GOOGLE_SQLMESH_CREDENTIALS env var raises KeyError."""
    # BUGFIX: the variable was previously deleted and never restored,
    # destroying real credentials for the rest of the test session.
    original = os.environ.pop("GOOGLE_SQLMESH_CREDENTIALS", None)
    try:
        # Assert that the function raises a KeyError
        with pytest.raises(KeyError):
            get_service_account_info()
    finally:
        if original is not None:
            os.environ["GOOGLE_SQLMESH_CREDENTIALS"] = original
Loading

0 comments on commit 668e9f3

Please sign in to comment.