Skip to content

Commit

Permalink
Session management created by OML in UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
aosingh committed Nov 21, 2023
1 parent 9d95bbe commit 9b57970
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
37 changes: 23 additions & 14 deletions dbt/include/oracle/macros/materializations/python_model/python.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,23 @@
{% macro py_script_postfix(model) %}
def main(action, client_identifier, clientinfo, module):
import oml
try:
connection = oml.core.methods._get_conn()
except Exception:
print("Exception getting connection object from OML client")
else:
session_info = {"action": action,
"client_identifier": client_identifier,
"clientinfo": clientinfo,
"module": module}
for k, v in session_info.items():
try:
setattr(connection, k, v)
except AttributeError:
print(f"Python driver does not support setting {k}")
def set_connection_attributes():
try:
connection = oml.core.methods._get_conn()
except Exception:
raise
else:
session_info = {"action": action,
"client_identifier": client_identifier,
"clientinfo": clientinfo,
"module": module}
for k, v in session_info.items():
try:
setattr(connection, k, v)
except AttributeError:
print(f"Python driver does not support setting {k}")

set_connection_attributes()

import pandas as pd
{{ build_ref_function(model ) }}
Expand Down Expand Up @@ -101,6 +104,12 @@ def main(action, client_identifier, clientinfo, module):
self.this = this()
self.is_incremental = {{ is_incremental() }}

def materialize(df, table, session):
if isinstance(df, pd.core.frame.DataFrame):
oml.create(df, table=table)
elif isinstance(df, oml.core.frame.DataFrame):
df.materialize(table=table)

{{ model.raw_code | indent(width=4, first=False, blank=True)}}


Expand Down
44 changes: 22 additions & 22 deletions dbt/include/oracle/macros/materializations/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,26 @@

{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
{{ compiled_code.replace(model.raw_code, "", 1) }}
def materialize(df, table, session):
if isinstance(df, pd.core.frame.DataFrame):
oml.create(df, table=table)
elif isinstance(df, oml.core.frame.DataFrame):
df.materialize(table=table)

dbt = dbtObj(load_df_function=oml.sync)
final_df = model(dbt, session=oml)

{{ log("Python model materialization is " ~ model.config.materialized, info=True) }}
{% if model.config.materialized.lower() == 'table' %}
table_name = f"{dbt.this.identifier}__dbt_tmp"
{% else %}
# incremental materialization
{% if temporary %}
table_name = "{{target_relation.identifier}}"
{% else %}
table_name = dbt.this.identifier
{% endif %}
{% endif %}
materialize(final_df, table=table_name.upper(), session=oml)
return pd.DataFrame.from_dict({"result": [1]})
try:
dbt = dbtObj(load_df_function=oml.sync)
set_connection_attributes()
final_df = model(dbt, session=oml)
{{ log("Python model materialization is " ~ model.config.materialized, info=True) }}
{% if model.config.materialized.lower() == 'table' %}
table_name = f"{dbt.this.identifier}__dbt_tmp"
{% else %}
# incremental materialization
{% if temporary %}
table_name = "{{target_relation.identifier}}"
{% else %}
table_name = dbt.this.identifier
{% endif %}
{% endif %}
materialize(final_df, table=table_name.upper(), session=oml)
return pd.DataFrame.from_dict({"result": [1]})
except Exception:
raise
finally:
connection = oml.core.methods._get_conn()
connection.close()
{% endmacro %}
7 changes: 7 additions & 0 deletions dbt_adbs_test_project/models/sales_py.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
def model(dbt, session):
dbt.config(materialized="table")
dbt.config(async_flag=True)
dbt.config(timeout=1800)
# oml.core.DataFrame referencing a dbt-sql model
sales = session.sync(query="SELECT * FROM SH.SALES")
return sales

0 comments on commit 9b57970

Please sign in to comment.