diff --git a/.gitignore b/.gitignore index fc13a8d..5174766 100644 --- a/.gitignore +++ b/.gitignore @@ -147,4 +147,5 @@ doc/build.gitbak .venv1.4/ .venv1.5/ .venv1.6/ +.venv*/ dbt_adbs_py_test_project diff --git a/Makefile b/Makefile index fd36472..fbc1ad5 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # Configuration variables -VERSION=1.5.4 +VERSION=1.5.5 PROJ_DIR?=$(shell pwd) VENV_DIR?=${PROJ_DIR}/.bldenv BUILD_DIR=${PROJ_DIR}/build diff --git a/dbt/adapters/oracle/connections.py b/dbt/adapters/oracle/connections.py index bf75dbf..b255e48 100644 --- a/dbt/adapters/oracle/connections.py +++ b/dbt/adapters/oracle/connections.py @@ -213,7 +213,7 @@ def open(cls, connection): } if oracledb.__name__ == "oracledb": - conn_config['connection_id_prefix'] = 'dbt-oracle-' + conn_config['connection_id_prefix'] = f'dbt-oracle-{dbt_version}-' if credentials.shardingkey: conn_config['shardingkey'] = credentials.shardingkey @@ -240,7 +240,7 @@ def open(cls, connection): handle = oracledb.connect(**conn_config) # session_info is stored in v$session session_info = cls.get_session_info(credentials=credentials) - logger.info(f"Session info :{json.dumps(session_info)}") + logger.debug(f"Session info :{json.dumps(session_info)}") for k, v in session_info.items(): try: setattr(handle, k, v) diff --git a/dbt/adapters/oracle/python_submissions.py b/dbt/adapters/oracle/python_submissions.py index 5a43152..52b5bcb 100644 --- a/dbt/adapters/oracle/python_submissions.py +++ b/dbt/adapters/oracle/python_submissions.py @@ -17,6 +17,8 @@ import http import json from typing import Dict +import uuid +import platform import requests import time @@ -25,6 +27,7 @@ from dbt.adapters.oracle import OracleAdapterCredentials from dbt.events import AdapterLogger from dbt.ui import red, green +from dbt.version import __version__ as dbt_version # ADB-S OML Rest API minimum timeout is 1800 seconds DEFAULT_TIMEOUT_IN_SECONDS = 1800 @@ -140,6 +143,20 @@ def __init__(self, self.oml4py_client = OracleOML4PYClient(oml_cloud_service_url=credential.oml_cloud_service_url, username=credential.user, password=credential.password) + self.session_info = self.get_session_info(credentials=credential) + + @staticmethod + def get_session_info(credentials): + default_action = "DBT RUN OML4PY" + default_client_identifier = f'dbt-oracle-client-{uuid.uuid4()}' + default_client_info = "_".join([platform.node(), platform.machine()]) + default_module = f'dbt-{dbt_version}' + return { + "action": credentials.session_info.get("action", default_action), + "client_identifier": credentials.session_info.get("client_identifier", default_client_identifier), + "clientinfo": credentials.session_info.get("client_info", default_client_info), + "module": credentials.session_info.get("module", default_module) + } def schedule_async_job_and_wait_for_completion(self, data): logger.info(f"Running Python aysnc job using {data}") @@ -196,7 +213,8 @@ def schedule_async_job_and_wait_for_completion(self, data): def __call__(self, *args, **kwargs): data = { - "service": self.service + "service": self.service, + "parameters": json.dumps(self.session_info) } if self.async_flag: data["asyncFlag"] = self.async_flag diff --git a/dbt/include/oracle/macros/materializations/python_model/python.sql b/dbt/include/oracle/macros/materializations/python_model/python.sql index 45d0a37..95c2db9 100644 --- a/dbt/include/oracle/macros/materializations/python_model/python.sql +++ b/dbt/include/oracle/macros/materializations/python_model/python.sql @@ -54,8 +54,26 @@ {% endmacro %} {% macro py_script_postfix(model) %} -def main(): +def main(action, client_identifier, clientinfo, module): import oml + def set_connection_attributes(): + try: + connection = oml.core.methods._get_conn() + except Exception: + raise + else: + session_info = {"action": action, + "client_identifier": client_identifier, + "clientinfo": clientinfo, + "module": module} + for k, v in session_info.items(): + try: + setattr(connection, k, v) + except AttributeError: + pass # ok to be silent, ADB-S Python runtime complains about print statements + + set_connection_attributes() + import pandas as pd {{ build_ref_function(model ) }} {{ build_source_function(model ) }} @@ -86,6 +104,12 @@ def main(): self.this = this() self.is_incremental = {{ is_incremental() }} + def materialize(df, table, session): + if isinstance(df, pd.core.frame.DataFrame): + oml.create(df, table=table) + elif isinstance(df, oml.core.frame.DataFrame): + df.materialize(table=table) + {{ model.raw_code | indent(width=4, first=False, blank=True)}} diff --git a/dbt/include/oracle/macros/materializations/table/table.sql b/dbt/include/oracle/macros/materializations/table/table.sql index 29b23c6..17b1f69 100644 --- a/dbt/include/oracle/macros/materializations/table/table.sql +++ b/dbt/include/oracle/macros/materializations/table/table.sql @@ -100,26 +100,26 @@ {% macro py_write_table(compiled_code, target_relation, temporary=False) %} {{ compiled_code.replace(model.raw_code, "", 1) }} - def materialize(df, table, session): - if isinstance(df, pd.core.frame.DataFrame): - oml.create(df, table=table) - elif isinstance(df, oml.core.frame.DataFrame): - df.materialize(table=table) - - dbt = dbtObj(load_df_function=oml.sync) - final_df = model(dbt, session=oml) - - {{ log("Python model materialization is " ~ model.config.materialized, info=True) }} - {% if model.config.materialized.lower() == 'table' %} - table_name = f"{dbt.this.identifier}__dbt_tmp" - {% else %} - # incremental materialization - {% if temporary %} - table_name = "{{target_relation.identifier}}" - {% else %} - table_name = dbt.this.identifier - {% endif %} - {% endif %} - materialize(final_df, table=table_name.upper(), session=oml) - return pd.DataFrame.from_dict({"result": [1]}) + try: + dbt = dbtObj(load_df_function=oml.sync) + set_connection_attributes() + final_df = model(dbt, session=oml) + {{ log("Python model materialization is " ~ model.config.materialized, info=True) }} + {% if model.config.materialized.lower() == 'table' %} + table_name = f"{dbt.this.identifier}__dbt_tmp" + {% else %} + # incremental materialization + {% if temporary %} + table_name = "{{target_relation.identifier}}" + {% else %} + table_name = dbt.this.identifier + {% endif %} + {% endif %} + materialize(final_df, table=table_name.upper(), session=oml) + return pd.DataFrame.from_dict({"result": [1]}) + except Exception: + raise + finally: + connection = oml.core.methods._get_conn() + connection.close() {% endmacro %} diff --git a/dbt_adbs_test_project/models/sales_py.py b/dbt_adbs_test_project/models/sales_py.py new file mode 100644 index 0000000..cc4cb8a --- /dev/null +++ b/dbt_adbs_test_project/models/sales_py.py @@ -0,0 +1,7 @@ +def model(dbt, session): + dbt.config(materialized="table") + dbt.config(async_flag=True) + dbt.config(timeout=1800) + # oml.core.DataFrame referencing a dbt-sql model + sales = session.sync(query="SELECT * FROM SH.SALES") + return sales diff --git a/setup.cfg b/setup.cfg index a27224c..3735a21 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = dbt-oracle -version = 1.5.4 +version = 1.5.5 description = dbt (data build tool) adapter for the Oracle database long_description = file: README.md long_description_content_type = text/markdown diff --git a/setup.py b/setup.py index 993c1f3..63b8e41 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ url = 'https://github.com/oracle/dbt-oracle' -VERSION = '1.5.4' +VERSION = '1.5.5' setup( author="Oracle", python_requires='>=3.7.2',