Skip to content

Commit a219818

Browse files
authored
Retry on 503 (#1408)
* add default retry on all client factories, which includes 502 and 503 errors * update retries to use defaults and ensure that a timeout or deadline is set
1 parent 2e1a5fd commit a219818

File tree

4 files changed

+31
-22
lines changed

4 files changed

+31
-22
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Fixes
2+
body: Fix issue where dbt-bigquery was not retrying in certain retryable scenarios,
3+
e.g. 503s
4+
time: 2024-11-20T16:31:01.60689-05:00
5+
custom:
6+
Author: mikealfare
7+
Issue: "682"

dbt/adapters/bigquery/clients.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from google.api_core.client_info import ClientInfo
22
from google.api_core.client_options import ClientOptions
3-
from google.api_core.retry import Retry
43
from google.auth.exceptions import DefaultCredentialsError
5-
from google.cloud.bigquery import Client as BigQueryClient
4+
from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY
65
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
76
from google.cloud.storage import Client as StorageClient
7+
from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY
88

99
from dbt.adapters.events.logging import AdapterLogger
1010

@@ -28,23 +28,23 @@ def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
2828
return _create_bigquery_client(credentials)
2929

3030

31-
@Retry() # google decorator. retries on transient errors with exponential backoff
31+
@GCS_DEFAULT_RETRY
3232
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
3333
return StorageClient(
3434
project=credentials.execution_project,
3535
credentials=create_google_credentials(credentials),
3636
)
3737

3838

39-
@Retry() # google decorator. retries on transient errors with exponential backoff
39+
# dataproc does not appear to have a default retry like BQ and GCS
4040
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
4141
return JobControllerClient(
4242
credentials=create_google_credentials(credentials),
4343
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
4444
)
4545

4646

47-
@Retry() # google decorator. retries on transient errors with exponential backoff
47+
# dataproc does not appear to have a default retry like BQ and GCS
4848
def create_dataproc_batch_controller_client(
4949
credentials: BigQueryCredentials,
5050
) -> BatchControllerClient:
@@ -54,7 +54,7 @@ def create_dataproc_batch_controller_client(
5454
)
5555

5656

57-
@Retry() # google decorator. retries on transient errors with exponential backoff
57+
@BQ_DEFAULT_RETRY
5858
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
5959
return BigQueryClient(
6060
credentials.execution_project,

dbt/adapters/bigquery/retry.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from google.api_core.future.polling import DEFAULT_POLLING
44
from google.api_core.retry import Retry
5-
from google.cloud.bigquery.retry import DEFAULT_RETRY, _job_should_retry
5+
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, _job_should_retry
66
from requests.exceptions import ConnectionError
77

88
from dbt.adapters.contracts.connection import Connection, ConnectionState
@@ -15,14 +15,8 @@
1515

1616
_logger = AdapterLogger("BigQuery")
1717

18-
19-
_SECOND = 1.0
20-
_MINUTE = 60 * _SECOND
21-
_HOUR = 60 * _MINUTE
22-
_DAY = 24 * _HOUR
23-
_DEFAULT_INITIAL_DELAY = _SECOND
24-
_DEFAULT_MAXIMUM_DELAY = 3 * _SECOND
25-
_DEFAULT_POLLING_MAXIMUM_DELAY = 10 * _SECOND
18+
_MINUTE = 60.0
19+
_DAY = 24 * 60 * 60.0
2620

2721

2822
class RetryFactory:
@@ -44,7 +38,7 @@ def create_job_execution_timeout(self, fallback: float = _DAY) -> float:
4438
) # keep _DAY here so it's not overridden by passing fallback=None
4539

4640
def create_retry(self, fallback: Optional[float] = None) -> Retry:
47-
return DEFAULT_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)
41+
return DEFAULT_JOB_RETRY.with_timeout(self._job_execution_timeout or fallback or _DAY)
4842

4943
def create_polling(self, model_timeout: Optional[float] = None) -> Retry:
5044
return DEFAULT_POLLING.with_timeout(model_timeout or self._job_execution_timeout or _DAY)
@@ -53,14 +47,21 @@ def create_reopen_with_deadline(self, connection: Connection) -> Retry:
5347
"""
5448
This strategy mimics what was accomplished with _retry_and_handle
5549
"""
56-
return Retry(
57-
predicate=_DeferredException(self._retries),
58-
initial=_DEFAULT_INITIAL_DELAY,
59-
maximum=_DEFAULT_MAXIMUM_DELAY,
60-
deadline=self._job_deadline,
61-
on_error=_create_reopen_on_error(connection),
50+
51+
retry = DEFAULT_JOB_RETRY.with_delay(maximum=3.0).with_predicate(
52+
_DeferredException(self._retries)
6253
)
6354

55+
# there is no `with_on_error` method, but we want to retain the defaults on `DEFAULT_JOB_RETRY`
56+
retry._on_error = _create_reopen_on_error(connection)
57+
58+
# don't override the default deadline with None if the user did not provide one;
59+
# otherwise the retry would never time out and the process would never end
60+
if deadline := self._job_deadline:
61+
return retry.with_deadline(deadline)
62+
63+
return retry
64+
6465

6566
class _DeferredException:
6667
"""

hatch.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ packages = ["dbt"]
88
packages = ["dbt"]
99

1010
[envs.default]
11+
python = "3.9"
1112
dependencies = [
1213
"dbt-adapters @ git+https://github.com/dbt-labs/dbt-adapters.git",
1314
"dbt-common @ git+https://github.com/dbt-labs/dbt-common.git",

0 commit comments

Comments
 (0)