Skip to content

Commit

Permalink
Added support for MSSQL and POSTGRESQL to HMS Federation (#3701)
Browse files Browse the repository at this point in the history
closes #3664
  • Loading branch information
FastLee authored Feb 19, 2025
1 parent c88d3f5 commit 01c4263
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 35 deletions.
67 changes: 50 additions & 17 deletions src/databricks/labs/ucx/hive_metastore/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,23 @@ def __init__(
self._enable_hms_federation = enable_hms_federation
self._config = config

# Supported databases and version for HMS Federation
supported_database_versions: ClassVar[dict[str, list[str]]] = {
"mysql": ["2.3", "0.13"],
# Supported databases and associated ports
# https://docs.databricks.com/en/data-governance/unity-catalog/hms-federation/hms-federation-external.html
# https://dev.mysql.com/doc/mysql-port-reference/en/mysql-port-reference-tables.html
# https://www.postgresql.org/docs/current/runtime-config-connection.html
# https://learn.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url?view=sql-server-ver15
supported_databases_port: ClassVar[dict[str, int]] = {
"mysql": 3306,
"postgresql": 5432,
"sqlserver": 1433,
}

# Supported HMS versions
# https://docs.databricks.com/en/data-governance/unity-catalog/hms-federation/hms-federation-external.html
supported_hms_versions: ClassVar[set[tuple[int, int]]] = {
(0, 13),
(2, 3),
(3, 1),
}

def create_from_cli(self, prompts: Prompts) -> None:
Expand Down Expand Up @@ -127,19 +141,24 @@ def _external_hms(self) -> ExternalHmsInfo | None:
if not version:
logger.info('Hive Metastore version not found')
return None
major_minor_match = re.match(r'(^\d+\.\d+)', version)
if not major_minor_match:
major_minor_match = re.match(r'(^(?P<major>\d+)\.(?P<minor>\d+))', version)
if not major_minor_match or not major_minor_match.group('major') or not major_minor_match.group('minor'):
logger.info(f'Wrong Hive Metastore Database Version Format: {version}')
return None
major_minor_version = major_minor_match.group(1)
external_hms = replace(self._split_jdbc_url(jdbc_url), version=major_minor_version)
supported_versions = self.supported_database_versions.get(external_hms.database_type)
if not supported_versions:
logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}')
try:
major = int(major_minor_match.group('major'))
minor = int(major_minor_match.group('minor'))
except ValueError:
logger.info(f'Wrong Hive Metastore Database Version Format: {version}')
return None
if major_minor_version not in supported_versions:
logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}')

# Verify HMS version
if (major, minor) not in self.supported_hms_versions:
logger.info(
f'Unsupported Hive Metastore Version: {version}. We currently support: {self.supported_hms_versions}'
)
return None
external_hms = replace(self._split_jdbc_url(jdbc_url), version=f'{major}.{minor}')

if not external_hms.user:
external_hms = replace(
Expand All @@ -158,19 +177,33 @@ def _external_hms(self) -> ExternalHmsInfo | None:
@classmethod
def _split_jdbc_url(cls, jdbc_url: str) -> ExternalHmsInfo:
    """Split a JDBC connection URL into its components.

    The following URL shapes are recognised:

    1. jdbc:mysql://hostname:3306/metastore
    2. jdbc:mysql://hostname/metastore
    3. jdbc:mysql://hostname:3306/metastore?user=foo&password=bar
    4. jdbc:mysql://hostname/metastore?user=foo&password=bar
    5. jdbc:sqlserver://hostname:1433;database=database;user=foo;password=bar

    Args:
        jdbc_url: The JDBC URL to parse.

    Returns:
        An ExternalHmsInfo built from the database type, host, port (as a
        string), database, user and password; the version field is None.

    Raises:
        ValueError: If the URL does not match the expected shape, if no port
            is given and no default port is known for the database type, or
            if no database can be identified from the path or the parameters.
    """
    # NOTE(review): the URL may embed a password, and it is echoed verbatim in
    # the "Unsupported JDBC URL" error below - confirm callers do not log it.
    pattern = re.compile(
        r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/?;]+)'
        r'(:(?P<port>\d+))?(/(?P<database>[^?;]+))?([?;](?P<parameters>.+))?'
    )
    match = pattern.match(jdbc_url)
    if not match:
        raise ValueError(f'Unsupported JDBC URL: {jdbc_url}')

    # Parameters are separated by '&' (query-string style) or ';' (SQL Server
    # style). partition() splits on the first '=' only, so values that contain
    # '=' survive intact; segments without a key or without '=' are ignored
    # instead of aborting the whole parse.
    params: dict[str, str] = {}
    for param in re.split(r'[;&]', match.group('parameters') or ''):
        key, separator, value = param.partition('=')
        if key and separator:
            params[key] = value

    db_type = match.group('db_type')
    host = match.group('host')

    port = match.group('port')
    if not port:
        # Look up the default before converting to str: str(None) would be the
        # truthy string "None" and slip past the missing-port check.
        default_port = cls.supported_databases_port.get(db_type)
        if default_port is None:
            raise ValueError(f"Can't identify Port for {db_type}")
        port = str(default_port)

    # The database comes from the URL path, or from a "database" parameter
    # when the URL has no path component.
    database = match.group('database') or params.get('database')
    if not database:
        raise ValueError(f"Can't identify Database for {db_type}")

    user = params.get('user')
    password = params.get('password')

    return ExternalHmsInfo(db_type, host, port, database, user, password, None)

