Commit 2cbdc99

Ignore duplicate handler errors when lazy loading (#2316)
If the user registers a custom structured dataset encoder/decoder before the lazy import runs for the first time, the default transformers will fail because they do not register with override. flytekit should swallow those errors.

Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
1 parent 66a6018 commit 2cbdc99
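
The failure mode is easy to hit from user code: register a custom handler for a slot that one of the default handlers also targets, then let the lazy import run. Below is a minimal sketch of that sequence, assuming pandas and the bigquery extra are installed; the encoder class is made up for illustration, while the registration calls mirror the new unit test further down.

import pandas as pd

from flytekit.core.type_engine import TypeEngine
from flytekit.types.structured.structured_dataset import (
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)


class MyPandasToBQEncoder(StructuredDatasetEncoder):
    # Hypothetical user-defined encoder: pd.DataFrame -> BigQuery, any format.
    def __init__(self):
        super().__init__(pd.DataFrame, "bq", supported_format="")

    def encode(self, ctx, structured_dataset, structured_dataset_type):
        raise NotImplementedError  # body elided for the sketch


# The user registration happens first, before flytekit lazily imports its defaults.
StructuredDatasetTransformerEngine.register(MyPandasToBQEncoder(), override=True)

# Before this commit, register_bigquery_handlers() inside lazy_import_transformers()
# would raise DuplicateHandlerError here (the defaults do not pass override=True);
# with this commit the error is caught and logged instead.
TypeEngine.lazy_import_transformers()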

File tree

3 files changed: +51 −4 lines changed

flytekit/core/type_engine.py

Lines changed: 13 additions & 3 deletions

@@ -1044,6 +1044,7 @@ def lazy_import_transformers(cls):
             register_bigquery_handlers,
             register_pandas_handlers,
         )
+        from flytekit.types.structured.structured_dataset import DuplicateHandlerError

         if is_imported("tensorflow"):
             from flytekit.extras import tensorflow  # noqa: F401
@@ -1056,11 +1057,20 @@ def lazy_import_transformers(cls):
                 from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter  # noqa: F401
             except ValueError:
                 logger.debug("Transformer for pandas is already registered.")
-            register_pandas_handlers()
+            try:
+                register_pandas_handlers()
+            except DuplicateHandlerError:
+                logger.debug("Transformer for pandas is already registered.")
         if is_imported("pyarrow"):
-            register_arrow_handlers()
+            try:
+                register_arrow_handlers()
+            except DuplicateHandlerError:
+                logger.debug("Transformer for arrow is already registered.")
         if is_imported("google.cloud.bigquery"):
-            register_bigquery_handlers()
+            try:
+                register_bigquery_handlers()
+            except DuplicateHandlerError:
+                logger.debug("Transformer for bigquery is already registered.")
         if is_imported("numpy"):
             from flytekit.types import numpy  # noqa: F401
         if is_imported("PIL"):
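
For context on the exception being swallowed: DuplicateHandlerError comes from flytekit.types.structured.structured_dataset and is raised when a handler is registered for a (python type, protocol, format) slot that is already occupied and override is not set. The guard below is a simplified, self-contained sketch of that idea, not the actual flytekit implementation.

class DuplicateHandlerError(ValueError):
    """Raised when a handler targets an already-occupied slot without override."""


_HANDLERS: dict = {}


def register(python_type: str, protocol: str, fmt: str, handler: str, override: bool = False) -> None:
    key = (python_type, protocol, fmt)
    if key in _HANDLERS and not override:
        raise DuplicateHandlerError(f"A handler is already registered for {key}")
    _HANDLERS[key] = handler


# A user handler claims the slot first (with override), then a default handler
# tries to claim the same slot without override and raises.
register("pandas.DataFrame", "bq", "", "user_encoder", override=True)
try:
    register("pandas.DataFrame", "bq", "", "default_bq_encoder")
except DuplicateHandlerError:
    pass  # this is what lazy_import_transformers() now does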

flytekit/types/structured/structured_dataset.py

Lines changed: 1 addition & 1 deletion

@@ -177,7 +177,7 @@ def __init__(self, python_type: Type[T], protocol: Optional[str] = None, support
         is capable of handling.
         :param supported_format: Arbitrary string representing the format. If not supplied then an empty string
           will be used. An empty string implies that the encoder works with any format. If the format being asked
-          for does not exist, the transformer enginer will look for the "" encoder instead and write a warning.
+          for does not exist, the transformer engine will look for the "" encoder instead and write a warning.
         """
         self._python_type = python_type
         self._protocol = protocol.replace("://", "") if protocol else None
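
The corrected docstring describes the format fallback: if no encoder is registered for the requested format, the engine falls back to the catch-all "" encoder and logs a warning. The following is a hypothetical sketch of that lookup order, with names and structure invented for illustration; the real logic lives in StructuredDatasetTransformerEngine.

import logging

logger = logging.getLogger(__name__)

# (python type name, protocol, format) -> encoder; "" is the catch-all format.
ENCODERS = {
    ("pandas.DataFrame", "bq", ""): "catch_all_pandas_bq_encoder",
}


def find_encoder(python_type: str, protocol: str, fmt: str) -> str:
    exact = (python_type, protocol, fmt)
    if exact in ENCODERS:
        return ENCODERS[exact]
    fallback = (python_type, protocol, "")
    if fallback in ENCODERS:
        logger.warning("No encoder for format %r; falling back to the '' encoder.", fmt)
        return ENCODERS[fallback]
    raise ValueError(f"No encoder registered for {exact}")


print(find_encoder("pandas.DataFrame", "bq", "parquet"))  # resolves to the "" encoder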

tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py

Lines changed: 37 additions & 0 deletions

@@ -2,6 +2,7 @@
 import tempfile
 import typing

+import google.cloud.bigquery
 import pyarrow as pa
 import pytest
 from fsspec.utils import get_protocol
@@ -15,6 +16,7 @@
 from flytekit.core.task import task
 from flytekit.core.type_engine import TypeEngine
 from flytekit.core.workflow import workflow
+from flytekit.lazy_import.lazy_module import is_imported
 from flytekit.models import literals
 from flytekit.models.literals import StructuredDatasetMetadata
 from flytekit.models.types import SchemaType, SimpleType, StructuredDatasetType
@@ -508,3 +510,38 @@ def test_list_of_annotated():
     @task
     def no_op(data: WineDataset) -> typing.List[WineDataset]:
         return [data]
+
+
+class PrivatePandasToBQEncodingHandlers(StructuredDatasetEncoder):
+    def __init__(self):
+        super().__init__(pd.DataFrame, "bq", supported_format="")
+
+    def encode(
+        self,
+        ctx: FlyteContext,
+        structured_dataset: StructuredDataset,
+        structured_dataset_type: StructuredDatasetType,
+    ) -> literals.StructuredDataset:
+        return literals.StructuredDataset(
+            uri=typing.cast(str, structured_dataset.uri), metadata=StructuredDatasetMetadata(structured_dataset_type)
+        )
+
+
+def test_reregister_encoder():
+    # Test that lazy import can run after a user has already registered a custom handler.
+    # The default handlers don't have override=True (and should not) but the call should not fail.
+    dir(google.cloud.bigquery)
+    assert is_imported("google.cloud.bigquery")
+
+    StructuredDatasetTransformerEngine.register(
+        PrivatePandasToBQEncodingHandlers(), default_format_for_type=False, override=True
+    )
+    TypeEngine.lazy_import_transformers()
+
+    sd = StructuredDataset(dataframe=pd.DataFrame({"a": [1, 2], "b": [3, 4]}), uri="bq://blah", file_format="bq")
+
+    ctx = FlyteContextManager.current_context()
+
+    df_literal_type = TypeEngine.to_literal_type(pd.DataFrame)
+
+    TypeEngine.to_literal(ctx, sd, python_type=pd.DataFrame, expected=df_literal_type)
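
One detail of the new test worth noting: flytekit sets up heavy optional dependencies as lazily-loaded modules, so the dir(google.cloud.bigquery) call touches the module to force it to actually load before the assert on is_imported(...). The snippet below is a generic illustration of that deferred-loading behaviour using the standard importlib LazyLoader recipe; it is not flytekit's own lazy_module code, and json merely stands in for a heavy dependency.

import importlib.util
import sys


def lazy_import(name: str):
    """Standard-library recipe: defer executing a module until first attribute access."""
    spec = importlib.util.find_spec(name)
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)
    return module


mod = lazy_import("json")  # stand-in for a heavy dependency such as google.cloud.bigquery
dir(mod)                   # first attribute access triggers the real import, as dir() does in the test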
