Skip to content

Commit

Permalink
Remove extra retries on 503 for DBFS delete. (#283)
Browse files Browse the repository at this point in the history
* Removed the extra retries on 503 for DBFS delete.

* Refactored DBFS partial delete exception handling and added an additional error message.
  • Loading branch information
bogdanghita-db authored Mar 31, 2020
1 parent ebf4879 commit f92a8c3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 46 deletions.
21 changes: 4 additions & 17 deletions databricks_cli/dbfs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import tempfile

import re
import time
import click

from requests.exceptions import HTTPError
Expand All @@ -39,8 +38,6 @@
from databricks_cli.dbfs.exceptions import LocalFileExistsException

BUFFER_SIZE_BYTES = 2**20
DELETE_MAX_CONSECUTIVE_503_RETRIES = 3
DELETE_503_RETRY_DELAY_MILLIS = 30 * 1000


class ParseException(Exception):
Expand Down Expand Up @@ -77,14 +74,12 @@ def __eq__(self, other):
class DbfsErrorCodes(object):
RESOURCE_DOES_NOT_EXIST = 'RESOURCE_DOES_NOT_EXIST'
RESOURCE_ALREADY_EXISTS = 'RESOURCE_ALREADY_EXISTS'
TEMPORARILY_UNAVAILABLE = 'TEMPORARILY_UNAVAILABLE'
PARTIAL_DELETE = 'PARTIAL_DELETE'


class DbfsApi(object):
def __init__(self, api_client, delete_retry_delay_millis=DELETE_503_RETRY_DELAY_MILLIS):
def __init__(self, api_client):
self.client = DbfsService(api_client)
self.delete_retry_delay_millis = delete_retry_delay_millis

def list_files(self, dbfs_path, headers=None):
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
Expand Down Expand Up @@ -148,7 +143,6 @@ def get_num_files_deleted(partial_delete_error):
return int(m.group(1))

def delete(self, dbfs_path, recursive, headers=None):
num_consecutive_503_retries = 0
num_files_deleted = 0
while True:
try:
Expand All @@ -167,17 +161,10 @@ def delete(self, dbfs_path, recursive, headers=None):
num_files_deleted), nl=False)
except ParseException:
click.echo("\rDelete in progress...\033[K", nl=False)
num_consecutive_503_retries = 0
continue
# Retry at most DELETE_MAX_CONSECUTIVE_503_ERRORS times for other 503 errors
elif num_consecutive_503_retries < DELETE_MAX_CONSECUTIVE_503_RETRIES:
num_consecutive_503_retries += 1
time.sleep(float(self.delete_retry_delay_millis) / 1000)
continue
else:
raise e
else:
raise e
click.echo("\rDeleted at least {} files but interrupted by error.\033[K".format(
num_files_deleted))
raise e
break
click.echo("\rDelete finished successfully.\033[K")

Expand Down
31 changes: 2 additions & 29 deletions tests/dbfs/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ def get_resource_does_not_exist_exception():
return requests.exceptions.HTTPError(response=response)


def get_temporarily_unavailable_exception():
response = requests.Response()
response.status_code = 503
response._content = ('{{"error_code": "{}"}}'.format(api.DbfsErrorCodes.TEMPORARILY_UNAVAILABLE)).encode() # NOQA
return requests.exceptions.HTTPError(response=response)


def get_partial_delete_exception(message="[...] operation has deleted 10 files [...]"):
response = requests.Response()
response.status_code = 503
Expand Down Expand Up @@ -182,33 +175,13 @@ def test_cat(self, dbfs_api):

def test_partial_delete(self, dbfs_api):
e_partial_delete = get_partial_delete_exception()
e_temporarily_unavailable = get_temporarily_unavailable_exception()
# Simulate partial deletes and 503 exceptions followed by a full successful delete
exception_sequence = \
[e_temporarily_unavailable, e_partial_delete, e_partial_delete] + \
[e_temporarily_unavailable] * api.DELETE_MAX_CONSECUTIVE_503_RETRIES + \
[e_partial_delete, None]
# Simulate 3 partial deletes followed by a full successful delete
exception_sequence = [e_partial_delete, e_partial_delete, e_partial_delete, None]
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
dbfs_api.delete_retry_delay_millis = 1
# Should succeed
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)

def test_partial_delete_service_unavailable(self, dbfs_api):
e_partial_delete = get_partial_delete_exception()
e_temporarily_unavailable = get_temporarily_unavailable_exception()
# Simulate more than api.DELETE_MAX_CONSECUTIVE_503_ERRORS 503 errors that are not partial
# deletes (error_code != PARTIAL_DELETE)
exception_sequence = \
[e_partial_delete] + \
[e_temporarily_unavailable] * (api.DELETE_MAX_CONSECUTIVE_503_RETRIES + 1) + \
[e_partial_delete, None]
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
dbfs_api.delete_retry_delay_millis = 1
with pytest.raises(e_temporarily_unavailable.__class__) as thrown:
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
# Should raise the same e_temporarily_unavailable exception instance
assert thrown.value == e_temporarily_unavailable

def test_partial_delete_exception_message_parse_error(self, dbfs_api):
message = "unexpected partial delete exception message"
e_partial_delete = get_partial_delete_exception(message)
Expand Down

0 comments on commit f92a8c3

Please sign in to comment.