From 9d7585efa850a91445047d0168e30fc6f3446183 Mon Sep 17 00:00:00 2001 From: Kamesh Sampath Date: Wed, 10 Apr 2024 15:52:48 +0530 Subject: [PATCH] (feat!): use dynamic tables --- .envrc.example | 6 ++- README.md | 74 +++++++++++++++++++++--------- etc/config/todolist-snow-sink.json | 2 +- etc/snowflake/acl.sql | 2 +- etc/snowflake/cleanup.sql | 7 ++- etc/snowflake/core.sql | 11 +++++ etc/snowflake/dashboard_data.sql | 1 - etc/snowflake/dynamic_table.sql | 15 ++++++ 8 files changed, 89 insertions(+), 29 deletions(-) create mode 100644 etc/snowflake/core.sql delete mode 100644 etc/snowflake/dashboard_data.sql create mode 100644 etc/snowflake/dynamic_table.sql diff --git a/.envrc.example b/.envrc.example index 4991e99..7ee0c5e 100644 --- a/.envrc.example +++ b/.envrc.example @@ -8,10 +8,14 @@ export TODOAPP_USER= export TODOAPP_USER_PWD= export TODOAPP_USER_ROLE=todoapp_user export TODOAPP_DATABASE= +# table to hold redpanda topic records +export TODO_LIST_TABLE=TODO_LIST +# table that will be used by Streamlit dashboard +export TODOS_TABLE=TODOS # update to different warehouse, this is default for trial accounts export SNOWSQL_WAREHOUSE=COMPUTE_WH # update the schema to use if you are on a different schema, demo code is set to work with this by default, changing this need to update ACL accordingly -export SNOWSQL_SCHEMA="public" +export SNOWSQL_SCHEMA=public ## Redpanda export RPK_BROKERS=localhost:19092 export COMPOSE_PROJECT_NAME=grpc-todo-app diff --git a/README.md b/README.md index e65be35..003139c 100644 --- a/README.md +++ b/README.md @@ -212,14 +212,17 @@ export TODOAPP_USER_RSA_PUBLIC_KEY=$(awk 'NR > 2 {print last} {last=$0}' ORS='' > **IMPORTANT**: The commands in this section will be executed as Snowflake Account Admin user -Create Database `$TODOAPP_DATABASE`, +Create Database `$TODOAPP_DATABASE` and `$TODO_LIST_TABLE`, ```shell snowsql -c admin \ ---query "CREATE DATABASE IF NOT EXISTS $TODOAPP_DATABASE;" +--warehouse "$SNOWSQL_WAREHOUSE" \ +--variable db_name=$TODOAPP_DATABASE \ +--variable todo_list_table="$TODO_LIST_TABLE" \ +--filename etc/snowflake/core.sql ``` -Create the `$TODOAPP_USER` user and provide the requried **GRANTs**, +Create the `$TODOAPP_USER` user and provide the required **GRANTs**, ```shell snowsql -c admin \ @@ -229,6 +232,7 @@ snowsql -c admin \ --variable todo_user_role=$TODOAPP_USER_ROLE \ --variable todoapp_user=$TODOAPP_USER \ --variable todo_pub_key=$TODOAPP_USER_RSA_PUBLIC_KEY \ +--variable todos_table_name=$TODOS_TABLE \ --filename "$DEMO_HOME/etc/snowflake/acl.sql" ``` @@ -248,6 +252,18 @@ snowsql -u $TODOAPP_USER \ --query "SELECT CURRENT_DATE;" ``` +Create a Stream to capture the changes of `$TODO_LIST_TABLE`, + +```shell +snowsql -u "$TODOAPP_USER" \ +--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ +--warehouse "$SNOWSQL_WAREHOUSE" \ +--dbname "$TODOAPP_DATABASE" \ +--variable stream_name="$TODO_LIST_TABLE"_stream \ +--variable todo_list_table=$TODO_LIST_TABLE \ +--filename etc/snowflake/stream.sql +``` + > **NOTE**: First time login will ask for a password change. ## Kafka Connect API @@ -330,6 +346,7 @@ http --body "$KAFKA_CONNECT_URI/connectors?expand=status" ```shell snowsql -u $TODOAPP_USER \ --private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ +--warehouse "$SNOWSQL_WAREHOUSE" \ --dbname $TODOAPP_DATABASE \ --query "SHOW TERSE TABLES LIKE '%todo%';" ``` @@ -386,46 +403,50 @@ snowsql -u $TODOAPP_USER \ --query "SELECT count(*) from TODO_LIST;" ``` -### Visualizing the Data - -You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data. - -#### Create the Stored Procedure - -Stored Procedure to extract the data the table synchronized with Redpanda topic to any custom table. +Verify the inserted records are captured by the stream ```shell snowsql -u $TODOAPP_USER \ --private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ --warehouse $SNOWSQL_WAREHOUSE \ --dbname $TODOAPP_DATABASE \ ---filename "$DEMO_HOME/etc/snowflake/todos_sp.sql" +--query "SELECT * FROM "$TODO_LIST_TABLE"_STREAM" ``` -#### Prepare Data for Dashboard +You should see all the inserted records on the stream. + +### Visualizing the Data + +You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data. -Extract the data from `TODO_LIST` table and load the same on to `todos` which will be used by Streamlit dashboard, +#### Create Dynamic Table + +The demo will use Snowflake [Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-about) to transform the raw records on `$TODO_LIST_TABLE` table to a structure usable for the Streamlit dashboard, ```shell snowsql -u $TODOAPP_USER \ --private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ --warehouse $SNOWSQL_WAREHOUSE \ ---dbname "$TODOAPP_DATABASE" \ ---query "CALL todos('todo_list','todos')" +--dbname $TODOAPP_DATABASE \ +--variable todo_warehouse=$SNOWSQL_WAREHOUSE \ +--variable topic_name=$TOPICS \ +--variable table_name=$TODOS_TABLE \ +--filename "$DEMO_HOME/etc/snowflake/dynamic_table.sql" ``` -Query the `todos` table, +#### Prepare Data for Dashboard + +Query the `$TODOS_TABLE` table, ```shell snowsql -u $TODOAPP_USER \ --private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ --warehouse $SNOWSQL_WAREHOUSE \ --dbname $TODOAPP_DATABASE \ ---query "SELECT * FROM TODOS;" +--query "SELECT * FROM $TODOS_TABLE" ``` -> **NOTE**: You need to refresh the data on the `TODOS` table on every new task added to th Redpanda topic. -> **WIP**: Leveraging streams and dynamic tables +> **NOTE**: `$TODOS_TABLE` gets refreshed within a minute of data updates on `$TODO_LIST_TABLE` #### Use predefined conda environment @@ -467,14 +488,25 @@ snowsql -u $TODOAPP_USER \ --private-key-path="$DEMO_HOME/keys/rsa_key.p8" \ --warehouse $SNOWSQL_WAREHOUSE \ --dbname $TODOAPP_DATABASE \ +--variable todo_list_table=$TODO_LIST_TABLE \ +--variable todos_table_name=$TODOS_TABLE \ --filename "$DEMO_HOME/etc/snowflake/cleanup.sql" ``` +### Drop User + +```shell +snowsql -c admin \ +--warehouse $SNOWSQL_WAREHOUSE \ +--query "DROP USER IF EXISTS $TODOAPP_USER;" +``` + ### Drop Database ```shell -snowsql -c admin --warehouse $SNOWSQL_WAREHOUSE \ ---query "DROP DATABASE IF EXISTS $SNOWSQL_DATABASE;" +snowsql -c admin \ +--warehouse $SNOWSQL_WAREHOUSE \ +--query "DROP DATABASE IF EXISTS $TODOAPP_DATABASE;" ``` ## References diff --git a/etc/config/todolist-snow-sink.json b/etc/config/todolist-snow-sink.json index 1712a5d..afd4262 100644 --- a/etc/config/todolist-snow-sink.json +++ b/etc/config/todolist-snow-sink.json @@ -4,7 +4,7 @@ "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max": "8", "topics": "${TOPICS}", - "snowflake.topic2table.map": "${TOPICS}:TODO_LIST", + "snowflake.topic2table.map": "${TOPICS}:${TODO_LIST_TABLE}", "buffer.count.records": "10", "buffer.flush.time": "10", "buffer.size.bytes": "500", diff --git a/etc/snowflake/acl.sql b/etc/snowflake/acl.sql index f380e87..6ef74e7 100644 --- a/etc/snowflake/acl.sql +++ b/etc/snowflake/acl.sql @@ -36,7 +36,7 @@ GRANT INSERT,DELETE,SELECT,UPDATE,TRUNCATE ON FUTURE TABLES IN SCHEMA &db_name.P -- Ability to create stage(s) GRANT CREATE STAGE ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role; --- GRANT OWNERSHIP ON PROCEDURE TODOS(string,string) TO ROLE &todo_user_role; +-- GRANT OWNERSHIP ON DYNAMIC TABLE &todos_table_name TO ROLE &todo_user_role; -- Create Streamlit apps in the PUBLIC schema GRANT CREATE STREAMLIT ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role; diff --git a/etc/snowflake/cleanup.sql b/etc/snowflake/cleanup.sql index 2a788d7..e9eaac8 100644 --- a/etc/snowflake/cleanup.sql +++ b/etc/snowflake/cleanup.sql @@ -1,9 +1,8 @@ -- Drop table -DROP TABLE IF EXISTS todo_list; -DROP TABLE IF EXISTS todos; +DROP TABLE IF EXISTS &todo_list_table; --- Stored Procedure -DROP PROCEDURE IF EXISTS TODOS(string,string); +-- Dynamic Table +DROP DYNAMIC TABLE IF EXISTS &todos_table_name; -- Pipe DROP PIPE IF EXISTS SNOWFLAKE_KAFKA_CONNECTOR_TODOLIST_PIPE_TODO_LIST_0; diff --git a/etc/snowflake/core.sql b/etc/snowflake/core.sql new file mode 100644 index 0000000..7d6fcfa --- /dev/null +++ b/etc/snowflake/core.sql @@ -0,0 +1,11 @@ +-- Create Database +CREATE DATABASE IF NOT EXISTS &db_name; + +-- Create table to hold the records from Redpanda todo_list topic +CREATE TABLE IF NOT EXISTS "&db_name".PUBLIC."&todo_list_table" ( + RECORD_METADATA VARIANT, + RECORD_CONTENT VARIANT +); + +-- Enable Change Tracking +ALTER TABLE"&db_name".PUBLIC."&todo_list_table" SET CHANGE_TRACKING = TRUE; diff --git a/etc/snowflake/dashboard_data.sql b/etc/snowflake/dashboard_data.sql deleted file mode 100644 index 9dd9f24..0000000 --- a/etc/snowflake/dashboard_data.sql +++ /dev/null @@ -1 +0,0 @@ -CALL todos('todo_list','&to_table') \ No newline at end of file diff --git a/etc/snowflake/dynamic_table.sql b/etc/snowflake/dynamic_table.sql new file mode 100644 index 0000000..c4029c0 --- /dev/null +++ b/etc/snowflake/dynamic_table.sql @@ -0,0 +1,15 @@ +-- Create dynamic table to capture the records from todo_list_table +CREATE OR REPLACE DYNAMIC TABLE &table_name +TARGET_LAG = '1minute' +WAREHOUSE = '&todo_warehouse' +AS +select + record_metadata:key::string as key, + record_content:title::string as title, + record_content:description::text as description, + record_content:category::string as category, + record_content:status::boolean as status, + record_metadata:CreateTime::bigint as tz + from todo_list + where record_metadata:topic = '&topic_name' + ORDER BY key \ No newline at end of file