From 01c4263108b9d21ee6cd498f64e39b7c70a5fe9d Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Wed, 19 Feb 2025 11:03:14 -0500 Subject: [PATCH] Added support for MSSQL and POSTGRESQL to HMS Federation (#3701) closes #3664 --- .../labs/ucx/hive_metastore/federation.py | 67 ++++++++++++----- tests/unit/hive_metastore/test_federation.py | 75 ++++++++++++++----- 2 files changed, 107 insertions(+), 35 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 7768e07e79..0a180713ca 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -78,9 +78,23 @@ def __init__( self._enable_hms_federation = enable_hms_federation self._config = config - # Supported databases and version for HMS Federation - supported_database_versions: ClassVar[dict[str, list[str]]] = { - "mysql": ["2.3", "0.13"], + # Supported databases and associated ports + # https://docs.databricks.com/en/data-governance/unity-catalog/hms-federation/hms-federation-external.html + # https://dev.mysql.com/doc/mysql-port-reference/en/mysql-port-reference-tables.html + # https://www.postgresql.org/docs/current/runtime-config-connection.html + # https://learn.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url?view=sql-server-ver15 + supported_databases_port: ClassVar[dict[str, int]] = { + "mysql": 3306, + "postgresql": 5432, + "sqlserver": 1433, + } + + # Supported HMS versions + # https://docs.databricks.com/en/data-governance/unity-catalog/hms-federation/hms-federation-external.html + supported_hms_versions: ClassVar[set[tuple[int, int]]] = { + (0, 13), + (2, 3), + (3, 1), } def create_from_cli(self, prompts: Prompts) -> None: @@ -127,19 +141,24 @@ def _external_hms(self) -> ExternalHmsInfo | None: if not version: logger.info('Hive Metastore version not found') return None - major_minor_match = re.match(r'(^\d+\.\d+)', version) - if not 
major_minor_match: + major_minor_match = re.match(r'(^(?P<major>\d+)\.(?P<minor>\d+))', version) + if not major_minor_match or not major_minor_match.group('major') or not major_minor_match.group('minor'): logger.info(f'Wrong Hive Metastore Database Version Format: {version}') return None - major_minor_version = major_minor_match.group(1) - external_hms = replace(self._split_jdbc_url(jdbc_url), version=major_minor_version) - supported_versions = self.supported_database_versions.get(external_hms.database_type) - if not supported_versions: - logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}') + try: + major = int(major_minor_match.group('major')) + minor = int(major_minor_match.group('minor')) + except ValueError: + logger.info(f'Wrong Hive Metastore Database Version Format: {version}') return None - if major_minor_version not in supported_versions: - logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}') + + # Verify HMS version + if (major, minor) not in self.supported_hms_versions: + logger.info( + f'Unsupported Hive Metastore Version: {version}. We currently support: {self.supported_hms_versions}' + ) return None + external_hms = replace(self._split_jdbc_url(jdbc_url), version=f'{major}.{minor}') if not external_hms.user: external_hms = replace( @@ -158,19 +177,33 @@ def _external_hms(self) -> ExternalHmsInfo | None: @classmethod def _split_jdbc_url(cls, jdbc_url: str) -> ExternalHmsInfo: # Define the regex pattern to match the JDBC URL components + # The regex supports the following JDBC URL formats: + # 1. jdbc:mysql://hostname:3306/metastore + # 2. jdbc:mysql://hostname/metastore + # 3. jdbc:mysql://hostname:3306/metastore?user=foo&password=bar + # 4. jdbc:mysql://hostname/metastore?user=foo&password=bar + # 5. jdbc:mssql://hostname:1433;database=database;user=foo;password=bar pattern = re.compile( - r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)(\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+))?' 
+ r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/?;]+)(:(?P<port>\d+))?(/(?P<database>[^?^;]+))?([?;](?P<parameters>.+))?' ) match = pattern.match(jdbc_url) if not match: raise ValueError(f'Unsupported JDBC URL: {jdbc_url}') + params = {} + if match.group('parameters'): + params = dict(param.split('=') for param in re.split(r"[;&]", match.group('parameters'))) + db_type = match.group('db_type') + port = match.group('port') or str(cls.supported_databases_port.get(db_type)) + if not port: + raise ValueError(f"Can't identify Port for {db_type}") host = match.group('host') - port = match.group('port') - database = match.group('database') - user = match.group('user') - password = match.group('password') + database = match.group('database') or params.get("database") + if not database or not isinstance(database, str): + raise ValueError(f"Can't identify Database for {db_type}") + user = params.get('user') + password = params.get('password') return ExternalHmsInfo(db_type, host, port, database, user, password, None) diff --git a/tests/unit/hive_metastore/test_federation.py b/tests/unit/hive_metastore/test_federation.py index a8c142a7f0..43abf232f8 100644 --- a/tests/unit/hive_metastore/test_federation.py +++ b/tests/unit/hive_metastore/test_federation.py @@ -1,5 +1,7 @@ import base64 from unittest.mock import create_autospec, call +import pytest + from databricks.labs.blueprint.installation import MockInstallation from databricks.labs.blueprint.tui import MockPrompts @@ -87,7 +89,59 @@ def test_create_federated_catalog_int(mock_installation): assert calls == workspace_client.grants.method_calls -def test_create_federated_catalog_ext(mock_installation): +@pytest.mark.parametrize( + "config, expected", + [ + ( + { + "spark.hadoop.javax.jdo.option.ConnectionPassword": "{{secrets/secret_scope/secret_key}}", + "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore", + "spark.hadoop.javax.jdo.option.ConnectionUserName": "foo", + 
"spark.sql.hive.metastore.version": "2.3.0", + }, + { + 'database': 'metastore', + 'db_type': 'mysql', + 'host': 'hostname.us-east-2.rds.amazonaws.com', + 'password': 'bar', + 'port': '3306', + 'user': 'foo', + 'version': '2.3', + }, + ), + ( + { + "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore?user=foo&password=bar", + "spark.sql.hive.metastore.version": "3.1.2", + }, + { + 'database': 'metastore', + 'db_type': 'mysql', + 'host': 'hostname.us-east-2.rds.amazonaws.com', + 'password': 'bar', + 'port': '3306', + 'user': 'foo', + 'version': '3.1', + }, + ), + ( + { + "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:sqlserver://teststableip.database.windows.net;database=teststableip;user=teststableip@teststableip;password=bar", + "spark.sql.hive.metastore.version": "3.1.2", + }, + { + 'database': 'teststableip', + 'db_type': 'sqlserver', + 'host': 'teststableip.database.windows.net', + 'password': 'bar', + 'port': '1433', + 'user': 'teststableip@teststableip', + 'version': '3.1', + }, + ), + ], +) +def test_create_federated_catalog_ext(mock_installation, config, expected): workspace_client = create_autospec(WorkspaceClient) external_locations = create_autospec(ExternalLocations) workspace_info = create_autospec(WorkspaceInfo) @@ -109,14 +163,7 @@ def test_create_federated_catalog_ext(mock_installation): ) mock_installation.load = lambda _: WorkspaceConfig( inventory_database='ucx', - spark_conf={ - "spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver", - "spark.hadoop.javax.jdo.option.ConnectionPassword": "{{secrets/secret_scope/secret_key}}", - "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore", - "spark.hadoop.javax.jdo.option.ConnectionUserName": "foo", - "spark.sql.hive.metastore.jars": "maven", - "spark.sql.hive.metastore.version": "2.3.0", - }, + spark_conf=config, ) hms_fed = HiveMetastoreFederation( @@ 
-134,15 +181,7 @@ def test_create_federated_catalog_ext(mock_installation): workspace_client.connections.create.assert_called_with( name='fed_source', connection_type=ConnectionType.HIVE_METASTORE, - options={ - 'database': 'metastore', - 'db_type': 'mysql', - 'host': 'hostname.us-east-2.rds.amazonaws.com', - 'password': 'bar', - 'port': '3306', - 'user': 'foo', - 'version': '2.3', - }, + options=expected, ) workspace_client.catalogs.create.assert_called_with( name='a',