Skip to content

Commit 64eab6c

Browse files
committed
[flink] Optionally show all registered functions
1 parent 17f6e56 commit 64eab6c

File tree

2 files changed

+124
-18
lines changed

2 files changed

+124
-18
lines changed

desktop/libs/notebook/src/notebook/connectors/flink_sql.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
OPERATION_TOKEN = '%(username)s-%(connector_name)s' + '-operation-token'
3636
DEFAULT_CATALOG_PARAM = "default_catalog"
3737
DEFAULT_DATABASE_PARAM = "default_database"
38+
LIST_ALL_FUNCTIONS_PARAM = "list_all_functions"
3839

3940

4041
def query_error_handler(func):
@@ -74,6 +75,7 @@ def __init__(self, user, interpreter=None):
7475
api_url = self.options['url']
7576
self.default_catalog = self.options.get(DEFAULT_CATALOG_PARAM)
7677
self.default_database = self.options.get(DEFAULT_DATABASE_PARAM)
78+
self.list_all_functions = self.options.get(LIST_ALL_FUNCTIONS_PARAM)
7779

7880
self.db = FlinkSqlClient(user=user, api_url=api_url)
7981

@@ -418,12 +420,18 @@ def _check_status_and_fetch_result(self, session_handle, operation_handle):
418420
data = [i['fields'] for i in resp['results']['data'] if resp and resp['results'] and resp['results']['data']]
419421
return data
420422

421-
def _show_databases(self):
423+
def _show_catalogs(self):
422424
session = self._get_session()
423-
session_handle = session['id']
425+
operation_handle = self.db.execute_statement(session_handle=session['id'], statement='SHOW CATALOGS')
426+
catalog_list = self._check_status_and_fetch_result(session['id'], operation_handle['operationHandle'])
427+
428+
return [catalog[0] for catalog in catalog_list]
424429

425-
operation_handle = self.db.execute_statement(session_handle=session_handle, statement='SHOW DATABASES')
426-
db_list = self._check_status_and_fetch_result(session_handle, operation_handle['operationHandle'])
430+
def _show_databases(self, catalog=None):
431+
session = self._get_session()
432+
statement = 'SHOW DATABASES IN `%(catalog)s`' % {'catalog': catalog} if catalog else 'SHOW DATABASES'
433+
operation_handle = self.db.execute_statement(session_handle=session['id'], statement=statement)
434+
db_list = self._check_status_and_fetch_result(session['id'], operation_handle['operationHandle'])
427435

428436
return [db[0] for db in db_list]
429437

@@ -461,12 +469,51 @@ def _get_columns(self, database, table):
461469
]
462470

463471
def _show_functions(self, database):
472+
if self.list_all_functions:
473+
return self._show_all_functions()
474+
else:
475+
return self._show_functions_in_current_db(database)
476+
477+
def _show_functions_in_current_db(self, database):
464478
session = self._get_session()
465479
statement = 'SHOW FUNCTIONS IN `%(database)s`' % {'database': database} if database else 'SHOW FUNCTIONS'
466480
operation_handle = self.db.execute_statement(session['id'], statement)
467481
function_list = self._check_status_and_fetch_result(session['id'], operation_handle['operationHandle'])
468482
return [{'name': function[0]} for function in function_list]
469483