Expand Down
75 changes: 57 additions & 18 deletions tests/unit/hive_metastore/test_federation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import base64
from unittest.mock import create_autospec, call
import pytest


from databricks.labs.blueprint.installation import MockInstallation
from databricks.labs.blueprint.tui import MockPrompts
Expand Down Expand Up @@ -87,7 +89,59 @@ def test_create_federated_catalog_int(mock_installation):
assert calls == workspace_client.grants.method_calls


def test_create_federated_catalog_ext(mock_installation):
@pytest.mark.parametrize(
"config, expected",
[
(
{
"spark.hadoop.javax.jdo.option.ConnectionPassword": "{{secrets/secret_scope/secret_key}}",
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore",
"spark.hadoop.javax.jdo.option.ConnectionUserName": "foo",
"spark.sql.hive.metastore.version": "2.3.0",
},
{
'database': 'metastore',
'db_type': 'mysql',
'host': 'hostname.us-east-2.rds.amazonaws.com',
'password': 'bar',
'port': '3306',
'user': 'foo',
'version': '2.3',
},
),
(
{
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore?user=foo&password=bar",
"spark.sql.hive.metastore.version": "3.1.2",
},
{
'database': 'metastore',
'db_type': 'mysql',
'host': 'hostname.us-east-2.rds.amazonaws.com',
'password': 'bar',
'port': '3306',
'user': 'foo',
'version': '3.1',
},
),
(
{
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:sqlserver://teststableip.database.windows.net;database=teststableip;user=teststableip@teststableip;password=bar",
"spark.sql.hive.metastore.version": "3.1.2",
},
{
'database': 'teststableip',
'db_type': 'sqlserver',
'host': 'teststableip.database.windows.net',
'password': 'bar',
'port': '1433',
'user': 'teststableip@teststableip',
'version': '3.1',
},
),
],
)
def test_create_federated_catalog_ext(mock_installation, config, expected):
workspace_client = create_autospec(WorkspaceClient)
external_locations = create_autospec(ExternalLocations)
workspace_info = create_autospec(WorkspaceInfo)
Expand All @@ -109,14 +163,7 @@ def test_create_federated_catalog_ext(mock_installation):
)
mock_installation.load = lambda _: WorkspaceConfig(
inventory_database='ucx',
spark_conf={
"spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
"spark.hadoop.javax.jdo.option.ConnectionPassword": "{{secrets/secret_scope/secret_key}}",
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:mysql://hostname.us-east-2.rds.amazonaws.com:3306/metastore",
"spark.hadoop.javax.jdo.option.ConnectionUserName": "foo",
"spark.sql.hive.metastore.jars": "maven",
"spark.sql.hive.metastore.version": "2.3.0",
},
spark_conf=config,
)

hms_fed = HiveMetastoreFederation(
Expand All @@ -134,15 +181,7 @@ def test_create_federated_catalog_ext(mock_installation):
workspace_client.connections.create.assert_called_with(
name='fed_source',
connection_type=ConnectionType.HIVE_METASTORE,
options={
'database': 'metastore',
'db_type': 'mysql',
'host': 'hostname.us-east-2.rds.amazonaws.com',
'password': 'bar',
'port': '3306',
'user': 'foo',
'version': '2.3',
},
options=expected,
)
workspace_client.catalogs.create.assert_called_with(
name='a',
Expand Down

0 comments on commit 01c4263

Please sign in to comment.