diff --git a/notebooks/Perf Lab.ipynb b/notebooks/Perf Lab.ipynb new file mode 100644 index 00000000..6d4acd74 --- /dev/null +++ b/notebooks/Perf Lab.ipynb @@ -0,0 +1 @@ +{"cells":[{"cell_type":"markdown","source":["### Install the latest .whl package\n","\n","Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version."],"metadata":{"nteract":{"transient":{"deleting":false}}},"id":"5c27dfd1-4fe0-4a97-92e6-ddf78889aa93"},{"cell_type":"code","source":["# %pip install semantic-link-labs\n","%pip install \"builtin/semantic_link_labs-0.9.1-py3-none-any.whl\""],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"outputs_hidden":true,"source_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"d5cae9db-cef9-48a8-a351-9c5fcc99645c"},{"cell_type":"markdown","source":["### Requirements\n","* Fabric Capacity with XMLA read/write enabled\n"," * A Fabric Trial Capacity is sufficient for evaluation.\n"," * The XMLA Endpoint must be read/write enabled because the perf lab provisions semantic models automatically.\n","* Fabric Permissions\n"," * User must have permissions to create workspaces, lakehouses, and semantic models. This notebook provisions sample resources to demonstrate the use of a perf lab.\n"," * User should have access to a Fabric capacity. This notebook provisions workspaces, lakehouses, and semantic models on a Fabric capacity.\n"," * Connect this notebook to a lakehouse without a schema to persist test definitions and test results. Although not strictly a requirement, it eliminates the need to provide the name and Id of a disconnected lakehouse.\n","\n","### Result\n","* Master and test workspaces, lakehouses, and semantic models are created to establish a perf lab\n"," * The master workspace contains a lakehouse and a sample semantic model in Direct Lake on OneLake mode that uses the lakehouse as its data source. \n"," * The test workspace contains semantic models cloned from the sample semantic model in the master workspace.\n"," * Various Delta tables are created in the lakehouse connected to this notebook to persist test definitions, table analysis, and test results.\n"," * The resources in the master workspace and in the test workspace are deprovisioned upon completion of the perf lab; the workspaces themselves must be deleted manually.\n","* The names of the newly created resources can be adjusted to customize the perf lab.\n"],"metadata":{},"id":"2856d26d"},{"cell_type":"markdown","source":["### Import the library and set global notebook parameters\n","\n","This notebook deploys lakehouses and semantic models across different workspaces, but the resources can also be hosted together in a centralized workspace. The master workspace contains a lakehouse with sample data, used as the data source for the sample semantic models in Direct Lake on OneLake mode. 
The master semantic model serves as a template for the actual test models, which this notebook provisions by cloning the master model prior to running the performance tests."],"metadata":{},"id":"b195eae8"},{"cell_type":"code","source":["import sempy_labs.perf_lab as perf_lab\n","\n","\n","master_workspace = 'Perf Lab Master' # Enter the name of the master workspace.\n","lakehouse = 'SampleLakehouse' # Enter the name of the lakehouse used as the data source.\n","master_dataset = 'Master Semantic Model' # Enter the name of the master semantic model.\n","\n","test_workspace = 'Perf Lab Testing' # Enter the name of the workspace for the semantic model clones.\n","target_dataset_prefix = 'Test Model_' # Enter the common part of the name for all semantic model clones.\n","test_dataset_A = target_dataset_prefix + 'A' # The name of the first semantic model clone.\n","test_dataset_B = target_dataset_prefix + 'B' # The name of the second semantic model clone.\n","\n","capacity_id = None # The Id of the capacity for the workspaces. \n"," # Leave this as None to use the capacity of the attached lakehouse or perf lab notebook.\n"," \n","test_definitions_tbl = 'TestDefinitions' # The name of the table in the notebook-attached lakehouse to store the test definitions.\n","column_segments_tbl = 'StorageTableColumnSegments' # The name of the table in the notebook-attached lakehouse to store the column segment analysis.\n","trace_events_tbl = \"TraceEvents\" # The name of the table in the notebook-attached lakehouse to store the captured trace events."],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"1344e286"},{"cell_type":"markdown","source":["### Provision master workspace, lakehouse, and semantic model with sample data\n","A sample lakehouse can be provisioned by calling the provision_perf_lab_lakehouse() function with the provision_sample_delta_tables() table-generator function. If you want to customize the table generation, use the source code for the _get_sample_tables_property_bag() and provision_sample_delta_tables() functions as a starting point. 
The sample semantic model uses the sample lakehouse, but you can bring your own semantic model and set the name and Id accordingly."],"metadata":{"nteract":{"transient":{"deleting":false}}},"id":"5a3fe6e8-b8aa-4447-812b-7931831e07fe"},{"cell_type":"code","source":["# Create a workspace and a lakehouse.\n","# Keep track of the workspace_id and lakehouse_id for subsequent function calls.\n","\n","(master_workspace_id, lakehouse_id) = perf_lab.provision_perf_lab_lakehouse(\n"," workspace = master_workspace, \n"," lakehouse = lakehouse,\n"," table_properties=perf_lab._get_sample_tables_property_bag(fact_rows_in_millions = 10),\n"," table_generator=perf_lab.provision_sample_delta_tables,\n"," capacity_id = capacity_id\n",") \n","\n","# Create a master semantic model.\n","# Keep track of its name and Id for subsequent function calls.\n","(master_dataset_name, master_dataset_id) = perf_lab.provision_sample_semantic_model(\n"," workspace = master_workspace_id, \n"," lakehouse=lakehouse_id, \n"," semantic_model_name = master_dataset\n"," )"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3655dd88"},{"cell_type":"markdown","source":["### Generate sample test definitions using sample queries\n","The sample test definitions leverage predefined sample queries that work with the sample semantic models provisioned by using the provision_sample_semantic_model() function. You can also use the _get_test_definitions_from_trace_events() function to generate the test definitions from a Profiler trace as demonstrated in a subsequent cell. "],"metadata":{},"id":"175a59b8"},{"cell_type":"code","source":["# _sample_queries is a dictionary with a query name or Id as the key and the query text as the value.\n","# The _get_test_definitions() function generates test definitions to execute all _sample_queries against the same test semantic model.\n","\n","test_definitions_A = perf_lab._get_test_definitions(\n"," dax_queries = perf_lab._sample_queries, \n"," target_dataset = test_dataset_A,\n"," target_workspace = test_workspace,\n"," master_dataset = master_dataset_name,\n"," master_workspace = master_workspace_id,\n"," data_source = lakehouse,\n"," data_source_workspace = master_workspace_id,\n"," )\n","\n","test_definitions_B = perf_lab._get_test_definitions(\n"," dax_queries = perf_lab._sample_queries, \n"," target_dataset = test_dataset_B,\n"," target_workspace = test_workspace,\n"," master_dataset = master_dataset_name,\n"," master_workspace = master_workspace_id,\n"," data_source = lakehouse,\n"," data_source_workspace = master_workspace_id,\n"," )\n","\n","# Persist the results in the notebook-attached lakehouse so that the definitions can easily be retrieved for subsequent runs.\n","# Note that the _save_as_delta_table() function overwrites any existing test definitions table.\n","\n","perf_lab._save_as_delta_table(\n"," dataframe = test_definitions_A.union(test_definitions_B),\n"," delta_table_name = test_definitions_tbl\n"," )"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3a7a80ec"},{"cell_type":"markdown","source":["### Generate sample test definitions from trace events\n","As an alternative to the cell above, you can run the _get_test_definitions_from_trace_events() function and interact with the master semantic model using Power BI Desktop in LiveConnect mode or other client tools or reports to generate the test definitions from a Profiler trace. 
The _get_test_definitions_from_trace_events() function will run in a loop until a customizable timeout expires (5 minutes by default) or until you send an EVALUATE {\"Stop\"} DAX query to the master model."],"metadata":{},"id":"30438799"},{"cell_type":"code","source":["# The _get_test_definitions_from_trace_events() function generates test definitions from trace events against the master model\n","# and associates each query with a separate test semantic model.\n","# Start the trace collection by running the _get_test_definitions_from_trace_events() function, then interact with the model to send DAX queries.\n","# Wait for the timeout to expire to end the trace collection or send an EVALUATE {\"Stop\"} DAX query to exit the trace collection sooner.\n","\n","traced_definitions = perf_lab._get_test_definitions_from_trace_events(\n"," target_dataset_prefix = target_dataset_prefix,\n"," target_workspace = test_workspace,\n"," master_dataset = master_dataset_name,\n"," master_workspace = master_workspace_id,\n"," data_source = lakehouse,\n"," data_source_workspace = master_workspace_id,\n"," timeout = 300\n"," )\n","\n","# Persist the results in the notebook-attached lakehouse so that the definitions can easily be retrieved for subsequent runs.\n","# Note that the _save_as_delta_table() function overwrites any existing test definitions table.\n","\n","perf_lab._save_as_delta_table(\n"," dataframe = traced_definitions,\n"," delta_table_name = test_definitions_tbl\n"," )\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"315c2dc7"},{"cell_type":"markdown","source":["### Provision test semantic models\n","Creating numerous semantic models for testing can easily be accomplished by passing the spark dataframe with the test definitions to the _provision_test_models() function. 
For every unique combination of 'MasterWorkspace', 'MasterDataset', 'TargetWorkspace', and 'TargetDataset', this function creates the necessary semantic model clones that the test cycles later use to run DAX queries."],"metadata":{},"id":"1d8e73b2"},{"cell_type":"code","source":["# Load the test definitions from the notebook-attached lakehouse.\n","\n","test_definitions = perf_lab._read_delta_table(\n"," delta_table_name = test_definitions_tbl\n"," )\n","\n","# Provision the test models by cloning the master semantic models\n","# in the specified test workspaces according to the test definitions.\n","\n","perf_lab._provision_test_models( \n"," test_definitions = test_definitions,\n"," capacity_id = capacity_id,\n"," refresh_clone = True,\n"," )"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"2854bf8a"},{"cell_type":"markdown","source":["### Prepare a test cycle\n","Test cycle-specific information includes a test run Id and a timestamp, which must be passed along with the test definitions so that the test cycle-specific Id and timestamp can be included in the test results for later analysis of individual test runs."],"metadata":{},"id":"c3f497c8"},{"cell_type":"code","source":["# Add a test run Id and a timestamp to all test definitions.\n","\n","test_cycle_definitions = perf_lab._initialize_test_cycle(\n"," test_definitions = test_definitions\n"," )\n","\n","display(test_cycle_definitions)"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"a4f0b5a2"},{"cell_type":"markdown","source":["### Warm up the test models\n","Before updating Delta tables and refreshing Direct Lake models, it is a good idea to simulate semantic models that are currently in use by running all the test queries without tracing. This brings the test semantic models into a warm state."],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3cd3ff4b-cf36-4a0c-991f-55078959afba"},{"cell_type":"code","source":["# Execute all queries in the test definitions against their test models\n","# so that all relevant column data is loaded into memory.\n","\n","perf_lab._warmup_test_models(\n"," test_definitions = test_cycle_definitions\n",") "],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0f5cc1b8-250a-482f-9fa9-1201db929fca"},{"cell_type":"markdown","source":["### Simulate Lakehouse ETL\n","The perf lab has no real ETL pipeline and must therefore rely on a simulated ETL process. The perf lab accomplishes the work with the help of a sample callback function. Refer to the source code if you want to implement your own table update logic."],"metadata":{},"id":"e0db744b"},{"cell_type":"code","source":["# To update Delta tables, determine the list of Delta tables that must be processed. 
\n","\n","source_table_props = perf_lab.PropertyBag()\n","source_table_props.add_property(\"Prefix\", \"sales\")\n","\n","table_info = perf_lab.get_source_tables(\n"," test_definitions = test_cycle_definitions,\n"," filter_properties = source_table_props,\n"," filter_function = perf_lab._filter_by_prefix\n"," )\n","\n","# Use the sample _filter_by_prefix() callback function \n","# to perform a rolling window update by deleting the oldest DateID\n","# and reinserting it as the newest DateID.\n","# The _filter_by_prefix() callback function expects a property bag\n","# that identifies the DateID column as the key column.\n","delete_reinsert_props = perf_lab.PropertyBag()\n","delete_reinsert_props.add_property(\"key_column\", \"DateID\")\n","\n","perf_lab.simulate_etl(\n"," source_tables_info = table_info,\n"," update_properties = delete_reinsert_props,\n"," update_function = perf_lab._delete_reinsert_rows\n",")"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0e04d519"},{"cell_type":"markdown","source":["### Analyze Delta tables and semantic model tables\n","To investigate the dependencies and interactions between Delta tables and Direct Lake models in various configurations, the perf lab includes functions to analyze the column segments for each table in the semantic model as well as the parquet files, storage groups, and other information for the Delta tables."],"metadata":{},"id":"67787c3a"},{"cell_type":"code","source":["# Under the covers, calls the INFO.STORAGETABLECOLUMNSEGMENTS DAX function \n","# to retrieve details about the column segments for all model tables listed in tables_info.\n","\n","column_segments = perf_lab.get_storage_table_column_segments(\n"," test_cycle_definitions = test_cycle_definitions,\n"," tables_info = table_info\n"," ) \n","\n","# Insert the results in the notebook-attached lakehouse for later analysis.\n","\n","perf_lab._insert_into_delta_table(\n"," dataframe = column_segments,\n"," delta_table_name = column_segments_tbl\n"," )"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4efb55b2"},{"cell_type":"markdown","source":["### Run default (incremental), cold, and warm query tests\n","The main purpose of a test run is to measure the performance of a set of DAX queries against the test semantic models with different memory states: Cold (full framing), Semi-warm (incremental framing), and Warm (no framing). Other than running the queries and measuring response times, the run_test_cycle() function must therefore perform additional actions, specifically clearing the cache and refreshing the model."],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"61538c9c-3b80-4364-96c0-2fefbf65710a"},{"cell_type":"code","source":["# Lakehouse ETL was simulated earlier. The Delta tables are updated.\n","# Performing a full refresh triggers incremental framing to reload \n","# only the data that was impacted by the latest updates.\n","\n","inc_results = perf_lab.run_test_cycle(\n"," test_cycle_definitions = test_cycle_definitions,\n"," clear_query_cache = True,\n"," refresh_type = \"full\",\n"," tag = \"incremental\"\n"," )\n","\n","# In order to compare query perf after incremental framing with\n","# truly cold query performance, it is necessary to perform a\n","# clearValues refresh first and then a full refresh. 
Now,\n","# all of the column data must be loaded again from the Delta tables.\n","\n","cold_results = perf_lab.run_test_cycle(\n"," test_cycle_definitions = test_cycle_definitions,\n"," clear_query_cache = True,\n"," refresh_type = \"clearValuesFull\",\n"," tag = \"cold\"\n"," )\n","\n","# A warm test run is a run without refreshing the model.\n","\n","warm_results = perf_lab.run_test_cycle(\n"," test_cycle_definitions = test_cycle_definitions,\n"," clear_query_cache = True,\n"," refresh_type = None,\n"," tag = \"warm\"\n"," )\n","\n","# The run_test_cycle() functions returns a tuple of a Spark DataFrame with the\n","# captured trace events and a dictionary with the query results. We are only\n","# interested in the trace events. Combine them in one DataFrame and insert them\n","# into a Delta table in the notebook-attached lakehouse.\n","\n","all_trace_events = inc_results[0].union(cold_results[0]).union(warm_results[0])\n","\n","perf_lab._insert_into_delta_table(\n"," dataframe = column_segments,\n"," delta_table_name = trace_events_tbl\n"," )"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"475f4189-7994-41df-8fc0-9129d66a1a98"},{"cell_type":"markdown","source":["### Deprovision perf lab resources\n","The perf lab also provides functions to clean up provisioned resources. However, the perf lab does not delete workspaces to avoid accidental data loss if an existing workspace with unrelated items was used in the perf lab. Note that deprovisioning lakehouses listed in the test definitions does indeed remove these lakehouses with all their tables. Make sure these lakehouses only contain perf lab tables or delete the resources manually."],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3e62543e-f713-4aa7-aed2-5590124fb098"},{"cell_type":"code","source":["# Delete the master and test models listed in the test definitions.\n","# Set masters_and_clones = False if you only want to delete the test models\n","# but want to keep the master semantic model(s). 
\n","\n","perf_lab.deprovision_perf_lab_models(\n"," test_definitions = test_cycle_definitions,\n"," masters_and_clones = True\n"," )\n","\n","# Delete the lakehouses listed in the test definitions.\n","\n","perf_lab.deprovision_perf_lab_lakehouses(\n"," test_definitions = test_cycle_definitions\n"," )\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"b93c2c61-679c-4df1-bfa9-6d65bac71702"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"widgets":{},"synapse_widget":{"state":{},"version":"0.1"},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"default_lakehouse":"80666873-cc01-4e04-ad14-3909238a5fbe","default_lakehouse_name":"LabsDevLH","default_lakehouse_workspace_id":"215b4fad-043f-4b53-a580-2bf33d3860d3"}}},"nbformat":4,"nbformat_minor":5} \ No newline at end of file diff --git a/src/sempy_labs/perf_lab/__init__.py b/src/sempy_labs/perf_lab/__init__.py index 66fa317a..b1e56f48 100644 --- a/src/sempy_labs/perf_lab/__init__.py +++ b/src/sempy_labs/perf_lab/__init__.py @@ -7,6 +7,7 @@ _get_measure_table_df, _get_sales_df, _save_as_delta_table, + _insert_into_delta_table, _read_delta_table, _get_sample_tables_property_bag, _generate_onelake_shared_expression, @@ -79,6 +80,7 @@ "_get_measure_table_df", "_get_sales_df", "_save_as_delta_table", + "_insert_into_delta_table", "_read_delta_table", "_get_sample_tables_property_bag", "_generate_onelake_shared_expression", diff --git a/src/sempy_labs/perf_lab/_sample_lab.py b/src/sempy_labs/perf_lab/_sample_lab.py index 20124a07..ac809c39 100644 --- a/src/sempy_labs/perf_lab/_sample_lab.py +++ b/src/sempy_labs/perf_lab/_sample_lab.py @@ -372,6 +372,41 @@ def _save_as_delta_table( dataframe.write.mode("overwrite").format("delta").save(filePath) print(f"{icons.green_dot} Delta table '{delta_table_name}' created and {dataframe.count()} rows inserted.") +def _insert_into_delta_table( + dataframe: DataFrame, + delta_table_name: str, + lakehouse: Optional [str | UUID] = None, + workspace: Optional [str | UUID] = None, +): + """ + Inserts a spark dataframe into a delta table in a Fabric lakehouse. + + Parameters + ---------- + dataframe : DataFrame + The spark dataframe to be inserted into a delta table. + delta_table_name : str + The name of the delta table. + lakehouse : uuid.UUID + The Fabric lakehouse ID. + Defaults to None which resolves to the lakehouse attached to the notebook. + workspace : uuid.UUID + The Fabric workspace ID where the specified lakehouse is located. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. 
+ """ + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(lakehouse=lakehouse,workspace=workspace_id) + + filePath = create_abfss_path( + lakehouse_id=lakehouse_id, + lakehouse_workspace_id=workspace_id, + delta_table_name=delta_table_name, + ) + dataframe.write.mode("append").format("delta").save(filePath) + print(f"{icons.green_dot} {dataframe.count()} rows inserted into Delta table '{delta_table_name}'.") + def _read_delta_table( delta_table_name: str, lakehouse: Optional [str | UUID] = None, diff --git a/src/sempy_labs/perf_lab/_simulated_etl.py b/src/sempy_labs/perf_lab/_simulated_etl.py index e02b40a7..56ee1795 100644 --- a/src/sempy_labs/perf_lab/_simulated_etl.py +++ b/src/sempy_labs/perf_lab/_simulated_etl.py @@ -109,7 +109,7 @@ def get_source_tables( data_source_workspace = row['DatasourceWorkspace'] data_source_type = row['DatasourceType'] - # Skip this row if the data_source_type is invalud. + # Skip this row if the data_source_type is invalid. if not data_source_type == "Lakehouse": print(f"{icons.red_dot} Invalid data source type '{data_source_type}' detected. Ignoring this row. Please review your test definitions.") continue @@ -423,19 +423,19 @@ def _sliding_window_update( UpdateTableCallback = Callable[[PropertyBag, PropertyBag], None] def _delete_reinsert_rows( - table_info: Row, + source_table_info: Row, custom_properties: Optional[PropertyBag] = None ) -> None: """ Deletes and reinserts rows in a Delta table and optimizes the Delta table between deletes and inserts. Parameters ---------- - table_info: Row + source_table_info: Row A Spark DataFrame row with the following columns: - +----------+--------------------+--------------+---------------+----------------+-------------------+---------------+------------+----------+--------------+ - | ModelName| ModelWorkspace|ModelTableName| DatasourceName| DatasourceType|DatasourceWorkspace|SourceTableName|SourceFormat|SourceType|SourceLocation| - +----------+--------------------+--------------+---------------+----------------+-------------------+---------------+------------+----------+--------------+ - custom_properties: PropertyBag, default=None + +---------------+----------------+-------------------+---------------+--------------+ + | DatasourceName| DatasourceType|DatasourceWorkspace|SourceTableName|SourceLocation| + +---------------+----------------+-------------------+---------------+--------------+ + custom_properties: PropertyBag, default=None A collection of property values specific to the callback function. Returns @@ -486,7 +486,7 @@ def simulate_etl( None """ if not update_function is None: - for row in source_tables_info.collect(): + for row in source_tables_info.dropDuplicates(["SourceLocation", "DatasourceName", "DatasourceType", "DatasourceWorkspace", "SourceTableName"]).collect(): update_function(row, update_properties) else: raise ValueError("Unable to process tables without an UpdateTableCallback. Please set the update_function parameter.")