2 changes: 1 addition & 1 deletion cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ steps:
entrypoint: python
args: ['-m', 'flake8', '.']

timeout: 1200s
timeout: 1600s
images:
- 'gcr.io/${PROJECT_ID}/${_BUILD_IMG_NAME}'
options:
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG AIRFLOW_IMAGE_NAME="2.1.1-python3.8"
ARG AIRFLOW_IMAGE_NAME="2.2.5-python3.8"

FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}"

2 changes: 1 addition & 1 deletion docker/Dockerfile-ci
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# TODO: use multi-stage builds. This file is exactly the same as Dockerfile except the last line
ARG AIRFLOW_IMAGE_NAME="2.1.1-python3.8"
ARG AIRFLOW_IMAGE_NAME="2.2.5-python3.8"

FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}"

4 changes: 2 additions & 2 deletions gcp_airflow_foundations/common/gcp/source_schema/bq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def read_schema_from_bq(
@@ -16,7 +16,7 @@ def read_schema_from_bq(
Helper method to load table schema from the staging table
"""

bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None)
bq_hook = BigQueryHook(gcp_conn_id=bigquery_conn_id, delegate_to=None)

schema = bq_hook.get_schema(dataset_id=dataset_id, table_id=table_id, project_id=project_id)

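Note: on the provider hook, `gcp_conn_id` replaces the removed `bigquery_conn_id` keyword. A minimal sketch of the migrated call, assuming the default `google_cloud_default` connection and hypothetical table coordinates:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# gcp_conn_id replaces the old bigquery_conn_id keyword on the provider hook
bq_hook = BigQueryHook(gcp_conn_id="google_cloud_default", delegate_to=None)

# get_schema returns a dict of the form {"fields": [...]}, same shape as before the migration
schema = bq_hook.get_schema(
    dataset_id="my_dataset",   # hypothetical dataset
    table_id="my_table",       # hypothetical table
    project_id="my-project",   # hypothetical project
)
print(schema["fields"])
```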
8 changes: 4 additions & 4 deletions gcp_airflow_foundations/common/gcp/source_schema/gcs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

from urllib.parse import urlparse

@@ -21,8 +21,8 @@ def read_schema_from_gcs(
gcs_bucket = parsed_url.netloc
gcs_object = parsed_url.path.lstrip("/")

gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=google_cloud_storage_conn_id, delegate_to=None
gcs_hook = GCSHook(
gcp_conn_id=google_cloud_storage_conn_id, delegate_to=None
)

schema_fields = json.loads(
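Note: `GCSHook` replaces `GoogleCloudStorageHook`, and its connection keyword is likewise `gcp_conn_id`. A minimal sketch of reading a JSON schema file the way this helper does, with a hypothetical bucket and object path:

```python
import json

from airflow.providers.google.cloud.hooks.gcs import GCSHook

gcs_hook = GCSHook(gcp_conn_id="google_cloud_default", delegate_to=None)

# With no filename argument, download() returns the object contents as bytes
schema_fields = json.loads(
    gcs_hook.download(
        bucket_name="my-schemas",          # hypothetical bucket
        object_name="schemas/table.json",  # hypothetical object
    )
)
```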
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
# from airflow.utils.decorators import apply_defaults

from gcp_airflow_foundations.operators.api.hooks.twilio_hook import TwilioHook
from urllib.parse import urlparse
@@ -31,7 +31,7 @@ class TwilioToBigQueryOperator(BaseOperator):
template_fields = ("dataset_id", "table_id", "project_id", "labels")

# pylint: disable=too-many-arguments
@apply_defaults
# @apply_defaults
def __init__(
self,
twilio_account_sid,
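Note: `apply_defaults` is deprecated in Airflow 2.x because `BaseOperator.__init__` now applies `default_args` itself, so the decorator can simply be dropped. A minimal sketch of a custom operator without it, using a hypothetical constructor argument:

```python
from airflow.models.baseoperator import BaseOperator


class ExampleOperator(BaseOperator):
    # No @apply_defaults needed: BaseOperator handles default_args in Airflow 2.x
    def __init__(self, *, endpoint: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.endpoint = endpoint  # hypothetical operator argument

    def execute(self, context):
        self.log.info("Calling %s", self.endpoint)
```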
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject

from airflow.models import BaseOperator, Variable
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

from google.cloud import bigquery

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from airflow.models import BaseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryDeleteTableOperator,
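Note: `airflow.operators.python_operator` is deprecated; the operator now lives in `airflow.operators.python`. A minimal usage sketch with a hypothetical callable (normally declared inside a DAG):

```python
from airflow.operators.python import PythonOperator


def _print_run_date(ds: str) -> None:
    # Hypothetical callable, used only to illustrate the new import path
    print(f"Run date: {ds}")


print_run_date = PythonOperator(
    task_id="print_run_date",
    python_callable=_print_run_date,
    op_kwargs={"ds": "{{ ds }}"},  # op_kwargs is templated, so macros work here
)
```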
Original file line number Diff line number Diff line change
@@ -2,13 +2,13 @@
from datetime import datetime

from airflow.models import BaseOperator, BaseOperatorLink
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
# from airflow.utils.decorators import apply_defaults
# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

@@ -22,7 +22,7 @@
from gcp_airflow_foundations.enums.ingestion_type import IngestionType


class MergeBigQueryHDS(BigQueryOperator):
class MergeBigQueryHDS(BigQueryInsertJobOperator):
"""
Merges data into a BigQuery HDS table.

@@ -56,10 +56,11 @@ class MergeBigQueryHDS(BigQueryOperator):

template_fields = ("stg_table_name", "data_table_name", "stg_dataset_name")

@apply_defaults
# @apply_defaults
def __init__(
self,
*,
task_id: str,
project_id: str,
stg_table_name: str,
data_table_name: str,
@@ -89,6 +90,7 @@ def __init__(
sql="",
**kwargs,
)
self.task_id = task_id
self.project_id = project_id
self.stg_table_name = stg_table_name
self.data_table_name = data_table_name
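Note: unlike the removed `BigQueryOperator`, `BigQueryInsertJobOperator` (which both merge operators now subclass) is driven by a BigQuery job `configuration` dict rather than a `sql` argument. A minimal sketch of how a query job with a destination table is typically expressed with it, using hypothetical names:

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

merge_example = BigQueryInsertJobOperator(
    task_id="merge_example",
    gcp_conn_id="google_cloud_default",
    configuration={
        "query": {
            "query": "SELECT 1 AS dummy_column",  # hypothetical merge SQL
            "useLegacySql": False,
            "destinationTable": {
                "projectId": "my-project",  # hypothetical project
                "datasetId": "my_dataset",  # hypothetical dataset
                "tableId": "my_table",      # hypothetical table
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    },
)
```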
Original file line number Diff line number Diff line change
@@ -2,13 +2,13 @@
from datetime import datetime

from airflow.models import BaseOperator, BaseOperatorLink
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
# from airflow.utils.decorators import apply_defaults
# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

@@ -21,7 +21,7 @@
from gcp_airflow_foundations.enums.ingestion_type import IngestionType


class MergeBigQueryODS(BigQueryOperator):
class MergeBigQueryODS(BigQueryInsertJobOperator):
"""
Merges data into a BigQuery ODS table.

@@ -55,10 +55,11 @@ class MergeBigQueryODS(BigQueryOperator):

template_fields = ("stg_table_name", "data_table_name", "stg_dataset_name")

@apply_defaults
# @apply_defaults
def __init__(
self,
*,
task_id: Optional[str] = None,
project_id: str,
stg_table_name: str,
data_table_name: str,
@@ -78,6 +79,7 @@ def __init__(
**kwargs,
) -> None:
super(MergeBigQueryODS, self).__init__(
task_id=task_id,
delegate_to=delegate_to,
gcp_conn_id=gcp_conn_id,
use_legacy_sql=False,
@@ -87,6 +89,7 @@
sql="",
**kwargs,
)

self.project_id = project_id
self.stg_table_name = stg_table_name
self.data_table_name = data_table_name
Original file line number Diff line number Diff line change
@@ -6,8 +6,8 @@
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
# from airflow.utils.decorators import apply_defaults
# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

@@ -23,7 +23,7 @@


class ParseSchema(BaseOperator):
@apply_defaults
# @apply_defaults
def __init__(
self,
*,
133 changes: 71 additions & 62 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,66 +1,75 @@
dacite==1.5.1
pydantic==1.8.2
pyyaml==5.4.1
wheel==0.36.2
black==19.10b0
configparser>=3.5.0
pyclean>=2.0.0
flake8>=3.8.0
pre-commit>=1.18.3
croniter>=1.0.15
facebook-business>=6.0.2
google-cloud-bigquery==2.28.1
pyarrow>=4.0.1
facebook-business>=6.0.2

apache-airflow-providers-salesforce==3.3.0
apache-airflow-providers-apache-beam==3.1.0
apache-airflow-providers-sftp==2.0.0
apache-airflow-providers-ssh==2.1.0

paramiko>=2.6.0
pysftp>=0.2.9
sshtunnel>=0.1.4,<0.2

apache-airflow-providers-facebook==2.1.0
apache-airflow-providers-google==6.3.0
pyopenssl==20.0.1
google-ads==14.0.0
google-api-core[grpc,grpcgcp]==1.31.5
google-api-python-client==1.12.8
google-auth-httplib2==0.1.0
google-auth==1.35.0
google-cloud-automl==2.4.2
google-cloud-bigtable==1.7.0
google-cloud-bigquery==2.28.1
google-cloud-build==3.0.0
google-cloud-container==1.0.1
google-cloud-datacatalog==3.4.1
google-cloud-dataproc==3.1.0
google-cloud-dlp==1.0.0
google-cloud-kms==2.6.0
google-cloud-language==1.3.0
google-cloud-logging==2.6.0
google-cloud-memcache==1.0.0
google-cloud-monitoring==2.5.0
google-cloud-os-login==2.3.1
google-cloud-pubsub==2.8.0
google-cloud-redis==2.2.2
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.42.2
google-cloud-tasks==2.5.1
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==1.2.1
grpcio-gcp==0.2.2
SQLAlchemy==1.3.23
aiohttp==3.8.1
aiosignal==1.2.0
apache-airflow-providers-apache-beam==4.0.0
apache-airflow-providers-common-sql==1.0.0
apache-airflow-providers-facebook==3.0.1
apache-airflow-providers-google==8.2.0
apache-airflow-providers-salesforce==5.0.0
apache-beam==2.40.0
appdirs==1.4.4
async-timeout==4.0.2
backoff==1.8.0
black==19.10b0
cached-property==1.5.2
cfgv==3.3.1
cloudpickle==2.1.0
configparser==5.2.0
crcmod==1.7
curlify==2.2.1
dacite==1.5.1
docopt==0.6.2
et-xmlfile==1.1.0
facebook-business==14.0.0
fastavro==1.5.4
frozenlist==1.3.1
gcp-airflow-foundations
google-ads==17.0.0
google-api-core==2.8.2
google-cloud-bigtable==1.7.2
google-cloud-container==2.11.0
google-cloud-core==2.3.2
google-cloud-dlp==1.0.2
google-cloud-language==1.3.2
google-cloud-secret-manager==1.0.2
google-cloud-spanner==1.19.3
google-cloud-speech==1.3.4
google-cloud-texttospeech==1.0.3
google-cloud-translate==1.7.2
google-cloud-videointelligence==1.16.3
google-cloud-vision==1.0.2
googleapis-common-protos==1.56.4
grpc-google-iam-v1==0.12.4
hdfs==2.7.0
httpcore==0.13.7
httpx==0.19.0
json-merge-patch==0.2
pandas-gbq==0.14.1
pandas==1.3.5
identify==2.5.2
multidict==6.0.2
nodeenv==1.7.0
numpy==1.21.0
oauth2client==4.1.3
openpyxl==3.0.9
pyparsing>=2.4.2
numpy==1.21
orjson==3.7.11
pandas-gbq==0.14.1
pathspec==0.9.0
pre-commit==2.20.0
proto-plus==1.19.6
protobuf==3.20.0
pyclean==2.2.0
pycountry==22.3.5
pydantic==1.8.2
pydot==1.4.2
pymongo==3.12.3
pyopenssl==20.0.1
regex==2022.7.25
requests-file==1.5.1
requests-toolbelt==0.9.1
simple-salesforce==1.12.1
toml==0.10.2
twitter-ads==10.0.0
typed-ast==1.5.4
wheel==0.36.2
yarl==1.8.1
zeep==4.1.0
4 changes: 2 additions & 2 deletions tests/integration/hds/test_hds_upsert_scd2.py
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ def doCleanups(self):
cleanup_xcom()
clear_db_dags()

BigQueryHook().run_copy(
BigQueryHook().insert_job(
source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_HDS",
destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
write_disposition="WRITE_TRUNCATE",
@@ -155,7 +155,7 @@ def doCleanups(self):
cleanup_xcom()
clear_db_dags()

BigQueryHook().run_copy(
BigQueryHook().insert_job(
source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_HDS",
destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
write_disposition="WRITE_TRUNCATE",
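Note: `BigQueryHook.insert_job` takes a BigQuery job `configuration` dict rather than the keyword arguments `run_copy` accepted, so a copy job is typically expressed along these lines (a sketch; field names follow the BigQuery jobs API, and the destination names are hypothetical stand-ins for the test's variables):

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# The copy-specific keywords run_copy accepted are expressed inside configuration["copy"]
BigQueryHook().insert_job(
    configuration={
        "copy": {
            "sourceTables": [
                {
                    "projectId": "airflow-framework",
                    "datasetId": "test_tables",
                    "tableId": "ga_sessions_HDS",
                }
            ],
            "destinationTable": {
                "projectId": "my-project",  # hypothetical stand-in for PROJECT_ID
                "datasetId": "my_dataset",  # hypothetical stand-in for DATASET
                "tableId": "my_table",      # hypothetical stand-in for self.table_id
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    }
)
```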
2 changes: 1 addition & 1 deletion tests/integration/hds/test_hds_upsert_snapshot.py
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ def doCleanups(self):
cleanup_xcom()
clear_db_dags()

BigQueryHook().run_query(
BigQueryHook().insert_job(
sql="""SELECT * EXCEPT(af_metadata_expired_at), TIMESTAMP_TRUNC('2017-07-31T00:00:00+00:00', DAY) AS partition_time FROM `airflow-framework.test_tables.ga_sessions_HDS`""",
use_legacy_sql=False,
destination_dataset_table=f"{DATASET}.{self.table_id}",
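Note: similarly, `insert_job` has no `sql`/`destination_dataset_table` keywords; a query job with a destination table is typically configured as below (a sketch reusing the SQL from this test, with hypothetical destination names):

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

sql = (
    "SELECT * EXCEPT(af_metadata_expired_at), "
    "TIMESTAMP_TRUNC('2017-07-31T00:00:00+00:00', DAY) AS partition_time "
    "FROM `airflow-framework.test_tables.ga_sessions_HDS`"
)

BigQueryHook().insert_job(
    configuration={
        "query": {
            "query": sql,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": "my-project",  # hypothetical stand-in for the test project
                "datasetId": "my_dataset",  # hypothetical stand-in for DATASET
                "tableId": "my_table",      # hypothetical stand-in for self.table_id
            },
        }
    }
)
```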