484+
def _show_all_functions(self):
485+
# Flink UDFs can be registered in any catalog in any database. This function iterates through all catalogs
486+
# and databases to lists all defined functions. The results are then extended with Flink system functions.
487+
# Returned user functions names are fully qualified (<catalog_name>.<database_name>.<function_name>).
488+
result = []
489+
490+
for catalog in self._show_catalogs():
491+
for database in self._show_databases(catalog):
492+
user_functions = self._list_user_functions(catalog, database)
493+
result.extend([{'name': f'{catalog}.{database}.{function["name"]}'} for function in user_functions])
494+
495+
result.extend(self._list_system_functions())
496+
return result
497+
498+
def _list_system_functions(self):
499+
# Flink allows to either list all functions or only user functions.
500+
all_functions = self._list_functions(catalog=self.default_catalog, database=self.default_database, user_scope=False)
501+
user_functions = self._list_user_functions(catalog=self.default_catalog, database=self.default_database)
502+
system_functions = [f for f in all_functions if f not in user_functions]
503+
return system_functions
504+
505+
def _list_user_functions(self, catalog, database):
506+
return self._list_functions(catalog, database, user_scope=True)
507+
508+
def _list_functions(self, catalog, database, user_scope):
509+
session = self._get_session()
510+
statement = 'SHOW USER FUNCTIONS' if user_scope else 'SHOW FUNCTIONS'
511+
if database:
512+
statement = statement + ' IN `%(catalog)s`.`%(database)s`' % {'catalog': catalog, 'database': database}
513+
operation_handle = self.db.execute_statement(session['id'], statement)
514+
function_list = self._check_status_and_fetch_result(session['id'], operation_handle['operationHandle'])
515+
return [{'name': function[0]} for function in function_list]
516+
470517
def _show_function(self, function_name):
471518
session = self._get_session()
472519
if session.get('flink_version') and session['flink_version'].startswith('2.'):
@@ -476,9 +523,11 @@ def _show_function(self, function_name):
476523
statement='DESCRIBE FUNCTION EXTENDED %(function_name)s' % {'function_name': function_name})
477524
properties = dict(self._check_status_and_fetch_result(session['id'], operation_handle['operationHandle']))
478525

526+
# Function can be overloaded (multiple signatures). But only the first signature will be returned.
527+
signatures = properties.get('signature').split('\n')
479528
return {
480529
'name': function_name,
481-
'signature': properties.get('signature'),
530+
'signature': signatures[0],
482531
}
483532
else:
484533
return {'name': function_name}

desktop/libs/notebook/src/notebook/connectors/flink_sql_tests.py

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,7 @@ def test_autocomplete_operation_functions(self, client_mock):
8686
mock_client_instance.create_session.return_value = {'sessionHandle': self.TEST_SESSION_HANDLE}
8787
mock_client_instance.info.return_value = {'version': '2.0.0'}
8888
mock_client_instance.execute_statement.return_value = {'operationHandle': self.TEST_OPERATION_HANDLE}
89-
mock_client_instance.fetch_results.return_value = {
90-
'resultType': 'PAYLOAD',
91-
'resultKind': 'SUCCESS_WITH_CONTENT',
92-
'results': {
93-
'columns': [{'name': 'function name', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 1000}}],
94-
'rowFormat': 'JSON',
95-
'data': [
96-
{'kind': 'INSERT', 'fields': ['lower']},
97-
{'kind': 'INSERT', 'fields': ['upper']}
98-
]},
99-
'nextResultUri': f'/v3/sessions/{self.TEST_SESSION_HANDLE}/operations/{self.TEST_OPERATION_HANDLE}/result/1?rowFormat=JSON'
100-
}
89+
mock_client_instance.fetch_results.return_value = self._list_function_payload(['lower', 'upper'])
10190

10291
# and: FlinkSqlApi instance with configuration
10392
flink_api = FlinkSqlApi(self.user, interpreter=self.interpreter)
@@ -111,7 +100,53 @@ def test_autocomplete_operation_functions(self, client_mock):
111100

