From c6649d5b7e48606d397a6ef8d5dbd4a66c2f8170 Mon Sep 17 00:00:00 2001 From: Fabian Jakobs Date: Wed, 4 May 2022 09:17:38 +0000 Subject: [PATCH] Add execution context and run command APIs These are the still supported functionality from the REST API 1.2. https://docs.databricks.com/dev-tools/api/1.2/index.html --- databricks_cli/cli.py | 2 + databricks_cli/click_types.py | 31 +++ databricks_cli/execution_context/__init__.py | 22 ++ databricks_cli/execution_context/api.py | 75 ++++++ databricks_cli/execution_context/cli.py | 251 +++++++++++++++++++ databricks_cli/sdk/api_client.py | 6 + databricks_cli/sdk/service.py | 80 +++++- 7 files changed, 466 insertions(+), 1 deletion(-) create mode 100644 databricks_cli/execution_context/__init__.py create mode 100644 databricks_cli/execution_context/api.py create mode 100644 databricks_cli/execution_context/cli.py diff --git a/databricks_cli/cli.py b/databricks_cli/cli.py index 0b4f6ecf..be30fc3d 100644 --- a/databricks_cli/cli.py +++ b/databricks_cli/cli.py @@ -41,6 +41,7 @@ from databricks_cli.instance_pools.cli import instance_pools_group from databricks_cli.pipelines.cli import pipelines_group from databricks_cli.repos.cli import repos_group +from databricks_cli.execution_context.cli import execution_context_group @click.group(context_settings=CONTEXT_SETTINGS) @@ -67,6 +68,7 @@ def cli(): cli.add_command(instance_pools_group, name="instance-pools") cli.add_command(pipelines_group, name='pipelines') cli.add_command(repos_group, name='repos') +cli.add_command(execution_context_group, name='execution-context') if __name__ == "__main__": cli() diff --git a/databricks_cli/click_types.py b/databricks_cli/click_types.py index 463efe82..7d86acd7 100644 --- a/databricks_cli/click_types.py +++ b/databricks_cli/click_types.py @@ -112,6 +112,37 @@ class PipelineIdClickType(ParamType): help = 'The pipeline ID.' +class ExecutionContextIdClickType(ParamType): + name = 'CONTEXT_ID' + help = 'The execution context ID as returned from "databricks execution-context create".' + + +class CommandIdClickType(ParamType): + name = 'COMMAND_ID' + help = 'The command ID as returned from "databricks execution-context command-execute".' + + +class CommandStringType(ParamType): + name = 'COMMAND' + help = 'The command string to run.' + + +class CommandOutputType(ParamType): + name = 'FORMAT' + help = 'Can be "JSON" or "TEXT". Set to TEXT by default.' + + def convert(self, value, param, ctx): + if value is None: + return 'text' + if value.lower() != 'json' and value.lower() != 'text': + raise RuntimeError('output must be "json" or "text"') + return value + + @classmethod + def is_json(cls, value): + return value is not None and value.lower() == 'json' + + class OneOfOption(Option): def __init__(self, *args, **kwargs): self.one_of = kwargs.pop('one_of') diff --git a/databricks_cli/execution_context/__init__.py b/databricks_cli/execution_context/__init__.py new file mode 100644 index 00000000..b0c9feac --- /dev/null +++ b/databricks_cli/execution_context/__init__.py @@ -0,0 +1,22 @@ +# Databricks CLI +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"), except +# that the use of services to which certain application programming +# interfaces (each, an "API") connect requires that the user first obtain +# a license for the use of the APIs from Databricks, Inc. ("Databricks"), +# by creating an account at www.databricks.com and agreeing to either (a) +# the Community Edition Terms of Service, (b) the Databricks Terms of +# Service, or (c) another written agreement between Licensee and Databricks +# for the use of the APIs. +# +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/databricks_cli/execution_context/api.py b/databricks_cli/execution_context/api.py new file mode 100644 index 00000000..ecf6291e --- /dev/null +++ b/databricks_cli/execution_context/api.py @@ -0,0 +1,75 @@ +# WARNING THIS FILE IS AUTOGENERATED +# +# Databricks CLI +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"), except +# that the use of services to which certain application programming +# interfaces (each, an "API") connect requires that the user first obtain +# a license for the use of the APIs from Databricks, Inc. ("Databricks"), +# by creating an account at www.databricks.com and agreeing to either (a) +# the Community Edition Terms of Service, (b) the Databricks Terms of +# Service, or (c) another written agreement between Licensee and Databricks +# for the use of the APIs. +# +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Dict, Any + +from databricks_cli.sdk.service import ExecutionContextService + + +class ExecutionContext(object): + def __init__(self, api_client, cluster_id) -> None: + self.client = ExecutionContextService(api_client) + self.cluster_id = cluster_id + self.id = None + + def __enter__(self): + if self.id is None: + self.id = self.client.create_context(cluster_id=self.cluster_id)["id"] + + return self + + def __exit__(self, _type, _value, _traceback): + self.client.delete_context(cluster_id=self.cluster_id, context_id=self.id) + self.id = None + + +class ExecutionContextApi(object): + + def __init__(self, api_client) -> None: + self.client = ExecutionContextService(api_client) + + def create_context(self, cluster_id, language="python"): + # type: (str, str, Dict[Any, Any]) -> Dict[Any, Any] + return self.client.create_context(cluster_id, language) + + def get_context_status(self, cluster_id, context_id): + # type: (str, str, Dict[Any, Any]) -> Dict[Any, Any] + return self.client.get_context_status(cluster_id, context_id) + + def delete_context(self, cluster_id, context_id): + # type: (str, str, Dict[Any, Any]) -> Any + return self.client.delete_context(cluster_id, context_id) + + def execute_command(self, cluster_id, context_id, command, language="python"): + # type: (str, str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + return self.client.execute_command(cluster_id, context_id, command, language) + + def cancel_command(self, cluster_id, context_id, command_id): + # type: (str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + return self.client.cancel_command(cluster_id, context_id, command_id) + + def get_command_status(self, cluster_id, context_id, command_id): + # type: (str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + return self.client.get_command_status(cluster_id, context_id, command_id) \ No newline at end of file diff --git a/databricks_cli/execution_context/cli.py b/databricks_cli/execution_context/cli.py new file mode 100644 index 00000000..2f10c1f2 --- /dev/null +++ b/databricks_cli/execution_context/cli.py @@ -0,0 +1,251 @@ +# Databricks CLI +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"), except +# that the use of services to which certain application programming +# interfaces (each, an "API") connect requires that the user first obtain +# a license for the use of the APIs from Databricks, Inc. ("Databricks"), +# by creating an account at www.databricks.com and agreeing to either (a) +# the Community Edition Terms of Service, (b) the Databricks Terms of +# Service, or (c) another written agreement between Licensee and Databricks +# for the use of the APIs. +# +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import click +from tabulate import tabulate +from databricks_cli.click_types import ( + ClusterIdClickType, CommandIdClickType, + CommandOutputType, CommandStringType, + ExecutionContextIdClickType, OutputClickType +) + +from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, truncate_string +from databricks_cli.version import print_version_callback, version +from databricks_cli.configure.config import provide_api_client, profile_option, debug_option +from databricks_cli.execution_context.api import ExecutionContext, ExecutionContextApi + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Creates a new execution context.") +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--language', required=False, default="python", + type=click.Choice(['python', 'scala', 'sql'], case_sensitive=False), + help="The language for the context.") +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def create_context_cli(api_client, cluster_id, language): # NOQA + """ + Creates a new execution context. + """ + result = ExecutionContextApi(api_client).create_context(cluster_id, language) + click.echo(pretty_format(result)) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Gets information about an execution context.") +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--context-id', required=True, type=ExecutionContextIdClickType(), + help=ExecutionContextIdClickType.help) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def get_context_status_cli(api_client, cluster_id, context_id): # NOQA + """ + Gets information about an execution context. + """ + result = ExecutionContextApi(api_client).get_context_status(cluster_id, context_id) + click.echo(pretty_format(result)) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Deletes an execution context.") +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--context-id', required=True, type=ExecutionContextIdClickType(), + help=ExecutionContextIdClickType.help) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def delete_context_cli(api_client, cluster_id, context_id): # NOQA + """ + Deletes an execution context. + """ + result = ExecutionContextApi(api_client).delete_context(cluster_id, context_id) + click.echo(pretty_format(result)) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Executes a command within an existing execution context.") +@click.option('--output', default=None, help=CommandOutputType.help, type=CommandOutputType()) +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--context-id', required=True, type=ExecutionContextIdClickType(), + help=ExecutionContextIdClickType.help) +@click.option('--command', required=True, type=CommandStringType(), + help=CommandStringType.help) +@click.option('--wait', required=False, default=False, type=click.BOOL, + help="If true wait for command completion, otherwise schedule " + "the command and return immediately.") +@click.option('--language', required=False, default="python", + type=click.Choice(['python', 'scala', 'sql'], case_sensitive=False), + help="The language for the context.") +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def execute_command_cli(api_client, output, cluster_id, context_id, command, language, wait): # NOQA + """ + Executes a command within an existing execution context. + """ + client = ExecutionContextApi(api_client) + result = client.execute_command(cluster_id, context_id, command, language) + if not wait: + click.echo(pretty_format(result)) + return + + command_id = result["id"] + _wait_for_command(client, output, cluster_id, context_id, command_id) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Executes a single command within a new execution context.") +@click.option('--output', default=None, help=CommandOutputType.help, + type=CommandOutputType()) +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--command', required=True, type=CommandStringType(), + help=CommandStringType.help) +@click.option('--language', required=False, default="python", + type=click.Choice(['python', 'scala', 'sql'], case_sensitive=False), + help="The language for the context.") +@click.option('--wait', required=False, default=False, type=click.BOOL, + help="If true wait for command completion, otherwise schedule" + " the command and return immediately.") +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def execute_command_once_cli(api_client, output, cluster_id, command, language, wait): # NOQA + """ + Executes a single command within a new execution context. + The execution context is automatically created and cleaned up. + """ + client = ExecutionContextApi(api_client) + + with ExecutionContext(api_client, cluster_id) as context: + result = client.execute_command(cluster_id, context.id, command, language) + if not wait: + click.echo(pretty_format(result)) + return + + command_id = result["id"] + _wait_for_command(client, output, cluster_id, context.id, command_id) + + +def _wait_for_command(client, output, cluster_id, context_id, command_id): + while True: + status = client.get_command_status(cluster_id, context_id, command_id) + if not OutputClickType.is_json(output): + click.echo("Status: " + status["status"]) + if status["status"] in ["Cancelled", "Error", "Finished"]: + click.echo() + _print_command_result(output, status) + break + time.sleep(1) + + +def _print_command_result(output, status): + if OutputClickType.is_json(output): + click.echo(pretty_format(status)) + else: + result_type = status["results"]["resultType"] + data = status["results"]["data"] + if result_type == "text": + click.echo("\nCommand ID: %s\n" % status["id"]) + for line in data.splitlines(): + click.echo("output > %s" % line) + elif result_type == "table": + # TODO: add CSV output here + headers = [truncate_string(field["name"], 10) for field in status["results"]["schema"]] + click.echo(tabulate(tabular_data=data, + headers=headers, + tablefmt='plain', disable_numparse=True)) + else: + click.echo(data) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Cancels a command.") +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--context-id', required=True, type=ExecutionContextIdClickType(), + help=ExecutionContextIdClickType.help) +@click.option('--command-id', required=True, type=CommandIdClickType(), + help=CommandIdClickType.help) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def cancel_command_cli(api_client, cluster_id, context_id, command_id): # NOQA + """ + Cancels a command. + """ + ExecutionContextApi(api_client).cancel_command(cluster_id, context_id, command_id) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Gets information about a command.") +@click.option('--cluster-id', required=True, type=ClusterIdClickType(), + help=ClusterIdClickType.help) +@click.option('--context-id', required=True, type=ExecutionContextIdClickType(), + help=ExecutionContextIdClickType.help) +@click.option('--command-id', required=True, type=CommandIdClickType(), + help=CommandIdClickType.help) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def get_command_status_cli(api_client, cluster_id, context_id, command_id): # NOQA + """ + Gets information about a command. + """ + result = ExecutionContextApi(api_client).get_command_status(cluster_id, context_id, command_id) + click.echo(pretty_format(result)) + + +@click.group(context_settings=CONTEXT_SETTINGS, + short_help='Utility to interact with Databricks execution contexts.') +@click.option('--version', '-v', is_flag=True, callback=print_version_callback, + expose_value=False, is_eager=True, help=version) +@debug_option +@profile_option +@eat_exceptions +def execution_context_group(): # pragma: no cover + """Utility to interact with execution contexts.""" + pass + + +execution_context_group.add_command(create_context_cli, name='create') +execution_context_group.add_command(get_context_status_cli, name='status') +execution_context_group.add_command(delete_context_cli, name='delete') +execution_context_group.add_command(execute_command_cli, name='command-execute') +execution_context_group.add_command(execute_command_once_cli, name='command-execute-once') +execution_context_group.add_command(cancel_command_cli, name='command-cancel') +execution_context_group.add_command(get_command_status_cli, name='command-status') diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index eebd9581..a3d6ce20 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -105,6 +105,12 @@ def __init__(self, user=None, password=None, host=None, token=None, self.api_version = api_version self.jobs_api_version = jobs_api_version + def get_v1_client(self): + """Create an SDK v1 client based on this client""" + v1_client = copy.deepcopy(self) + v1_client.api_version = "1.2" + return v1_client + def close(self): """Close the client""" pass diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index ac6f7611..7c16f9c1 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -24,7 +24,7 @@ # limitations under the License. # import os - +from typing import Dict, Any class JobsService(object): def __init__(self, client): @@ -1075,6 +1075,7 @@ def stop(self, pipeline_id=None, headers=None): return self.client.perform_query('POST', '/pipelines/{pipeline_id}/stop'.format(pipeline_id=pipeline_id), data=_data, headers=headers) + class ReposService(object): def __init__(self, client): self.client = client @@ -1114,3 +1115,80 @@ def delete_repo(self, id, headers=None): _data = {} return self.client.perform_query('DELETE', '/repos/{id}'.format(id=id), data=_data, headers=headers) + +class ExecutionContextService(object): + """The execution context service provides funcitonality from the REST API 1.2 that is still supported + This includes the execution context and run command APIs. + + https://docs.databricks.com/dev-tools/api/1.2/index.html#command-execution + """ + def __init__(self, client): + self.client = client.get_v1_client() + + def create_context(self, cluster_id, language = "python", headers=None): + # type: (str, str, Dict[Any, Any]) -> Dict[Any, Any] + _data = {} + _data['language'] = language + if cluster_id is not None: + _data['clusterId'] = cluster_id + + return self.client.perform_query(method="POST", path="/contexts/create", data=_data, headers=headers) + + def get_context_status(self, cluster_id, context_id, headers=None): + # type: (str, str, Dict[Any, Any]) -> Dict[Any, Any] + _data = {} + if cluster_id is not None: + _data['clusterId'] = cluster_id + if context_id is not None: + _data['contextId'] = context_id + + return self.client.perform_query(method="GET", path="/contexts/status", data=_data, headers=headers) + + def delete_context(self, cluster_id, context_id, headers=None): + # type: (str, str, Dict[Any, Any]) -> Any + _data = {} + if cluster_id is not None: + _data['clusterId'] = cluster_id + if context_id is not None: + _data['contextId'] = context_id + + return self.client.perform_query(method="POST", path="/contexts/destroy", data=_data, headers=headers) + + def execute_command(self, cluster_id, context_id, command, language="python", headers=None): + # type: (str, str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + _data = {} + _data['language'] = language + if cluster_id is not None: + _data['clusterId'] = cluster_id + if context_id is not None: + _data['contextId'] = context_id + if command is not None: + _data['command'] = command + + result = self.client.perform_query(method="POST", path="/commands/execute", data=_data, headers=headers) + return result + + def cancel_command(self, cluster_id, context_id, command_id, headers = None): + # type: (str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + _data = {} + if cluster_id is not None: + _data['clusterId'] = cluster_id + if context_id is not None: + _data['contextId'] = context_id + if command_id is not None: + _data['commandId'] = command_id + + self.client.perform_query(method="POST", path="/commands/cancel", data=_data, headers=headers) + + def get_command_status(self, cluster_id, context_id, command_id, headers = None): + # type: (str, str, str, Dict[Any, Any]) -> Dict[Any, Any] + _data = {} + if cluster_id is not None: + _data['clusterId'] = cluster_id + if context_id is not None: + _data['contextId'] = context_id + if command_id is not None: + _data['commandId'] = command_id + + result = self.client.perform_query(method="GET", path="/commands/status", data=_data, headers=headers) + return result \ No newline at end of file