diff --git a/databricks_cli/click_types.py b/databricks_cli/click_types.py
index ddc86a3a..463efe82 100644
--- a/databricks_cli/click_types.py
+++ b/databricks_cli/click_types.py
@@ -98,7 +98,13 @@ class SecretPrincipalClickType(ParamType):
 
 class PipelineSpecClickType(ParamType):
     name = 'SPEC'
-    help = 'The path to the pipelines deployment spec file.'
+    help = '[Deprecated] Use the settings option instead. \n' + \
+           'The path to the pipelines settings file.'
+
+
+class PipelineSettingClickType(ParamType):
+    name = 'SETTINGS'
+    help = 'The path to the pipelines settings file.'
 
 
 class PipelineIdClickType(ParamType):
diff --git a/databricks_cli/pipelines/api.py b/databricks_cli/pipelines/api.py
index 2b2e9737..e6fe720c 100644
--- a/databricks_cli/pipelines/api.py
+++ b/databricks_cli/pipelines/api.py
@@ -40,14 +40,14 @@ def __init__(self, api_client):
         self.client = DeltaPipelinesService(api_client)
         self.dbfs_client = DbfsApi(api_client)
 
-    def create(self, spec, spec_dir, allow_duplicate_names, headers=None):
-        data = self._upload_libraries_and_update_spec(spec, spec_dir)
+    def create(self, settings, settings_dir, allow_duplicate_names, headers=None):
+        data = self._upload_libraries_and_update_settings(settings, settings_dir)
         data['allow_duplicate_names'] = allow_duplicate_names
         return self.client.client.perform_query('POST', '/pipelines', data=data,
                                                 headers=headers)
 
-    def deploy(self, spec, spec_dir, allow_duplicate_names, headers=None):
-        data = self._upload_libraries_and_update_spec(spec, spec_dir)
+    def edit(self, settings, settings_dir, allow_duplicate_names, headers=None):
+        data = self._upload_libraries_and_update_settings(settings, settings_dir)
         data['allow_duplicate_names'] = allow_duplicate_names
         pipeline_id = data['id']
         self.client.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=data,
@@ -86,14 +86,15 @@ def start_update(self, pipeline_id, full_refresh=None, headers=None):
     def stop(self, pipeline_id, headers=None):
         self.client.stop(pipeline_id, headers)
 
-    def _upload_libraries_and_update_spec(self, spec, spec_dir):
-        spec = copy.deepcopy(spec)
-        lib_objects = LibraryObject.from_json(spec.get('libraries', []))
+    def _upload_libraries_and_update_settings(self, settings, settings_dir):
+        settings = copy.deepcopy(settings)
+        lib_objects = LibraryObject.from_json(settings.get('libraries', []))
         local_lib_objects, external_lib_objects = self._identify_local_libraries(lib_objects)
 
-        spec['libraries'] = LibraryObject.to_json(
-            external_lib_objects + self._upload_local_libraries(spec_dir, local_lib_objects))
-        return spec
+        settings['libraries'] = LibraryObject.to_json(
+            external_lib_objects + self._upload_local_libraries(
+                settings_dir, local_lib_objects))
+        return settings
 
     @staticmethod
     def _identify_local_libraries(lib_objects):
@@ -124,9 +125,10 @@ def _identify_local_libraries(lib_objects):
                 external_lib_objects.append(lib_object)
         return local_lib_objects, external_lib_objects
 
-    def _upload_local_libraries(self, spec_dir, local_lib_objects):
-        relative_local_lib_objects = [LibraryObject(llo.lib_type, os.path.join(spec_dir, llo.path))
-                                      for llo in local_lib_objects]
+    def _upload_local_libraries(self, settings_dir, local_lib_objects):
+        relative_local_lib_objects = [
+            LibraryObject(
+                llo.lib_type, os.path.join(settings_dir, llo.path)) for llo in local_lib_objects]
         remote_lib_objects = [LibraryObject(rllo.lib_type, self._get_hashed_path(rllo.path))
                               for rllo in relative_local_lib_objects]
         transformed_remote_lib_objects = [LibraryObject(rlo.lib_type,
                                                         DbfsPath(rlo.path))
diff --git a/databricks_cli/pipelines/cli.py b/databricks_cli/pipelines/cli.py
index a62ba958..3b65660d 100644
--- a/databricks_cli/pipelines/cli.py
+++ b/databricks_cli/pipelines/cli.py
@@ -33,7 +33,8 @@
 
 import click
 
-from databricks_cli.click_types import PipelineSpecClickType, PipelineIdClickType
+from databricks_cli.click_types import PipelineSpecClickType, \
+    PipelineSettingClickType, PipelineIdClickType
 from databricks_cli.version import print_version_callback, version
 from databricks_cli.pipelines.api import PipelinesApi
 from databricks_cli.configure.config import provide_api_client, profile_option, debug_option
@@ -49,28 +50,168 @@
 
 
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Deploys a pipeline according to the pipeline specification.')
-@click.argument('spec_arg', default=None, required=False)
-@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
+               short_help='Creates a pipeline.')
+@click.argument('settings_arg', default=None, required=False)
+@click.option('--settings', default=None,
+              type=PipelineSettingClickType(), help=PipelineSettingClickType.help)
 @click.option('--allow-duplicate-names', is_flag=True,
-              help="If true, skips duplicate name checking while deploying the pipeline.")
+              help="If true, skips duplicate name checking while creating the pipeline.")
+@debug_option
+@profile_option
+@pipelines_exception_eater
+@provide_api_client
+def create_cli(api_client, settings_arg, settings, allow_duplicate_names):
+    # pylint: disable=line-too-long
+    """
+    Creates a pipeline specified by the pipeline settings. The pipeline settings are a
+    JSON document that defines a Delta Live Tables pipeline on Databricks.
+
+    To use a file containing the pipeline settings, pass the file path to the command as
+    an argument or with the --settings option. If the pipeline creation is successful, logs
+    the URL and the ID of the new pipeline to STDOUT.
+
+    Specification for the pipeline settings JSON can be found at
+    https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-configuration.html
+
+    If a pipeline with the same name already exists, the pipeline will not be created.
+    This check can be disabled by adding the --allow-duplicate-names option.
+
+    If the pipeline settings contain an "id" field, this command will fail.
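+
+    For reference, a minimal settings document might look like the following
+    (field values are illustrative only, not a complete specification):
+
+    {
+        "name": "my-pipeline",
+        "libraries": [{"jar": "dbfs:/pipelines/code/example.jar"}]
+    }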
+
+    Usage:
+
+    databricks pipelines create example.json
+
+    OR
+
+    databricks pipelines create --settings example.json
+    """
+    # pylint: enable=line-too-long
+    if bool(settings_arg) == bool(settings):
+        raise ValueError('Settings should be provided either as an argument ' +
+                         '(databricks pipelines create example.json) or as ' +
+                         'an option (databricks pipelines create --settings example.json).')
+
+    src = settings_arg if bool(settings_arg) else settings
+    settings_obj = _read_settings(src)
+    settings_dir = os.path.dirname(src)
+
+    if 'id' in settings_obj:
+        raise ValueError("Pipeline settings shouldn't contain \"id\" "
+                         "when creating a pipeline.")
+
+    try:
+        response = PipelinesApi(api_client).create(
+            settings_obj, settings_dir, allow_duplicate_names)
+    except requests.exceptions.HTTPError as e:
+        _handle_duplicate_name_exception(settings_obj, e, is_create_pipeline=True)
+
+    new_pipeline_id = response['pipeline_id']
+    click.echo("Successfully created pipeline: {} with ID: {}.".format(
+        _get_pipeline_url(api_client, new_pipeline_id), new_pipeline_id))
+
+
+@click.command(context_settings=CONTEXT_SETTINGS,
+               short_help='Edits a pipeline.')
+@click.argument('settings_arg', default=None, required=False)
+@click.option('--settings', default=None, type=PipelineSettingClickType(),
+              help=PipelineSettingClickType.help)
 @click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
               help=PipelineIdClickType.help)
+@click.option('--allow-duplicate-names', is_flag=True,
+              help="Skip duplicate name check while editing pipeline.")
 @debug_option
 @profile_option
 @pipelines_exception_eater
 @provide_api_client
-def deploy_cli(api_client, spec_arg, spec, allow_duplicate_names, pipeline_id):
+def edit_cli(api_client, settings_arg, settings, pipeline_id, allow_duplicate_names):
+    # pylint: disable=line-too-long
     """
-    Deploys a pipeline according to the pipeline specification. The pipeline spec is a
-    JSON document that defines the required settings to run a Delta Live Tables pipeline
-    on Databricks. All local libraries referenced in the spec are uploaded to DBFS.
+    Edits a pipeline specified by the pipeline settings. The pipeline settings are a
+    JSON document that defines a Delta Live Tables pipeline on Databricks. To use a
+    file containing the pipeline settings, pass the file path to the command as an
+    argument or with the --settings option.
+
+    Specification for the pipeline settings JSON can be found at
+    https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-configuration.html
+
+    If another pipeline with the same name exists, the pipeline settings will not be edited.
+    This check can be disabled by adding the --allow-duplicate-names option.
+
+    Note that if an ID is specified in both the settings and passed with the --pipeline-id argument,
+    the two IDs must be the same, or the command will fail.
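+
+    For example, a settings document passed to edit would typically carry the target
+    pipeline's ID (values illustrative only):
+
+    {
+        "id": "1234",
+        "name": "my-pipeline"
+    }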
 
-    If the pipeline spec contains an "id" field, or if a pipeline ID is specified directly
+    Usage:
+
+    databricks pipelines edit example.json
+
+    OR
+
+    databricks pipelines edit --settings example.json
+    """
+    # pylint: enable=line-too-long
+    if bool(settings_arg) == bool(settings):
+        raise ValueError('Settings should be provided either as an argument ' +
+                         '(databricks pipelines edit example.json) or as ' +
+                         'an option (databricks pipelines edit --settings example.json).')
+
+    src = settings_arg if bool(settings_arg) else settings
+    settings_obj = _read_settings(src)
+    settings_dir = os.path.dirname(src)
+
+    if (pipeline_id and 'id' in settings_obj) and pipeline_id != settings_obj["id"]:
+        raise ValueError(
+            "The ID provided in --pipeline_id '{}' is different from the ID provided "
+            "in the settings '{}'. Resolve the conflict and try the command again. ".format(
+                pipeline_id, settings_obj["id"])
+        )
+
+    settings_obj['id'] = pipeline_id or settings_obj.get('id', None)
+    _validate_pipeline_id(settings_obj['id'])
+
+    try:
+        PipelinesApi(api_client).edit(settings_obj, settings_dir, allow_duplicate_names)
+    except requests.exceptions.HTTPError as e:
+        _handle_duplicate_name_exception(settings_obj, e, is_create_pipeline=False)
+    click.echo("Successfully edited pipeline settings: {}.".format(
+        _get_pipeline_url(api_client, settings_obj['id'])))
+
+
+@click.command(context_settings=CONTEXT_SETTINGS,
+               short_help='[Deprecated] This command is deprecated, use the create and edit '
+                          'commands instead.\n Creates or edits a pipeline specified by the '
+                          'pipeline settings.')
+@click.argument('settings_arg', default=None, required=False)
+@click.option('--settings', default=None, type=PipelineSettingClickType(),
+              help=PipelineSettingClickType.help)
+@click.option('--spec', default=None, type=PipelineSpecClickType(),
+              help=PipelineSpecClickType.help)
+@click.option('--allow-duplicate-names', is_flag=True,
+              help="Skip duplicate name check while deploying pipeline.")
+@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
+              help=PipelineIdClickType.help)
+@debug_option
+@profile_option
+@pipelines_exception_eater
+@provide_api_client
+def deploy_cli(api_client, settings_arg, settings, spec, allow_duplicate_names, pipeline_id):
+    # pylint: disable=line-too-long
+    """
+    [Deprecated] This command is deprecated, use the create and edit commands instead.
+
+    Creates or edits a pipeline specified by the pipeline settings. The pipeline settings
+    are a JSON document that defines a Delta Live Tables pipeline on Databricks. To use a
+    file containing the pipeline settings, pass the file path to the command as an
+    argument or with the --settings option.
+
+    Specification for the pipeline settings JSON can be found at
+    https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-configuration.html
+
+    If the pipeline settings contain an "id" field, or if a pipeline ID is specified directly
     (using the --pipeline-id argument), attempts to update an existing pipeline
-    with that ID. If it does not, creates a new pipeline and logs the ID of the new pipeline
-    to STDOUT. Note that if an ID is both specified in the spec and passed via --pipeline-id,
-    the two IDs must be the same, or the command will fail.
+    with that ID. If it does not, creates a new pipeline and logs the URL and the ID of the
+    new pipeline to STDOUT. Note that if an ID is specified in both the settings and passed
+    with the --pipeline-id argument, the two IDs must be the same, or the command will fail.
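+    In other words, deploy dispatches to the same code paths as the new create and
+    edit commands: create when no pipeline ID is available, edit when one is.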
 
     The deploy command will not create a new pipeline if a pipeline with the same name
     already exists. This check can be disabled by adding the --allow-duplicate-names option.
@@ -81,51 +222,62 @@ def deploy_cli(api_client, spec_arg, spec, allow_duplicate_names, pipeline_id):
 
     OR
 
-    databricks pipelines deploy --spec example.json
+    databricks pipelines deploy --settings example.json
 
     OR
 
-    databricks pipelines deploy --pipeline-id 1234 --spec example.json
+    databricks pipelines deploy --pipeline-id 1234 --settings example.json
     """
-    if bool(spec_arg) == bool(spec):
-        raise ValueError('The spec should be provided either by an option or argument')
-    src = spec_arg if bool(spec_arg) else spec
-    spec_obj = _read_spec(src)
-    spec_dir = os.path.dirname(src)
-    if not pipeline_id and 'id' not in spec_obj:
+    # pylint: enable=line-too-long
+    click.echo("DeprecationWarning: the \"deploy\" command is deprecated, " +
+               "use \"create\" command to create a new pipeline or \"edit\" command " +
+               "to modify an existing pipeline.\n")
+
+    settings_error_msg = 'Settings should be provided either as an argument ' \
+                         '(databricks pipelines deploy example.json) or as ' \
+                         'an option (databricks pipelines deploy --settings example.json).'
+    if bool(spec):
+        if bool(spec) == bool(settings):
+            raise ValueError(settings_error_msg)
+        settings = spec
+
+    if bool(settings_arg) == bool(settings):
+        raise ValueError(settings_error_msg)
+
+    src = settings_arg if bool(settings_arg) else settings
+    settings_obj = _read_settings(src)
+    settings_dir = os.path.dirname(src)
+    if not pipeline_id and 'id' not in settings_obj:
         try:
-            response = PipelinesApi(api_client).create(spec_obj, spec_dir, allow_duplicate_names)
+            response = PipelinesApi(api_client).create(
+                settings_obj, settings_dir, allow_duplicate_names)
         except requests.exceptions.HTTPError as e:
-            _handle_duplicate_name_exception(spec_obj, e)
+            _handle_duplicate_name_exception(settings_obj, e, is_create_pipeline=True)
         new_pipeline_id = response['pipeline_id']
-        click.echo("Pipeline has been assigned ID {}".format(new_pipeline_id))
-        click.echo("Successfully created pipeline: {}".format(
-            _get_pipeline_url(api_client, new_pipeline_id)))
-        click.echo(new_pipeline_id, err=True)
+        click.echo("Successfully created pipeline: {} with ID: {}".format(
+            _get_pipeline_url(api_client, new_pipeline_id), new_pipeline_id))
     else:
-        if (pipeline_id and 'id' in spec_obj) and pipeline_id != spec_obj["id"]:
+        if (pipeline_id and 'id' in settings_obj) and pipeline_id != settings_obj["id"]:
             raise ValueError(
                 "The ID provided in --pipeline_id '{}' is different from the ID provided "
-                "in the spec '{}'. Resolve the conflict and try the command again. "
-                "Because pipeline IDs are no longer persisted after being deleted, Databricks "
-                "recommends removing the ID field from your spec."
-                .format(pipeline_id, spec_obj["id"])
+                "in the settings '{}'. Resolve the conflict and try the command again.".format(
+                    pipeline_id, settings_obj["id"])
             )
-        spec_obj['id'] = pipeline_id or spec_obj.get('id', None)
-        _validate_pipeline_id(spec_obj['id'])
-
+        settings_obj['id'] = pipeline_id or settings_obj.get('id', None)
+        _validate_pipeline_id(settings_obj['id'])
         try:
-            PipelinesApi(api_client).deploy(spec_obj, spec_dir, allow_duplicate_names)
+            PipelinesApi(api_client).edit(
+                settings_obj, settings_dir, allow_duplicate_names)
         except requests.exceptions.HTTPError as e:
-            _handle_duplicate_name_exception(spec_obj, e)
-        click.echo("Successfully deployed pipeline: {}".format(
-            _get_pipeline_url(api_client, spec_obj['id'])))
+            _handle_duplicate_name_exception(settings_obj, e, is_create_pipeline=False)
+        click.echo("Successfully deployed pipeline: {}.".format(
+            _get_pipeline_url(api_client, settings_obj['id'])))
 
 
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Stops the pipeline by cancelling any active update and deletes it.')
+               short_help='Deletes the pipeline and cancels any active updates.')
 @click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
               help=PipelineIdClickType.help)
 @debug_option
@@ -134,7 +286,7 @@ def deploy_cli(api_client, spec_arg, spec, allow_duplicate_names, pipeline_id):
 @provide_api_client
 def delete_cli(api_client, pipeline_id):
     """
-    Stops the pipeline by cancelling any active update and deletes it.
+    Deletes the pipeline and cancels any active updates.
 
     Usage:
 
@@ -146,7 +298,7 @@ def delete_cli(api_client, pipeline_id):
 
 
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Gets a pipeline\'s current spec and status.')
+               short_help='Gets a pipeline\'s current settings and status.')
 @click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
               help=PipelineIdClickType.help)
 @debug_option
@@ -155,7 +307,7 @@ def delete_cli(api_client, pipeline_id):
 @provide_api_client
 def get_cli(api_client, pipeline_id):
     """
-    Gets a pipeline's current spec and status.
+    Gets a pipeline's current settings and status.
 
     Usage:
 
@@ -166,12 +318,19 @@
 
 
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Gets a pipeline\'s current spec and status.')
+               short_help='Lists all pipelines and their statuses.')
 @debug_option
 @profile_option
 @pipelines_exception_eater
 @provide_api_client
 def list_cli(api_client):
+    """
+    Lists all pipelines and their statuses.
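+
+    The pipelines are printed to STDOUT as pretty-formatted JSON.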
+
+    Usage:
+
+    databricks pipelines list
+    """
     click.echo(pretty_format(PipelinesApi(api_client).list()))
 
 
@@ -233,9 +392,9 @@ def run_cli(api_client, pipeline_id):
                short_help='Starts a pipeline update.')
 @click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
               help=PipelineIdClickType.help)
-@click.option('--full-refresh', default=False, type=bool,
-              help='If true, truncates tables and creates new checkpoint' +
-              ' folders so that data is reprocessed from the beginning.')
+@click.option('--full-refresh', default=False, type=bool, is_flag=True,
+              help='If present, truncates tables and creates new checkpoint ' +
+              'folders so that data is reprocessed from the beginning.')
 @debug_option
 @profile_option
 @pipelines_exception_eater
@@ -246,7 +405,7 @@ def start_cli(api_client, pipeline_id, full_refresh):
 
     Usage:
 
-    databricks pipelines start --pipeline-id 1234 --full-refresh=true
+    databricks pipelines start --pipeline-id 1234 --full-refresh
     """
     _validate_pipeline_id(pipeline_id)
     resp = PipelinesApi(api_client).start_update(pipeline_id, full_refresh=full_refresh)
@@ -286,21 +445,22 @@ def _gen_start_update_msg(resp, pipeline_id, full_refresh):
     return output_msg
 
 
-def _read_spec(src):
+def _read_settings(src):
     """
-    Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
-    if the format is supported.
+    Reads the settings at src as JSON if the file has a JSON extension or
+    if no file extension is provided. Other file extensions and formats are
+    not supported.
     """
     extension = os.path.splitext(src)[1]
-    if extension.lower() == '.json':
+    if extension.lower() == '.json' or (not extension):
         try:
             with open(src, 'r') as f:
                 data = f.read()
             return json.loads(data)
         except json_parse_exception as e:
-            error_and_quit("Invalid JSON provided in spec\n{}".format(e))
+            error_and_quit("Invalid JSON provided in settings\n{}.".format(e))
     else:
-        raise ValueError('The provided file extension for the spec is not supported. ' +
+        raise ValueError('The provided file extension for the settings is not supported. ' +
                          'Only JSON files are supported.')
 
 
@@ -309,29 +469,15 @@ def _get_pipeline_url(api_client, pipeline_id):
     return urljoin(base_url, "#joblist/pipelines/{}".format(pipeline_id))
 
 
-def _write_spec(src, spec):
-    """
-    Writes the spec at src as JSON.
-    """
-    data = json.dumps(spec, indent=2) + '\n'
-    with open(src, 'w') as f:
-        f.write(data)
-
-
 def _validate_pipeline_id(pipeline_id):
     """
-    Checks if the pipeline ID is not empty and contains only hyphen (-),
-    underscore (_), and alphanumeric characters.
+    Checks if the pipeline ID is not empty.
     """
     if pipeline_id is None or len(pipeline_id) == 0:
         error_and_quit(u'Empty pipeline ID provided')
-    if not set(pipeline_id) <= PIPELINE_ID_PERMITTED_CHARACTERS:
-        message = u'Pipeline ID {} has invalid character(s)\n'.format(pipeline_id)
-        message += u'Valid characters are: _ - a-z A-Z 0-9'
-        error_and_quit(message)
 
 
-def _handle_duplicate_name_exception(spec, exception):
+def _handle_duplicate_name_exception(settings, exception, is_create_pipeline):
     error_code = None
     try:
         error_code = json.loads(exception.response.text).get('error_code')
@@ -339,27 +485,36 @@
         pass
 
     if error_code == 'RESOURCE_CONFLICT':
-        raise ValueError("Pipeline with name '{}' already exists. ".format(spec['name']) +
-                         "If you are updating an existing pipeline, provide the pipeline " +
-                         "ID using --pipeline-id. Otherwise, " +
-                         "you can use the --allow-duplicate-names option to skip this check. ")
+        if is_create_pipeline:
+            raise ValueError(
+                "Pipeline with name '{}' already exists. ".format(settings['name']) +
+                "If you are updating an existing pipeline, use the \"edit\" command. "
+                "Otherwise, you can use the --allow-duplicate-names option to skip "
+                "this check. ")
+
+        raise ValueError(
+            "Pipeline with name '{}' already exists. ".format(settings['name']) +
+            "You can use the --allow-duplicate-names option to skip this check. ")
+
     raise exception
 
 
 @click.group(context_settings=CONTEXT_SETTINGS,
-             short_help='Utility to interact with the Databricks Delta Pipelines.')
+             short_help='Utility to interact with Databricks Delta Live Tables Pipelines.')
 @click.option('--version', '-v', is_flag=True, callback=print_version_callback,
               expose_value=False, is_eager=True, help=version)
 @debug_option
 @profile_option
 def pipelines_group():  # pragma: no cover
     """
-    Utility to interact with the Databricks pipelines.
+    Utility to interact with Databricks Delta Live Tables Pipelines.
     """
     pass
 
 
 pipelines_group.add_command(deploy_cli, name='deploy')
+pipelines_group.add_command(create_cli, name="create")
+pipelines_group.add_command(edit_cli, name="edit")
 pipelines_group.add_command(delete_cli, name='delete')
 pipelines_group.add_command(get_cli, name='get')
 pipelines_group.add_command(list_cli, name='list')
diff --git a/tests/pipelines/test_api.py b/tests/pipelines/test_api.py
index 96e6e580..4d0554d8 100644
--- a/tests/pipelines/test_api.py
+++ b/tests/pipelines/test_api.py
@@ -81,11 +81,11 @@ def test_create_pipeline_and_upload_libraries(put_file_mock, dbfs_path_validate,
 @mock.patch('databricks_cli.dbfs.api.DbfsApi.put_file')
 def test_deploy_pipeline_and_upload_libraries(put_file_mock, dbfs_path_validate, pipelines_api,
                                               tmpdir):
-    _test_library_uploads(pipelines_api, pipelines_api.deploy, SPEC, put_file_mock,
+    _test_library_uploads(pipelines_api, pipelines_api.edit, SPEC, put_file_mock,
                           dbfs_path_validate, tmpdir, False)
 
 
-def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_path_validate,
+def _test_library_uploads(pipelines_api, api_method, settings, put_file_mock, dbfs_path_validate,
                           tmpdir, allow_duplicate_names):
     """
     Scenarios Tested:
@@ -98,7 +98,7 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p
     A test local file which has '456' written to it is not present in Dbfs and therefore must be.
     uploaded to dbfs.
""" - spec = copy.deepcopy(spec) + settings = copy.deepcopy(settings) # set-up the test jar1 = tmpdir.join('jar1.jar').strpath @@ -135,9 +135,9 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p {'whl': wheel1}, ] - expected_data = copy.deepcopy(spec) + expected_data = copy.deepcopy(settings) - spec['libraries'] = libraries + settings['libraries'] = libraries hash123 = "40bd001563085fc35165329ea1ff5c5ecbdbbeef" hash456 = "51eac6b471a284d3341d8c0c63d0f1a286262a18" @@ -155,7 +155,7 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p ] expected_data['allow_duplicate_names'] = allow_duplicate_names - api_method(spec, tmpdir.strpath, allow_duplicate_names) + api_method(settings, tmpdir.strpath, allow_duplicate_names) assert dbfs_path_validate.call_count == 5 assert put_file_mock.call_count == 4 assert put_file_mock.call_args_list[0][0][0] == jar2 @@ -171,17 +171,17 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p def test_create(pipelines_api): client_mock = pipelines_api.client.client.perform_query - spec = copy.deepcopy(SPEC_WITHOUT_ID) - spec['libraries'] = [] + settings = copy.deepcopy(SPEC_WITHOUT_ID) + settings['libraries'] = [] - pipelines_api.create(spec, spec_dir='.', allow_duplicate_names=False) - data = copy.deepcopy(spec) + pipelines_api.create(settings, settings_dir='.', allow_duplicate_names=False) + data = copy.deepcopy(settings) data['allow_duplicate_names'] = False client_mock.assert_called_with("POST", "/pipelines", data=data, headers=None) assert client_mock.call_count == 1 - pipelines_api.create(spec, spec_dir='.', allow_duplicate_names=True, headers=HEADERS) - data = copy.deepcopy(spec) + pipelines_api.create(settings, settings_dir='.', allow_duplicate_names=True, headers=HEADERS) + data = copy.deepcopy(settings) data['allow_duplicate_names'] = True client_mock.assert_called_with("POST", "/pipelines", data=data, headers=HEADERS) assert client_mock.call_count == 2 @@ -190,17 +190,17 @@ def test_create(pipelines_api): def test_deploy(pipelines_api): client_mock = pipelines_api.client.client.perform_query - spec = copy.deepcopy(SPEC) - spec['libraries'] = [] + settings = copy.deepcopy(SPEC) + settings['libraries'] = [] - pipelines_api.deploy(spec, spec_dir='.', allow_duplicate_names=False) - data = copy.deepcopy(spec) + pipelines_api.edit(settings, settings_dir='.', allow_duplicate_names=False) + data = copy.deepcopy(settings) data['allow_duplicate_names'] = False client_mock.assert_called_with("PUT", "/pipelines/" + PIPELINE_ID, data=data, headers=None) assert client_mock.call_count == 1 - pipelines_api.deploy(spec, spec_dir='.', allow_duplicate_names=True, headers=HEADERS) - data = copy.deepcopy(spec) + pipelines_api.edit(settings, settings_dir='.', allow_duplicate_names=True, headers=HEADERS) + data = copy.deepcopy(settings) data['allow_duplicate_names'] = True client_mock.assert_called_with("PUT", "/pipelines/" + PIPELINE_ID, data=data, headers=HEADERS) assert client_mock.call_count == 2 diff --git a/tests/pipelines/test_cli.py b/tests/pipelines/test_cli.py index 2d08bf28..499090a7 100644 --- a/tests/pipelines/test_cli.py +++ b/tests/pipelines/test_cli.py @@ -34,8 +34,8 @@ import databricks_cli.pipelines.cli as cli from tests.utils import provide_conf -DEPLOY_SPEC_NO_ID = '{"name": "asdf"}' -DEPLOY_SPEC = '{"id": "123", "name": "asdf"}' +PIPELINE_SETTINGS_NO_ID = '{"name": "asdf"}' +PIPELINE_SETTINGS = '{"id": "123", "name": "asdf"}' PIPELINE_ID = "123" @@ -57,12 +57,12 @@ 
def click_ctx(): @provide_conf -def test_create_pipeline_spec_arg(pipelines_api_mock, tmpdir): +def test_create_pipeline_settings_arg(pipelines_api_mock, tmpdir): pipelines_api_mock.create = mock.Mock(return_value={"pipeline_id": PIPELINE_ID}) - path = tmpdir.join('/spec.json').strpath + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC_NO_ID) + f.write(PIPELINE_SETTINGS_NO_ID) runner = CliRunner() result = runner.invoke(cli.deploy_cli, [path]) @@ -71,12 +71,12 @@ def test_create_pipeline_spec_arg(pipelines_api_mock, tmpdir): @provide_conf -def test_create_pipeline_spec_option(pipelines_api_mock, tmpdir): +def test_create_pipeline_settings_option(pipelines_api_mock, tmpdir): pipelines_api_mock.create = mock.Mock(return_value={"pipeline_id": PIPELINE_ID}) - path = tmpdir.join('/spec.json').strpath + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC_NO_ID) + f.write(PIPELINE_SETTINGS_NO_ID) runner = CliRunner() result = runner.invoke(cli.deploy_cli, ['--spec', path]) @@ -85,37 +85,76 @@ def test_create_pipeline_spec_option(pipelines_api_mock, tmpdir): @provide_conf -def test_deploy_cli_spec_arg(pipelines_api_mock, tmpdir): - path = tmpdir.join('/spec.json').strpath +def test_edit_and_deploy_cli_settings_arg(pipelines_api_mock, tmpdir): + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC) + f.write(PIPELINE_SETTINGS) runner = CliRunner() - runner.invoke(cli.deploy_cli, [path]) - assert pipelines_api_mock.deploy.call_args[0][0] == json.loads(DEPLOY_SPEC) + for cmd in [cli.deploy_cli, cli.edit_cli]: + pipelines_api_mock.reset_mock() + runner.invoke(cmd, [path]) + assert pipelines_api_mock.edit.call_args[0][0] == json.loads(PIPELINE_SETTINGS) @provide_conf -def test_deploy_spec_option(pipelines_api_mock, tmpdir): - path = tmpdir.join('/spec.json').strpath +def test_create_cli_settings_arg(pipelines_api_mock, tmpdir): + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC) + f.write(PIPELINE_SETTINGS_NO_ID) runner = CliRunner() - runner.invoke(cli.deploy_cli, ['--spec', path]) - assert pipelines_api_mock.deploy.call_args[0][0] == json.loads(DEPLOY_SPEC) + for cmd in [cli.deploy_cli, cli.create_cli]: + pipelines_api_mock.reset_mock() + runner.invoke(cmd, [path]) + assert pipelines_api_mock.create.call_args[0][0] == json.loads(PIPELINE_SETTINGS_NO_ID) + + +@provide_conf +def test_deploy_settings_option(pipelines_api_mock, tmpdir): + path = tmpdir.join('/settings.json').strpath + with open(path, 'w') as f: + f.write(PIPELINE_SETTINGS) + + path_no_id = tmpdir.join('/settings_no_id.json').strpath + with open(path_no_id, 'w') as f: + f.write(PIPELINE_SETTINGS_NO_ID) + + runner = CliRunner() + for option in ['--settings', '--spec']: + pipelines_api_mock.reset_mock() + runner.invoke(cli.deploy_cli, [option, path]) + assert pipelines_api_mock.edit.call_args[0][0] == json.loads(PIPELINE_SETTINGS) + + runner.invoke(cli.deploy_cli, [option, path_no_id]) + assert pipelines_api_mock.create.call_args[0][0] == json.loads(PIPELINE_SETTINGS_NO_ID) @provide_conf def test_deploy_cli_incorrect_parameters(pipelines_api_mock, tmpdir): - path = tmpdir.join('/spec.json').strpath + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC) + f.write(PIPELINE_SETTINGS) runner = CliRunner() - result = runner.invoke(cli.deploy_cli, [path, '--spec', path]) - assert result.exit_code == 1 - assert 
pipelines_api_mock.deploy.call_count == 0 - result = runner.invoke(cli.deploy_cli, ['--spec', path, path]) - assert result.exit_code == 1 - assert pipelines_api_mock.deploy.call_count == 0 + + for option in ['--settings', '--spec']: + pipelines_api_mock.reset_mock() + result = runner.invoke(cli.deploy_cli, [path, option, path]) + assert result.exit_code == 1 + assert pipelines_api_mock.edit.call_count == 0 + result = runner.invoke(cli.deploy_cli, [option, path, path]) + assert result.exit_code == 1 + assert pipelines_api_mock.edit.call_count == 0 + + +@provide_conf +def test_only_one_of_settings_or_settings_should_be_provided(pipelines_api_mock, tmpdir): + path = tmpdir.join('/settings.json').strpath + with open(path, 'w') as f: + f.write(PIPELINE_SETTINGS) + runner = CliRunner() + result = runner.invoke(cli.deploy_cli, [path, '--settings', path, '--spec', path]) + assert "ValueError: Settings should be provided" in result.stdout + assert pipelines_api_mock.create.call_count == 0 + assert pipelines_api_mock.edit.call_count == 0 @provide_conf @@ -134,50 +173,69 @@ def test_delete_cli_correct_parameters(pipelines_api_mock): @provide_conf -def test_deploy_spec_pipeline_id_is_not_changed_if_provided_in_spec(tmpdir): - path = tmpdir.join('/spec.json').strpath +def test_deploy_settings_pipeline_id_is_not_changed_if_provided_in_spec(tmpdir): + path = tmpdir.join('/settings.json').strpath with open(path, 'w') as f: - f.write(DEPLOY_SPEC) + f.write(PIPELINE_SETTINGS) runner = CliRunner() - result = runner.invoke(cli.deploy_cli, ['--spec', path]) - + result = runner.invoke(cli.deploy_cli, ['--settings', path]) assert '123' in result.stdout @provide_conf -def test_deploy_update_delete_cli_correct_spec_extensions(pipelines_api_mock, tmpdir): +def test_correct_settings_extensions(pipelines_api_mock, tmpdir): pipelines_api_mock.create = mock.Mock(return_value={"pipeline_id": PIPELINE_ID}) runner = CliRunner() - path_json = tmpdir.join('/spec.json').strpath + + path_no_extension = tmpdir.join('/settings').strpath + with open(path_no_extension, 'w') as f: + f.write(PIPELINE_SETTINGS_NO_ID) + + for cmd in [cli.deploy_cli, cli.create_cli]: + pipelines_api_mock.reset_mock() + result = runner.invoke(cmd, ['--settings', path_no_extension]) + assert result.exit_code == 0 + assert pipelines_api_mock.create.call_count == 1 + + path_json = tmpdir.join('/settings.json').strpath with open(path_json, 'w') as f: - f.write(DEPLOY_SPEC_NO_ID) - result = runner.invoke(cli.deploy_cli, ['--spec', path_json]) - assert result.exit_code == 0 - assert pipelines_api_mock.create.call_count == 1 + f.write(PIPELINE_SETTINGS_NO_ID) - result = runner.invoke(cli.deploy_cli, ['--spec', path_json, '--pipeline-id', PIPELINE_ID]) - assert result.exit_code == 0 - assert pipelines_api_mock.deploy.call_count == 1 + for cmd in [cli.deploy_cli, cli.create_cli]: + pipelines_api_mock.reset_mock() + result = runner.invoke(cmd, ['--settings', path_json]) + assert result.exit_code == 0 + assert pipelines_api_mock.create.call_count == 1 + + for cmd in [cli.deploy_cli, cli.edit_cli]: + pipelines_api_mock.reset_mock() + result = runner.invoke(cmd, ['--settings', path_json, '--pipeline-id', PIPELINE_ID]) + assert result.exit_code == 0 + assert pipelines_api_mock.edit.call_count == 1 result = runner.invoke(cli.delete_cli, ['--pipeline-id', PIPELINE_ID]) assert result.exit_code == 0 assert pipelines_api_mock.delete.call_count == 1 - pipelines_api_mock.reset_mock() - path_case_insensitive = tmpdir.join('/spec2.JsON').strpath + path_case_insensitive = 
tmpdir.join('/settings2.JsON').strpath with open(path_case_insensitive, 'w') as f: - f.write(DEPLOY_SPEC_NO_ID) - result = runner.invoke(cli.deploy_cli, ['--spec', path_case_insensitive]) - assert result.exit_code == 0 - assert pipelines_api_mock.create.call_count == 1 - - result = runner.invoke(cli.deploy_cli, [ - '--spec', path_case_insensitive, - '--pipeline-id', PIPELINE_ID - ]) - assert result.exit_code == 0 - assert pipelines_api_mock.deploy.call_count == 1 + f.write(PIPELINE_SETTINGS_NO_ID) + + for cmd in [cli.deploy_cli, cli.create_cli]: + pipelines_api_mock.reset_mock() + result = runner.invoke(cmd, ['--settings', path_case_insensitive]) + assert result.exit_code == 0 + assert pipelines_api_mock.create.call_count == 1 + + for cmd in [cli.deploy_cli, cli.edit_cli]: + pipelines_api_mock.reset_mock() + result = runner.invoke(cmd, [ + '--settings', path_case_insensitive, + '--pipeline-id', PIPELINE_ID + ]) + assert result.exit_code == 0 + assert pipelines_api_mock.edit.call_count == 1 result = runner.invoke(cli.delete_cli, ['--pipeline-id', PIPELINE_ID]) assert result.exit_code == 0 @@ -186,13 +244,15 @@ def test_deploy_update_delete_cli_correct_spec_extensions(pipelines_api_mock, tm @provide_conf -def test_deploy_with_invalid_spec_extension(pipelines_api_mock): - pipelines_api_mock.deploy = mock.Mock() - result = CliRunner().invoke(cli.deploy_cli, ['--spec', 'spec.invalid']) - assert result.exit_code == 1 - assert "ValueError: The provided file extension for the spec is not " \ - "supported" in result.stdout - assert pipelines_api_mock.deploy.call_count == 0 +def test_invalid_settings_extension(pipelines_api_mock): + for cmd in [cli.deploy_cli, cli.create_cli, cli.edit_cli]: + pipelines_api_mock.reset_mock() + result = CliRunner().invoke(cmd, ['--settings', 'settings.invalid']) + assert result.exit_code == 1 + assert "ValueError: The provided file extension for the settings is not " \ + "supported" in result.stdout + assert pipelines_api_mock.edit.call_count == 0 + assert pipelines_api_mock.create.call_count == 0 def test_gen_start_update_msg(): @@ -213,7 +273,7 @@ def test_cli_id(pipelines_api_mock): runner.invoke(cli.reset_cli, ['--pipeline-id', PIPELINE_ID]) runner.invoke(cli.run_cli, ['--pipeline-id', PIPELINE_ID]) runner.invoke(cli.start_cli, ['--pipeline-id', PIPELINE_ID]) - runner.invoke(cli.start_cli, ['--pipeline-id', PIPELINE_ID, "--full-refresh", "true"]) + runner.invoke(cli.start_cli, ['--pipeline-id', PIPELINE_ID, "--full-refresh"]) start_update_call_args_list = pipelines_api_mock.start_update.call_args_list assert start_update_call_args_list[0] == mock.call(PIPELINE_ID, full_refresh=True) @@ -240,111 +300,120 @@ def test_get_cli_id(pipelines_api_mock): @provide_conf def test_get_cli_no_id(pipelines_api_mock): + pipelines_api_mock.get.reset_mock() runner = CliRunner() result = runner.invoke(cli.get_cli, []) assert result.exit_code == 1 assert pipelines_api_mock.get.call_count == 0 -def test_validate_pipeline_id(click_ctx): - empty_pipeline_id = '' - pipeline_id_with_unicode = b'pipeline_id-\xe2\x9d\x8c-123'.decode('utf-8') - invalid_pipline_ids = ['pipeline_id-?-123', 'pipeline_id-\\-\'-123', 'pipeline_id-/-123', - pipeline_id_with_unicode, empty_pipeline_id] - with click_ctx: - for pipline_id in invalid_pipline_ids: - with pytest.raises(SystemExit): - cli._validate_pipeline_id(pipline_id) - assert cli._validate_pipeline_id('pipeline_id-ac345cd1') is None - - @provide_conf def test_duplicate_name_check_error(pipelines_api_mock, tmpdir): mock_response = mock.MagicMock() 
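+    # _handle_duplicate_name_exception reports a duplicate name whenever the
+    # mocked response body carries the RESOURCE_CONFLICT error code.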
     mock_response.text = '{"error_code": "RESOURCE_CONFLICT"}'
-    pipelines_api_mock.create = mock.Mock(
-        side_effect=requests.exceptions.HTTPError(response=mock_response))
-    pipelines_api_mock.deploy = mock.Mock(
-        side_effect=requests.exceptions.HTTPError(response=mock_response))
-    path = tmpdir.join('/spec.json').strpath
+    path = tmpdir.join('/settings.json').strpath
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC_NO_ID)
-
+        f.write(PIPELINE_SETTINGS_NO_ID)
     runner = CliRunner()
-    result = runner.invoke(cli.deploy_cli, [path])
-    assert pipelines_api_mock.create.call_count == 1
-    assert result.exit_code == 1
-    assert "already exists" in result.stdout
+    for cmd in [cli.deploy_cli, cli.create_cli]:
+        pipelines_api_mock.reset_mock()
+        pipelines_api_mock.create = mock.Mock(
+            side_effect=requests.exceptions.HTTPError(response=mock_response))
+        result = runner.invoke(cmd, [path])
+        assert pipelines_api_mock.create.call_count == 1
+        assert result.exit_code == 1
+        assert "already exists" in result.stdout
 
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC)
-    result = runner.invoke(cli.deploy_cli, [path])
-    assert result.exit_code == 1
-    assert pipelines_api_mock.deploy.call_count == 1
-    assert "already exists" in result.stdout
+        f.write(PIPELINE_SETTINGS)
+    for cmd in [cli.deploy_cli, cli.edit_cli]:
+        pipelines_api_mock.reset_mock()
+        pipelines_api_mock.edit = mock.Mock(
+            side_effect=requests.exceptions.HTTPError(response=mock_response))
+        result = runner.invoke(cmd, [path])
+        assert result.exit_code == 1
+        assert pipelines_api_mock.edit.call_count == 1
+        assert "already exists" in result.stdout
 
 
 @provide_conf
 def test_allow_duplicate_names_flag(pipelines_api_mock, tmpdir):
-    path = tmpdir.join('/spec.json').strpath
+    path = tmpdir.join('/settings.json').strpath
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC_NO_ID)
+        f.write(PIPELINE_SETTINGS_NO_ID)
 
     runner = CliRunner()
-    runner.invoke(cli.deploy_cli, [path])
-    assert pipelines_api_mock.create.call_args_list[0][0][2] is False
-    runner.invoke(cli.deploy_cli, [path, "--allow-duplicate-names"])
-    assert pipelines_api_mock.create.call_args_list[1][0][2] is True
+    for cmd in [cli.deploy_cli, cli.create_cli]:
+        pipelines_api_mock.reset_mock()
+        runner.invoke(cmd, [path])
+        assert pipelines_api_mock.create.call_args_list[0][0][2] is False
+
+        runner.invoke(cmd, [path, "--allow-duplicate-names"])
+        assert pipelines_api_mock.create.call_args_list[1][0][2] is True
 
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC)
+        f.write(PIPELINE_SETTINGS)
 
-    runner.invoke(cli.deploy_cli, [path])
-    assert pipelines_api_mock.deploy.call_args_list[0][0][2] is False
+    for cmd in [cli.deploy_cli, cli.edit_cli]:
+        pipelines_api_mock.reset_mock()
+        runner.invoke(cmd, [path])
+        assert pipelines_api_mock.edit.call_args_list[0][0][2] is False
 
-    runner.invoke(cli.deploy_cli, [path, "--allow-duplicate-names"])
-    assert pipelines_api_mock.deploy.call_args_list[1][0][2] is True
+        runner.invoke(cmd, [path, "--allow-duplicate-names"])
+        assert pipelines_api_mock.edit.call_args_list[1][0][2] is True
 
 
 @provide_conf
 def test_create_pipeline_no_update_spec(pipelines_api_mock, tmpdir):
-    pipelines_api_mock.create = mock.Mock(return_value={"pipeline_id": PIPELINE_ID})
-
-    path = tmpdir.join('/spec.json').strpath
+    path = tmpdir.join('/settings.json').strpath
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC_NO_ID)
-
+        f.write(PIPELINE_SETTINGS_NO_ID)
     runner = CliRunner()
-    result = runner.invoke(cli.deploy_cli, [path])
-    assert result.exit_code == 0
-    assert pipelines_api_mock.create.call_count == 1
+    for cmd in [cli.deploy_cli, cli.create_cli]:
+        pipelines_api_mock.create = mock.Mock(return_value={"pipeline_id": PIPELINE_ID})
+        result = runner.invoke(cmd, [path])
 
-    with open(path, 'r') as f:
-        spec = json.loads(f.read())
-        assert 'id' not in spec
+        assert result.exit_code == 0
+        assert pipelines_api_mock.create.call_count == 1
+        with open(path, 'r') as f:
+            spec = json.loads(f.read())
+            assert 'id' not in spec
 
 
 @provide_conf
-def test_deploy_pipeline_conflicting_ids(pipelines_api_mock, tmpdir):
-    pipelines_api_mock.deploy = mock.Mock()
-
-    path = tmpdir.join('/spec.json').strpath
+def test_create_with_id(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/settings.json').strpath
     with open(path, 'w') as f:
-        f.write(DEPLOY_SPEC)
+        f.write(PIPELINE_SETTINGS)
 
-    result = CliRunner().invoke(cli.deploy_cli, ['--spec', path, '--pipeline-id', "fake"])
+    result = CliRunner().invoke(cli.create_cli, ['--settings', path])
     assert result.exit_code == 1
-    assert "ValueError: The ID provided in --pipeline_id 'fake' is different from the ID " \
-           "provided in the spec '123'." in result.stdout
-    assert pipelines_api_mock.deploy.call_count == 0
+    assert "ValueError: Pipeline settings shouldn't contain \"id\"" in result.stdout
+    assert pipelines_api_mock.create.call_count == 0
 
 
 @provide_conf
-def test_deploy_with_missing_spec(pipelines_api_mock):
-    pipelines_api_mock.deploy = mock.Mock()
-    result = CliRunner().invoke(cli.deploy_cli, [])
-    assert result.exit_code == 1
-    assert "ValueError: The spec should be provided" in result.stdout
-    assert pipelines_api_mock.deploy.call_count == 0
+def test_pipeline_conflicting_ids(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/settings.json').strpath
+    with open(path, 'w') as f:
+        f.write(PIPELINE_SETTINGS)
+    for cmd in [cli.deploy_cli, cli.edit_cli]:
+        pipelines_api_mock.reset_mock()
+        result = CliRunner().invoke(cmd, ['--settings', path, '--pipeline-id', "fake"])
+        assert result.exit_code == 1
+        assert "ValueError: The ID provided in --pipeline_id 'fake' is different from the ID " \
+               "provided in the settings '123'." in result.stdout
+        assert pipelines_api_mock.edit.call_count == 0
+
+
+@provide_conf
+def test_with_missing_settings(pipelines_api_mock):
+    for cmd in [cli.deploy_cli, cli.edit_cli, cli.create_cli]:
+        pipelines_api_mock.reset_mock()
+        result = CliRunner().invoke(cmd, [])
+        assert result.exit_code == 1
+        assert "ValueError: Settings should be provided" in result.stdout
+        assert pipelines_api_mock.create.call_count == 0
+        assert pipelines_api_mock.edit.call_count == 0
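
As a quick reference, here is a hedged sketch (not part of the diff) of how the renamed PipelinesApi entry points fit together. The api_client value, the file handling, and the create_then_edit helper are illustrative assumptions, not code from this change:

    import json

    from databricks_cli.pipelines.api import PipelinesApi

    def create_then_edit(api_client, settings_path):
        # Load the settings JSON, mirroring what _read_settings does in cli.py.
        with open(settings_path, 'r') as f:
            settings = json.loads(f.read())

        api = PipelinesApi(api_client)
        # POST /pipelines; the CLI layer rejects settings that already carry "id".
        response = api.create(settings, settings_dir='.', allow_duplicate_names=False)

        # PUT /pipelines/{id}; edit() reads the "id" field to build the URL.
        settings['id'] = response['pipeline_id']
        api.edit(settings, settings_dir='.', allow_duplicate_names=False)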