112101
# then
113102
mock_client_instance.execute_statement.assert_called_once_with(self.TEST_SESSION_HANDLE, 'SHOW FUNCTIONS')
114-
assert autocomplete_result == {'functions': [{'name': 'lower'}, {'name': 'upper'}]}
103+
self._assert_autocomplete_functions(autocomplete_result, ['lower', 'upper'])
104+
105+
@patch('notebook.connectors.flink_sql.FlinkSqlClient')
106+
def test_autocomplete_operation_functions_list_all(self, client_mock):
107+
# given: mock interactions
108+
def mock_execute_statement(session_handle, statement):
109+
responses = {
110+
'SHOW CATALOGS': {'operationHandle': 'show-catalogs'},
111+
'SHOW DATABASES IN `test_catalog`': {'operationHandle': 'show-databases'},
112+
'SHOW FUNCTIONS IN `test_catalog`.`db_a`': {'operationHandle': 'show-fns-dba'},
113+
'SHOW USER FUNCTIONS IN `test_catalog`.`db_a`': {'operationHandle': 'show-user-fns-dba'},
114+
'SHOW USER FUNCTIONS IN `test_catalog`.`db_b`': {'operationHandle': 'show-user-fns-dbb'},
115+
}
116+
return responses.get(statement)
117+
118+
def mock_fetch_results(session_handle, operation_handle, token):
119+
responses = {
120+
'show-catalogs': self._list_function_payload(['test_catalog']),
121+
'show-databases': self._list_function_payload(['db_a', 'db_b']),
122+
'show-fns-dba': self._list_function_payload(['test_fun_a', 'lower', 'upper']),
123+
'show-user-fns-dba': self._list_function_payload(['test_fun_a']),
124+
'show-user-fns-dbb': self._list_function_payload(['test_fun_b']),
125+
}
126+
return responses.get(operation_handle)
127+
128+
mock_client_instance = MagicMock()
129+
client_mock.return_value = mock_client_instance
130+
mock_client_instance.create_session.return_value = {'sessionHandle': self.TEST_SESSION_HANDLE}
131+
mock_client_instance.info.return_value = {'version': '2.0.0'}
132+
mock_client_instance.execute_statement.side_effect = mock_execute_statement
133+
mock_client_instance.fetch_results.side_effect = mock_fetch_results
134+
135+
# and: FlinkSqlApi instance with configuration
136+
self.interpreter['options']['list_all_functions'] = True
137+
self.interpreter['options']['default_catalog'] = 'test_catalog'
138+
self.interpreter['options']['default_database'] = 'db_a'
139+
flink_api = FlinkSqlApi(self.user, interpreter=self.interpreter)
140+
141+
# and: session is created
142+
flink_api.create_session(lang='flink', properties=None)
143+
144+
# when
145+
autocomplete_result = flink_api.autocomplete(snippet='dummy', database=None, table=None, column=None,
146+
nested=None, operation='functions')
147+
148+
# then
149+
self._assert_autocomplete_functions(autocomplete_result, ['lower', 'upper', 'test_catalog.db_a.test_fun_a', 'test_catalog.db_b.test_fun_b'])
115150

116151
@patch('notebook.connectors.flink_sql.FlinkSqlClient')
117152
def test_autocomplete_operation_function_flink_1_x(self, client_mock):
@@ -185,3 +220,25 @@ def test_autocomplete_operation_function_flink_2_x(self, client_mock):
185220
assert autocomplete_result == {
186221
'function': {'name': 'test_function', 'signature': 'default_catalog.default_db.test_function(values <ANY>...)'}
187222
}
223+
224+
def _list_function_payload(self, expected_functions, session_handle=None, operation_handle=None, token=0):
225+
session_handle = session_handle if session_handle else self.TEST_SESSION_HANDLE
226+
operation_handle = operation_handle if operation_handle else self.TEST_OPERATION_HANDLE
227+
228+
return {
229+
'resultType': 'PAYLOAD',
230+
'resultKind': 'SUCCESS_WITH_CONTENT',
231+
'results': {
232+
'columns': [
233+
{'name': 'function name', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 1000}}],
234+
'rowFormat': 'JSON',
235+
'data': [
236+
{'kind': 'INSERT', 'fields': [fname]} for fname in expected_functions
237+
]},
238+
'nextResultUri': f'/v3/sessions/{session_handle}/operations/{operation_handle}/result/1?rowFormat=JSON'
239+
}
240+
241+
def _assert_autocomplete_functions(self, autocomplete_result, expected_fun_names):
242+
actual_fun_names = set([f['name'] for f in autocomplete_result['functions']])
243+
assert set(expected_fun_names) == actual_fun_names
244+

0 commit comments

Comments
 (0)