From 1e05005ea95fa64ccb9b49f60bf40145bce929b8 Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Fri, 2 Jul 2021 13:30:01 -0400 Subject: [PATCH 1/3] [core] Add KlioWriteToText missing fields including file_name_suffix, num_shards, append_trailing_newlines etc --- core/src/klio_core/config/_io.py | 6 +++++ core/tests/config/test_config.py | 44 +++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index a964b2b4..3962020a 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -286,6 +286,12 @@ def as_dict(self): @attr.attrs(frozen=True) class KlioWriteFileConfig(KlioEventOutput, KlioFileConfig): file_path_prefix = attr.attrib(type=str) + file_name_suffix = attr.attrib(type=str, default="") + append_trailing_newlines = attr.attrib(type=bool, default=True) + num_shards = attr.attrib(type=int, default=0) + shard_name_template = attr.attrib(type=str, default=None) + compression_type = attr.attrib(type=str, default="auto") + header = attr.attrib(type=str, default=None) @classmethod def from_dict(cls, config_dict, *args, **kwargs): diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index 6d300185..217e790b 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -248,7 +248,9 @@ def no_gcp_config_dict(job_config_dict, empty_pipeline_config_dict): @pytest.mark.parametrize("blocking", (True, False, None)) def test_klio_job_config( - job_config_dict, blocking, final_job_config_dict, + job_config_dict, + blocking, + final_job_config_dict, ): if blocking is None: job_config_dict.pop("blocking") @@ -336,7 +338,8 @@ def test_bare_klio_pipeline_config(bare_pipeline_config_dict): def test_klio_pipeline_config( - pipeline_config_dict, final_pipeline_config_dict, + pipeline_config_dict, + final_pipeline_config_dict, ): config_obj = config.KlioPipelineConfig( @@ -432,17 +435,36 @@ def test_klio_read_file_config(): assert config_dict["location"] == klio_read_file_config.file_pattern -def test_klio_write_file_config(): - config_dict = { - "type": "GCS", - "location": "gs://sigint-output/test-parent-job-out", - } +@pytest.mark.parametrize( + "config_dict", + ( + # The minimum inputs + { + "type": "GCS", + "location": "gs://sigint-output/test-parent-job-out", + }, + # Other fields configured + { + "type": "GCS", + "location": "gs://sigint-output/test-parent-job-out", + "file_name_suffix": ".txt", + "num_shards": 3, + "append_trailing_newlines": True, + }, + ), +) +def test_klio_write_file_config(config_dict): klio_write_file_config = io.KlioWriteFileConfig.from_dict( config_dict, io.KlioIOType.DATA, io.KlioIODirection.OUTPUT ) + config_dict.pop("type") assert "file" == klio_write_file_config.name - assert config_dict["location"] == klio_write_file_config.file_path_prefix + for k, v in config_dict.items(): + if k == "location": + assert v == klio_write_file_config.file_path_prefix + else: + assert v == getattr(klio_write_file_config, k) def test_klio_write_bigquery_config(): @@ -503,7 +525,11 @@ def test_klio_write_bigquery_config_raises(schema): @pytest.mark.parametrize( "include_topic,include_subscription", - ((False, True), (True, False), (True, True),), + ( + (False, True), + (True, False), + (True, True), + ), ) def test_pubsub_event_input_kwargs(include_topic, include_subscription): config_dict = { From c411d253945d9df422c78b288ea4408ac3be811f Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Fri, 2 Jul 2021 13:30:01 -0400 Subject: [PATCH 2/3] [core] Add KlioWriteToText missing fields including file_name_suffix, num_shards, append_trailing_newlines etc --- core/src/klio_core/config/_io.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index 3962020a..302e5286 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -18,8 +18,9 @@ import json import logging - import attr +from apache_beam.io.filesystem import CompressionTypes + logger = logging.getLogger("klio") @@ -255,6 +256,7 @@ def from_dict(cls, config_dict, *args, **kwargs): class KlioFileConfig(object): name = "file" + compression_type = attr.attrib(type=str, default=CompressionTypes.AUTO) @attr.attrs(frozen=True) @@ -290,7 +292,6 @@ class KlioWriteFileConfig(KlioEventOutput, KlioFileConfig): append_trailing_newlines = attr.attrib(type=bool, default=True) num_shards = attr.attrib(type=int, default=0) shard_name_template = attr.attrib(type=str, default=None) - compression_type = attr.attrib(type=str, default="auto") header = attr.attrib(type=str, default=None) @classmethod From 0fc3219c38d7e43aba9167f42180d903fb10dd21 Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Fri, 2 Jul 2021 13:30:01 -0400 Subject: [PATCH 3/3] [core] Add KlioWriteToText missing fields including file_name_suffix, num_shards, append_trailing_newlines etc --- core/setup.py | 1 + core/tests/config/test_config.py | 18 ++++-------------- core/tox.ini | 10 +++++++++- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/setup.py b/core/setup.py index 296ea562..5208d919 100644 --- a/core/setup.py +++ b/core/setup.py @@ -151,6 +151,7 @@ def get_long_description(package_dir): } META_FILE = read(META_PATH) INSTALL_REQUIRES = [ + "apache-beam[gcp]", "click", "glom", "google-api-python-client", diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index 217e790b..b254f96b 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -248,9 +248,7 @@ def no_gcp_config_dict(job_config_dict, empty_pipeline_config_dict): @pytest.mark.parametrize("blocking", (True, False, None)) def test_klio_job_config( - job_config_dict, - blocking, - final_job_config_dict, + job_config_dict, blocking, final_job_config_dict, ): if blocking is None: job_config_dict.pop("blocking") @@ -338,8 +336,7 @@ def test_bare_klio_pipeline_config(bare_pipeline_config_dict): def test_klio_pipeline_config( - pipeline_config_dict, - final_pipeline_config_dict, + pipeline_config_dict, final_pipeline_config_dict, ): config_obj = config.KlioPipelineConfig( @@ -439,10 +436,7 @@ def test_klio_read_file_config(): "config_dict", ( # The minimum inputs - { - "type": "GCS", - "location": "gs://sigint-output/test-parent-job-out", - }, + {"type": "GCS", "location": "gs://sigint-output/test-parent-job-out"}, # Other fields configured { "type": "GCS", @@ -525,11 +519,7 @@ def test_klio_write_bigquery_config_raises(schema): @pytest.mark.parametrize( "include_topic,include_subscription", - ( - (False, True), - (True, False), - (True, True), - ), + ((False, True), (True, False), (True, True),), ) def test_pubsub_event_input_kwargs(include_topic, include_subscription): config_dict = { diff --git a/core/tox.ini b/core/tox.ini index 99cf367b..5c436f26 100644 --- a/core/tox.ini +++ b/core/tox.ini @@ -1,9 +1,15 @@ [tox] -envlist = py36,py37,py38,docs,manifest,check-formatting,lint +envlist = {py36,py37,py38}-beam{26,27,28},docs,manifest,check-formatting,lint [testenv] install_command = python -m pip install {opts} {packages} extras = tests +deps = + {toxinidir}/../core + beam26: apache-beam[gcp]>=2.26.0,<2.27.0 + beam27: apache-beam[gcp]>=2.27.0,<2.28.0 + beam28: apache-beam[gcp]>=2.28.0,<2.29.0 + commands = coverage run -m pytest {posargs} ; assert that protos are compiled @@ -85,6 +91,8 @@ filterwarnings = ignore:invalid escape sequence:DeprecationWarning ; 3rd party libraries haven't updated their use of collections.abc (py37+) ignore:Using or importing the ABCs from:DeprecationWarning + ; Apache Beam-related warnings + ignore:Running the Apache Beam SDK on Python 3:UserWarning ; required for mapping envs -> github runtimes [gh-actions]