From e5c2f412fe4c0cf3967c4aecf1dc5dfe95cddffd Mon Sep 17 00:00:00 2001 From: Dennis Keck <26092524+fellhorn@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:59:44 +0100 Subject: [PATCH 01/13] Faster GitIgnore directory check (#3007) * Faster GitIgnore directory check Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Remove code duplication Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --------- Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --- flytekit/tools/ignore.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/flytekit/tools/ignore.py b/flytekit/tools/ignore.py index e2aefef596..b07ce6aa2c 100644 --- a/flytekit/tools/ignore.py +++ b/flytekit/tools/ignore.py @@ -5,7 +5,7 @@ from fnmatch import fnmatch from pathlib import Path from shutil import which -from typing import Dict, List, Optional, Type +from typing import List, Optional, Type from docker.utils.build import PatternMatcher @@ -41,28 +41,37 @@ class GitIgnore(Ignore): def __init__(self, root: Path): super().__init__(root) self.has_git = which("git") is not None - self.ignored = self._list_ignored() + self.ignored_files = self._list_ignored_files() + self.ignored_dirs = self._list_ignored_dirs() - def _list_ignored(self) -> Dict: + def _git_wrapper(self, extra_args: List[str]) -> set[str]: if self.has_git: - out = subprocess.run(["git", "ls-files", "-io", "--exclude-standard"], cwd=self.root, capture_output=True) + out = subprocess.run( + ["git", "ls-files", "-io", "--exclude-standard", *extra_args], + cwd=self.root, + capture_output=True, + ) if out.returncode == 0: - return dict.fromkeys(out.stdout.decode("utf-8").split("\n")[:-1]) - logger.info(f"Could not determine ignored files due to:\n{out.stderr}\nNot applying any filters") - return {} + return set(out.stdout.decode("utf-8").split("\n")[:-1]) + logger.info(f"Could not determine ignored paths due to:\n{out.stderr}\nNot applying any filters") + return set() logger.info("No git executable found, not applying any filters") - return {} + return set() + + def _list_ignored_files(self) -> set[str]: + return self._git_wrapper([]) + + def _list_ignored_dirs(self) -> set[str]: + return self._git_wrapper(["--directory"]) def _is_ignored(self, path: str) -> bool: - if self.ignored: + if self.ignored_files: # git-ls-files uses POSIX paths - if Path(path).as_posix() in self.ignored: + if Path(path).as_posix() in self.ignored_files: return True # Ignore empty directories - if os.path.isdir(os.path.join(self.root, path)) and all( - [self.is_ignored(os.path.join(path, f)) for f in os.listdir(os.path.join(self.root, path))] - ): - return True + if os.path.isdir(os.path.join(self.root, path)) and self.ignored_dirs: + return Path(path).as_posix() + "/" in self.ignored_dirs return False From 34af2e2e9a00180ba593e656660e702a35ef2c20 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Tue, 17 Dec 2024 13:41:27 -0500 Subject: [PATCH 02/13] Use the same source of time in flaky test (#3009) * Use the same source of time in flaky test Signed-off-by: Eduardo Apolinario * Use proto Timestamp to get current time Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- tests/flytekit/unit/bin/test_python_entrypoint.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/flytekit/unit/bin/test_python_entrypoint.py b/tests/flytekit/unit/bin/test_python_entrypoint.py index bbb33ec962..323770bed7 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint.py @@ -526,17 +526,20 @@ def test_get_container_error_timestamp(monkeypatch) -> None: assert get_container_error_timestamp(FlyteException("foo", timestamp=10.5)) == Timestamp(seconds=10, nanos=500000000) - current_dtime = datetime.now() + current_timestamp = Timestamp() + current_timestamp.GetCurrentTime() error_timestamp = get_container_error_timestamp(RuntimeError("foo")) - assert error_timestamp.ToDatetime() >= current_dtime + assert error_timestamp.ToDatetime() >= current_timestamp.ToDatetime() - current_dtime = datetime.now() + current_timestamp = Timestamp() + current_timestamp.GetCurrentTime() error_timestamp = get_container_error_timestamp(FlyteException("foo")) - assert error_timestamp.ToDatetime() >= current_dtime + assert error_timestamp.ToDatetime() >= current_timestamp.ToDatetime() - current_dtime = datetime.now() + current_timestamp = Timestamp() + current_timestamp.GetCurrentTime() error_timestamp = get_container_error_timestamp(None) - assert error_timestamp.ToDatetime() >= current_dtime + assert error_timestamp.ToDatetime() >= current_timestamp.ToDatetime() def get_flyte_context(tmp_path_factory, outputs_path): From 3732b762ea3d02b0e7b181a544ff40019a3b82dd Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:36:59 -0500 Subject: [PATCH 03/13] Use correct name in flyteagent test (#3011) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .github/workflows/pythonpublish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index e5a8118923..4aceee472e 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -224,7 +224,7 @@ jobs: cache-to: type=gha,mode=max - name: Confirm Agent can start run: | - docker run --rm ghcr.io/${{ github.repository_owner }}/flyteagent:${{ github.sha }} pyflyte serve agent --port 8000 --timeout 1 + docker run --rm ghcr.io/${{ github.repository_owner }}/flyteagent-slim:${{ github.sha }} pyflyte serve agent --port 8000 --timeout 1 - name: Push flyteagent-all Image to GitHub Registry uses: docker/build-push-action@v2 with: From b92e91105f2eb4fdad0dcbc3eafe491850bfb74c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 18 Dec 2024 10:49:46 -0800 Subject: [PATCH 04/13] vscode decorator for the dynamic task (#2994) Signed-off-by: Kevin Su Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flytekit/core/context_manager.py | 4 +- flytekit/core/tracker.py | 1 - flytekit/core/workflow.py | 15 +++++-- .../tests/test_flyteinteractive_vscode.py | 39 ++++++++++++++++++- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index d804cbddc8..29d86aa8fc 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -735,7 +735,9 @@ def new_compilation_state(self, prefix: str = "") -> CompilationState: Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation, otherwise the code should always uses - with_compilation_state """ - return CompilationState(prefix=prefix) + from flytekit.core.python_auto_container import default_task_resolver + + return CompilationState(prefix=prefix, task_resolver=default_task_resolver) def new_execution_state(self, working_dir: Optional[os.PathLike] = None) -> ExecutionState: """ diff --git a/flytekit/core/tracker.py b/flytekit/core/tracker.py index da8be53de6..8ead705cd4 100644 --- a/flytekit/core/tracker.py +++ b/flytekit/core/tracker.py @@ -344,7 +344,6 @@ def extract_task_module(f: Union[Callable, TrackedInstance]) -> Tuple[str, str, :param f: A task or any other callable :return: [name to use: str, module_name: str, function_name: str, full_path: str] """ - if isinstance(f, TrackedInstance): if hasattr(f, "task_function"): mod, mod_name, name = _task_module_from_callable(f.task_function) diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index bb48cde73b..4e6535b492 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -696,9 +696,13 @@ def task_name(self, t: PythonAutoContainerTask) -> str: # type: ignore return f"{self.name}.{t.__module__}.{t.name}" def _validate_add_on_failure_handler(self, ctx: FlyteContext, prefix: str, wf_args: Dict[str, Promise]): - # Compare + resolver = ( + ctx.compilation_state.task_resolver + if ctx.compilation_state and ctx.compilation_state.task_resolver + else self + ) with FlyteContextManager.with_context( - ctx.with_compilation_state(CompilationState(prefix=prefix, task_resolver=self)) + ctx.with_compilation_state(CompilationState(prefix=prefix, task_resolver=resolver)) ) as inner_comp_ctx: # Now lets compile the failure-node if it exists if self.on_failure: @@ -736,9 +740,14 @@ def compile(self, **kwargs): ctx = FlyteContextManager.current_context() all_nodes = [] prefix = ctx.compilation_state.prefix if ctx.compilation_state is not None else "" + resolver = ( + ctx.compilation_state.task_resolver + if ctx.compilation_state and ctx.compilation_state.task_resolver + else self + ) with FlyteContextManager.with_context( - ctx.with_compilation_state(CompilationState(prefix=prefix, task_resolver=self)) + ctx.with_compilation_state(CompilationState(prefix=prefix, task_resolver=resolver)) ) as comp_ctx: # Construct the default input promise bindings, but then override with the provided inputs, if any input_kwargs = construct_input_promises([k for k in self.interface.inputs.keys()]) diff --git a/plugins/flytekit-flyteinteractive/tests/test_flyteinteractive_vscode.py b/plugins/flytekit-flyteinteractive/tests/test_flyteinteractive_vscode.py index 96fa9261c2..7996a0da41 100644 --- a/plugins/flytekit-flyteinteractive/tests/test_flyteinteractive_vscode.py +++ b/plugins/flytekit-flyteinteractive/tests/test_flyteinteractive_vscode.py @@ -2,6 +2,9 @@ import mock import pytest + +from flytekit.core import context_manager +from flytekit.core.python_auto_container import default_task_resolver from flytekitplugins.flyteinteractive import ( CODE_TOGETHER_CONFIG, CODE_TOGETHER_EXTENSION, @@ -24,9 +27,9 @@ is_extension_installed, ) -from flytekit import task, workflow +from flytekit import task, workflow, dynamic from flytekit.configuration import Image, ImageConfig, SerializationSettings -from flytekit.core.context_manager import ExecutionState +from flytekit.core.context_manager import ExecutionState, FlyteContextManager from flytekit.tools.translator import get_serializable_task @@ -402,3 +405,35 @@ def test_get_installed_extensions_failed(mock_run): expected_extensions = [] assert installed_extensions == expected_extensions + + +def test_vscode_with_dynamic(vscode_patches): + ( + mock_process, + mock_prepare_interactive_python, + mock_exit_handler, + mock_download_vscode, + mock_signal, + mock_prepare_resume_task_python, + mock_prepare_launch_json, + ) = vscode_patches + + mock_exit_handler.return_value = None + + @task() + def train(): + print("forward") + print("backward") + + @dynamic() + @vscode + def d1(): + print("dynamic", flush=True) + train() + + ctx = FlyteContextManager.current_context() + with context_manager.FlyteContextManager.with_context( + ctx.with_execution_state(ctx.execution_state.with_params(mode=ExecutionState.Mode.TASK_EXECUTION)) + ): + d1() + assert d1.task_resolver == default_task_resolver From 23beed9d9e484eaa88c24a9d01a6638a3e7525b5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 18 Dec 2024 12:50:27 -0800 Subject: [PATCH 05/13] fix: Improve error details and logging config handling (#3012) Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 3 ++- flytekit/loggers.py | 2 +- tests/flytekit/unit/cli/pyflyte/test_run.py | 8 +++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 5919ab0611..ee97b9ddfb 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -25,6 +25,7 @@ from prometheus_client import Counter, Summary from flytekit import logger +from flytekit.bin.entrypoint import get_traceback_str from flytekit.exceptions.system import FlyteAgentNotFound from flytekit.extend.backend.base_agent import AgentRegistry, SyncAgentBase, mirror_async_methods from flytekit.models.literals import LiteralMap @@ -63,7 +64,7 @@ def _handle_exception(e: Exception, context: grpc.ServicerContext, task_type: st context.set_details(error_message) request_failure_count.labels(task_type=task_type, operation=operation, error_code=HTTPStatus.NOT_FOUND).inc() else: - error_message = f"failed to {operation} {task_type} task with error: {e}." + error_message = f"failed to {operation} {task_type} task with error:\n {get_traceback_str(e)}." logger.error(error_message) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(error_message) diff --git a/flytekit/loggers.py b/flytekit/loggers.py index 8c6e0de196..0224d177b4 100644 --- a/flytekit/loggers.py +++ b/flytekit/loggers.py @@ -177,7 +177,7 @@ def get_level_from_cli_verbosity(verbosity: int) -> int: :return: logging level """ if verbosity == 0: - return logging.CRITICAL + return _get_env_logging_level(default_level=logging.CRITICAL) elif verbosity == 1: return logging.WARNING elif verbosity == 2: diff --git a/tests/flytekit/unit/cli/pyflyte/test_run.py b/tests/flytekit/unit/cli/pyflyte/test_run.py index 70be22527e..848dbbf6e1 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_run.py +++ b/tests/flytekit/unit/cli/pyflyte/test_run.py @@ -477,10 +477,7 @@ def test_nested_workflow(working_dir, wf_path, monkeypatch: pytest.MonkeyPatch): ], catch_exceptions=False, ) - assert ( - result.stdout.strip() - == "Running Execution on local.\nRunning Execution on local." - ) + assert ("Running Execution on local." in result.stdout.strip()) assert result.exit_code == 0 @@ -853,7 +850,8 @@ def test_list_default_arguments(task_path): catch_exceptions=False, ) assert result.exit_code == 0 - assert result.stdout == "Running Execution on local.\n0 Hello Color.RED\n\n" + assert "Running Execution on local." in result.stdout + assert "Hello Color.RED" in result.stdout def test_entity_non_found_in_file(): From e9a7da19fe50e6588ec3a66a44a8724a9e96e8cd Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 19 Dec 2024 14:13:43 -0500 Subject: [PATCH 06/13] Add python interpreter into vscode settings (#3014) Signed-off-by: Thomas J. Fan --- flytekit/interactive/vscode_lib/decorator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flytekit/interactive/vscode_lib/decorator.py b/flytekit/interactive/vscode_lib/decorator.py index 2ed3406cb4..055bda6639 100644 --- a/flytekit/interactive/vscode_lib/decorator.py +++ b/flytekit/interactive/vscode_lib/decorator.py @@ -302,7 +302,7 @@ def prepare_resume_task_python(pid: int): def prepare_launch_json(): """ - Generate the launch.json for users to easily launch interactive debugging and task resumption. + Generate the launch.json and settings.json for users to easily launch interactive debugging and task resumption. """ task_function_source_dir = os.path.dirname( @@ -337,6 +337,10 @@ def prepare_launch_json(): with open(os.path.join(vscode_directory, "launch.json"), "w") as file: json.dump(launch_json, file, indent=4) + settings_json = {"python.defaultInterpreterPath": sys.executable} + with open(os.path.join(vscode_directory, "settings.json"), "w") as file: + json.dump(settings_json, file, indent=4) + VSCODE_TYPE_VALUE = "vscode" From bc0e8c09758930da072fb228411b1b87f537e420 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Mon, 23 Dec 2024 11:50:43 -0800 Subject: [PATCH 07/13] add support for toggling data mode for array node (#2940) * add support for toggling data mode for array node Signed-off-by: Paul Dittamo * clean up Signed-off-by: Paul Dittamo * clean up Signed-off-by: Paul Dittamo * cleanup Signed-off-by: Paul Dittamo * Bump flyteidl lower-bound to 1.14.1 Signed-off-by: Eduardo Apolinario * Add import of FlyteLaunchPlan back Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Paul Dittamo Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flytekit/core/array_node.py | 27 +++++++++++++++++---------- flytekit/core/array_node_map_task.py | 3 ++- flytekit/models/core/workflow.py | 3 +++ flytekit/tools/translator.py | 1 + pyproject.toml | 2 +- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/flytekit/core/array_node.py b/flytekit/core/array_node.py index 0cb2c8d25c..466058a791 100644 --- a/flytekit/core/array_node.py +++ b/flytekit/core/array_node.py @@ -19,6 +19,7 @@ flyte_entity_call_handler, translate_inputs_to_literals, ) +from flytekit.core.task import ReferenceTask from flytekit.loggers import logger from flytekit.models import interface as _interface_models from flytekit.models import literals as _literal_models @@ -34,8 +35,7 @@ class ArrayNode: def __init__( self, - target: Union[LaunchPlan, "FlyteLaunchPlan"], - execution_mode: _core_workflow.ArrayNode.ExecutionMode = _core_workflow.ArrayNode.FULL_STATE, + target: Union[LaunchPlan, ReferenceTask, "FlyteLaunchPlan"], bindings: Optional[List[_literal_models.Binding]] = None, concurrency: Optional[int] = None, min_successes: Optional[int] = None, @@ -51,17 +51,17 @@ def __init__( :param min_successes: The minimum number of successful executions. If set, this takes precedence over min_success_ratio :param min_success_ratio: The minimum ratio of successful executions. - :param execution_mode: The execution mode for propeller to use when handling ArrayNode :param metadata: The metadata for the underlying node """ from flytekit.remote import FlyteLaunchPlan self.target = target self._concurrency = concurrency - self._execution_mode = execution_mode self.id = target.name self._bindings = bindings or [] self.metadata = metadata + self._data_mode = None + self._execution_mode = None if min_successes is not None: self._min_successes = min_successes @@ -92,9 +92,12 @@ def __init__( else: raise ValueError("No interface found for the target entity.") - if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): - if self._execution_mode != _core_workflow.ArrayNode.FULL_STATE: - raise ValueError("Only execution version 1 is supported for LaunchPlans.") + if isinstance(target, (LaunchPlan, FlyteLaunchPlan)): + self._data_mode = _core_workflow.ArrayNode.SINGLE_INPUT_FILE + self._execution_mode = _core_workflow.ArrayNode.FULL_STATE + elif isinstance(target, ReferenceTask): + self._data_mode = _core_workflow.ArrayNode.INDIVIDUAL_INPUT_FILES + self._execution_mode = _core_workflow.ArrayNode.MINIMAL_STATE else: raise ValueError(f"Only LaunchPlans are supported for now, but got {type(target)}") @@ -133,6 +136,10 @@ def upstream_nodes(self) -> List[Node]: def flyte_entity(self) -> Any: return self.target + @property + def data_mode(self) -> _core_workflow.ArrayNode.DataMode: + return self._data_mode + def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromise]: if self._remote_interface: raise ValueError("Mapping over remote entities is not supported in local execution.") @@ -254,7 +261,7 @@ def __call__(self, *args, **kwargs): def array_node( - target: Union[LaunchPlan, "FlyteLaunchPlan"], + target: Union[LaunchPlan, ReferenceTask, "FlyteLaunchPlan"], concurrency: Optional[int] = None, min_success_ratio: Optional[float] = None, min_successes: Optional[int] = None, @@ -275,8 +282,8 @@ def array_node( """ from flytekit.remote import FlyteLaunchPlan - if not isinstance(target, LaunchPlan) and not isinstance(target, FlyteLaunchPlan): - raise ValueError("Only LaunchPlans are supported for now.") + if not isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): + raise ValueError("Only LaunchPlans and ReferenceTasks are supported for now.") node = ArrayNode( target=target, diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index 44458a53d2..78b9611651 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -18,6 +18,7 @@ from flytekit.core.interface import transform_interface_to_list_interface from flytekit.core.launch_plan import LaunchPlan from flytekit.core.python_function_task import PythonFunctionTask, PythonInstanceTask +from flytekit.core.task import ReferenceTask from flytekit.core.type_engine import TypeEngine from flytekit.core.utils import timeit from flytekit.loggers import logger @@ -390,7 +391,7 @@ def map_task( """ from flytekit.remote import FlyteLaunchPlan - if isinstance(target, LaunchPlan) or isinstance(target, FlyteLaunchPlan): + if isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): return array_node( target=target, concurrency=concurrency, diff --git a/flytekit/models/core/workflow.py b/flytekit/models/core/workflow.py index 8d8bf9c9ef..f3fed3d4f3 100644 --- a/flytekit/models/core/workflow.py +++ b/flytekit/models/core/workflow.py @@ -390,6 +390,7 @@ def __init__( min_success_ratio=None, execution_mode=None, is_original_sub_node_interface=False, + data_mode=None, ) -> None: """ TODO: docstring @@ -401,6 +402,7 @@ def __init__( self._min_success_ratio = min_success_ratio self._execution_mode = execution_mode self._is_original_sub_node_interface = is_original_sub_node_interface + self._data_mode = data_mode @property def node(self) -> "Node": @@ -414,6 +416,7 @@ def to_flyte_idl(self) -> _core_workflow.ArrayNode: min_success_ratio=self._min_success_ratio, execution_mode=self._execution_mode, is_original_sub_node_interface=BoolValue(value=self._is_original_sub_node_interface), + data_mode=self._data_mode, ) @classmethod diff --git a/flytekit/tools/translator.py b/flytekit/tools/translator.py index 5c7a6d5eb4..ee905a4218 100644 --- a/flytekit/tools/translator.py +++ b/flytekit/tools/translator.py @@ -601,6 +601,7 @@ def get_serializable_array_node( min_success_ratio=array_node.min_success_ratio, execution_mode=array_node.execution_mode, is_original_sub_node_interface=array_node.is_original_sub_node_interface, + data_mode=array_node.data_mode, ) diff --git a/pyproject.toml b/pyproject.toml index 58c107cdc3..3dc782c507 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "diskcache>=5.2.1", "docker>=4.0.0", "docstring-parser>=0.9.0", - "flyteidl>=1.13.9", + "flyteidl>=1.14.1", "fsspec>=2023.3.0", "gcsfs>=2023.3.0", "googleapis-common-protos>=1.57", From 9b94910bd9ba9d622d2d63cb76c52259b664578c Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Thu, 26 Dec 2024 16:44:23 -0500 Subject: [PATCH 08/13] Store protos in local cache (#3022) * Store proto obj instead of model Literal in local cache Signed-off-by: Eduardo Apolinario * Remove unused file Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flytekit/core/local_cache.py | 48 ++++++++++++++---- flytekit/models/literals.py | 10 +++- tests/flytekit/unit/core/test_local_cache.py | 22 ++++++++ .../unit/core/testdata/pickled_value.bin | Bin 0 -> 396 bytes 4 files changed, 69 insertions(+), 11 deletions(-) create mode 100644 tests/flytekit/unit/core/testdata/pickled_value.bin diff --git a/flytekit/core/local_cache.py b/flytekit/core/local_cache.py index 7cd87e2a49..d6c7f93f99 100644 --- a/flytekit/core/local_cache.py +++ b/flytekit/core/local_cache.py @@ -1,9 +1,11 @@ from typing import Optional, Tuple from diskcache import Cache +from flyteidl.core.literals_pb2 import LiteralMap from flytekit import lazy_module -from flytekit.models.literals import Literal, LiteralCollection, LiteralMap +from flytekit.models.literals import Literal, LiteralCollection +from flytekit.models.literals import LiteralMap as ModelLiteralMap joblib = lazy_module("joblib") @@ -23,13 +25,16 @@ def _recursive_hash_placement(literal: Literal) -> Literal: literal_map = {} for key, literal_value in literal.map.literals.items(): literal_map[key] = _recursive_hash_placement(literal_value) - return Literal(map=LiteralMap(literal_map)) + return Literal(map=ModelLiteralMap(literal_map)) else: return literal def _calculate_cache_key( - task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...] = () + task_name: str, + cache_version: str, + input_literal_map: ModelLiteralMap, + cache_ignore_input_vars: Tuple[str, ...] = (), ) -> str: # Traverse the literals and replace the literal with a new literal that only contains the hash literal_map_overridden = {} @@ -40,7 +45,7 @@ def _calculate_cache_key( # Generate a stable representation of the underlying protobuf by passing `deterministic=True` to the # protobuf library. - hashed_inputs = LiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True) + hashed_inputs = ModelLiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True) # Use joblib to hash the string representation of the literal into a fixed length string return f"{task_name}-{cache_version}-{joblib.hash(hashed_inputs)}" @@ -66,24 +71,47 @@ def clear(): @staticmethod def get( - task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...] - ) -> Optional[LiteralMap]: + task_name: str, cache_version: str, input_literal_map: ModelLiteralMap, cache_ignore_input_vars: Tuple[str, ...] + ) -> Optional[ModelLiteralMap]: if not LocalTaskCache._initialized: LocalTaskCache.initialize() - return LocalTaskCache._cache.get( + serialized_obj = LocalTaskCache._cache.get( _calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars) ) + if serialized_obj is None: + return None + + # If the serialized object is a model file, first convert it back to a proto object (which will force it to + # use the installed flyteidl proto messages) and then convert it to a model object. This will guarantee + # that the object is in the correct format. + if isinstance(serialized_obj, ModelLiteralMap): + return ModelLiteralMap.from_flyte_idl(ModelLiteralMap.to_flyte_idl(serialized_obj)) + elif isinstance(serialized_obj, bytes): + # If it is a bytes object, then it is a serialized proto object. + # We need to convert it to a model object first.o + pb_literal_map = LiteralMap() + pb_literal_map.ParseFromString(serialized_obj) + return ModelLiteralMap.from_flyte_idl(pb_literal_map) + else: + raise ValueError(f"Unexpected object type {type(serialized_obj)}") + @staticmethod def set( task_name: str, cache_version: str, - input_literal_map: LiteralMap, + input_literal_map: ModelLiteralMap, cache_ignore_input_vars: Tuple[str, ...], - value: LiteralMap, + value: ModelLiteralMap, ) -> None: if not LocalTaskCache._initialized: LocalTaskCache.initialize() LocalTaskCache._cache.set( - _calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars), value + _calculate_cache_key( + task_name, + cache_version, + input_literal_map, + cache_ignore_input_vars, + ), + value.to_flyte_idl().SerializeToString(), ) diff --git a/flytekit/models/literals.py b/flytekit/models/literals.py index d65ebfafae..a4b5a1d359 100644 --- a/flytekit/models/literals.py +++ b/flytekit/models/literals.py @@ -979,7 +979,15 @@ def offloaded_metadata(self) -> Optional[LiteralOffloadedMetadata]: """ This value holds metadata about the offloaded literal. """ - return self._offloaded_metadata + # The following check might seem non-sensical, since `_offloaded_metadata` is set in the constructor. + # This is here to support backwards compatibility caused by the local cache implementation. Let me explain. + # The local cache pickles values and unpickles them. When unpickling, the constructor is not called, so there + # are cases where the `_offloaded_metadata` is not set (for example if you cache a value using flytekit<=1.13.6 + # and you load that value later using flytekit>1.13.6). + # In other words, this is a workaround to support backwards compatibility with the local cache. + if hasattr(self, "_offloaded_metadata"): + return self._offloaded_metadata + return None def to_flyte_idl(self): """ diff --git a/tests/flytekit/unit/core/test_local_cache.py b/tests/flytekit/unit/core/test_local_cache.py index cf3e90e338..0990541a84 100644 --- a/tests/flytekit/unit/core/test_local_cache.py +++ b/tests/flytekit/unit/core/test_local_cache.py @@ -1,4 +1,6 @@ import datetime +import pathlib +import pickle import re import sys import typing @@ -627,3 +629,23 @@ def test_set_cache_ignore_input_vars_without_set_cache(): @task(cache_ignore_input_vars=["a"]) def add(a: int, b: int) -> int: return a + b + + +@pytest.mark.serial +def test_cache_old_version_of_literal_map(): + cache_key = "t.produce_dc-1-ea65cfadb0079394a8be1f4aa1e96e2b" + + # Load a literal map from a previous version of the cache from a local file + with open(pathlib.Path(__file__).parent / "testdata/pickled_value.bin", "rb") as f: + literal_map = pickle.loads(f.read()) + LocalTaskCache._cache.set(cache_key, literal_map) + + assert _calculate_cache_key("t.produce_dc", "1", LiteralMap(literals={})) == cache_key + + # Hit the cache directly and confirm that the loaded object does not have the `_offloaded_metadata` attribute + literal_map = LocalTaskCache._cache.get(cache_key) + assert hasattr(literal_map.literals['o0'], "_offloaded_metadata") is False + + # Now load the same object from the cache and confirm that the `_offloaded_metadata` attribute is now present + loaded_literal_map = LocalTaskCache.get("t.produce_dc", "1", LiteralMap(literals={}), ()) + assert hasattr(loaded_literal_map.literals['o0'], "_offloaded_metadata") is True diff --git a/tests/flytekit/unit/core/testdata/pickled_value.bin b/tests/flytekit/unit/core/testdata/pickled_value.bin new file mode 100644 index 0000000000000000000000000000000000000000..71a45e1909e52c0612354de4b07eca2dfd08f031 GIT binary patch literal 396 zcmX|-u};G<5QdwoB*aKalm#)t)-Dm^2`V8r=s+LfWINZy!igi>K`Igw6{|PQ`!MrV z*h%50@AUt7`tROnU;AV-l)KouJaYiNW$jRt0W3E2;;x*SKcsp4&&lpV+kd-)S&h2 hp1Bj?N=oE*j68!T{4U;DQ&R`Xp+QUIM6?^jieEDdk$M0C literal 0 HcmV?d00001 From 7a5c5636c7c598525a071149f3ee3748774c8555 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 14:54:14 -0500 Subject: [PATCH 09/13] Bump aiohttp from 3.9.5 to 3.10.11 (#3018) Bumps [aiohttp](https://github.com/aio-libs/aiohttp) from 3.9.5 to 3.10.11. - [Release notes](https://github.com/aio-libs/aiohttp/releases) - [Changelog](https://github.com/aio-libs/aiohttp/blob/master/CHANGES.rst) - [Commits](https://github.com/aio-libs/aiohttp/compare/v3.9.5...v3.10.11) --- updated-dependencies: - dependency-name: aiohttp dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- dev-requirements.txt | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 9acff98cb6..26af8ad1bb 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -10,7 +10,9 @@ adlfs==2024.4.1 # via flytekit aiobotocore==2.13.0 # via s3fs -aiohttp==3.9.5 +aiohappyeyeballs==2.4.4 + # via aiohttp +aiohttp==3.10.11 # via # adlfs # aiobotocore @@ -113,10 +115,8 @@ filelock==3.14.0 # via # snowflake-connector-python # virtualenv -flyteidl @ git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl - # via - # -r dev-requirements.in - # flytekit +flyteidl==1.14.1 + # via flytekit frozenlist==1.4.1 # via # aiohttp @@ -244,7 +244,9 @@ keyring==25.2.1 keyrings-alt==5.0.1 # via -r dev-requirements.in kubernetes==29.0.0 - # via -r dev-requirements.in + # via + # -r dev-requirements.in + # flytekit markdown-it-py==3.0.0 # via # flytekit @@ -260,7 +262,7 @@ marshmallow-enum==1.5.1 # flytekit marshmallow-jsonschema==0.13.0 # via flytekit -mashumaro==3.13 +mashumaro==3.15 # via flytekit matplotlib-inline==0.1.7 # via @@ -345,6 +347,8 @@ prometheus-client==0.20.0 # via -r dev-requirements.in prompt-toolkit==3.0.45 # via ipython +propcache==0.2.1 + # via yarl proto-plus==1.23.0 # via # google-api-core @@ -557,7 +561,7 @@ websocket-client==1.8.0 # kubernetes wrapt==1.16.0 # via aiobotocore -yarl==1.9.4 +yarl==1.18.3 # via aiohttp zipp==3.19.1 # via importlib-metadata From edbf99244114eec1ae79209e6ad9c609448edf22 Mon Sep 17 00:00:00 2001 From: Pim de Haan Date: Sat, 28 Dec 2024 00:23:52 +0100 Subject: [PATCH 10/13] Fix bug in FlyteDirectory.listdir on local files (#2926) * Fix issue in FlyteDirectory.listdir Fixes flyteorg/flyte#6005 Signed-off-by: Pim de Haan * Added test Signed-off-by: Pim de Haan * Run make lint Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Pim de Haan Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flytekit/types/directory/types.py | 7 +++-- .../unit/types/directory/test_listdir.py | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 tests/flytekit/unit/types/directory/test_listdir.py diff --git a/flytekit/types/directory/types.py b/flytekit/types/directory/types.py index a7c0aacc83..3bcbf77c97 100644 --- a/flytekit/types/directory/types.py +++ b/flytekit/types/directory/types.py @@ -367,10 +367,11 @@ def listdir(cls, directory: FlyteDirectory) -> typing.List[typing.Union[FlyteDir file_access = FlyteContextManager.current_context().file_access if not file_access.is_remote(final_path): for p in os.listdir(final_path): - if os.path.isfile(os.path.join(final_path, p)): - paths.append(FlyteFile(p)) + joined_path = os.path.join(final_path, p) + if os.path.isfile(joined_path): + paths.append(FlyteFile(joined_path)) else: - paths.append(FlyteDirectory(p)) + paths.append(FlyteDirectory(joined_path)) return paths def create_downloader(_remote_path: str, _local_path: str, is_multipart: bool): diff --git a/tests/flytekit/unit/types/directory/test_listdir.py b/tests/flytekit/unit/types/directory/test_listdir.py new file mode 100644 index 0000000000..0987456907 --- /dev/null +++ b/tests/flytekit/unit/types/directory/test_listdir.py @@ -0,0 +1,31 @@ +import tempfile +from pathlib import Path + +from flytekit import FlyteDirectory, FlyteFile, map_task, task, workflow + +def test_listdir(): + @task + def setup() -> FlyteDirectory: + tmpdir = Path(tempfile.mkdtemp()) + (tmpdir / "file.txt").write_text("Hello, World!") + return FlyteDirectory(tmpdir) + + + @task + def read_file(file: FlyteFile) -> str: + with open(file, "r") as f: + return f.read() + + + @task + def list_dir(dir: FlyteDirectory) -> list[FlyteFile]: + return FlyteDirectory.listdir(dir) + + + @workflow + def wf() -> list[str]: + tmpdir = setup() + files = list_dir(dir=tmpdir) + return map_task(read_file)(file=files) + + assert wf() == ["Hello, World!"] From 60fa4177a8278fc53da79bb070c7f5202be7ad7d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 30 Dec 2024 11:38:18 -0800 Subject: [PATCH 11/13] Fix unit tests in airflow plugin (#3024) Signed-off-by: Kevin Su --- plugins/flytekit-airflow/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/flytekit-airflow/setup.py b/plugins/flytekit-airflow/setup.py index 98d1b38bf1..ccfe1f81fc 100644 --- a/plugins/flytekit-airflow/setup.py +++ b/plugins/flytekit-airflow/setup.py @@ -6,6 +6,7 @@ plugin_requires = [ "apache-airflow", + "apache-airflow-providers-google<12.0.0", "flytekit>1.10.7", "flyteidl>1.10.7", ] From fdcfde3ce4aeec6271c6e765f75af6c7fd6c409b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E5=AE=B6=E7=91=8B?= <36886416+JiangJiaWei1103@users.noreply.github.com> Date: Tue, 31 Dec 2024 09:22:54 +0800 Subject: [PATCH 12/13] fix: Fix resource meta typos for async agent (#3023) Signed-off-by: JiaWei Jiang --- flytekit/extend/backend/agent_service.py | 4 ++-- flytekit/extend/backend/base_agent.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index ee97b9ddfb..8b76db9e32 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -117,14 +117,14 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon task_execution_metadata = TaskExecutionMetadata.from_flyte_idl(request.task_execution_metadata) logger.info(f"{agent.name} start creating the job") - resource_mata = await mirror_async_methods( + resource_meta = await mirror_async_methods( agent.create, task_template=template, inputs=inputs, output_prefix=request.output_prefix, task_execution_metadata=task_execution_metadata, ) - return CreateTaskResponse(resource_meta=resource_mata.encode()) + return CreateTaskResponse(resource_meta=resource_meta.encode()) @record_agent_metrics async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py index 16235a68ec..01bfe0fb7f 100644 --- a/flytekit/extend/backend/base_agent.py +++ b/flytekit/extend/backend/base_agent.py @@ -335,10 +335,10 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap: task_template = get_serializable(OrderedDict(), ss, self).template self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version) - resource_mata = asyncio.run( + resource_meta = asyncio.run( self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs) ) - resource = asyncio.run(self._get(resource_meta=resource_mata)) + resource = asyncio.run(self._get(resource_meta=resource_meta)) if resource.phase != TaskExecution.SUCCEEDED: raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}") From c95cc634359ebc793cccdf18a2872f2974ed8662 Mon Sep 17 00:00:00 2001 From: V <0426vincent@gmail.com> Date: Tue, 31 Dec 2024 21:24:56 -0800 Subject: [PATCH 13/13] fix: format commands output (#3026) --- flytekit/clis/sdk_in_container/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 7d661c3ff8..4c2aebcc36 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -1057,7 +1057,7 @@ def _create_command( h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow") else: if loaded_entity.__doc__: - h = h + click.style(f"{loaded_entity.__doc__}", dim=True) + h = h + click.style(f" {loaded_entity.__doc__}", dim=True) cmd = YamlFileReadingCommand( name=entity_name, params=params,