Deprecate KFP v1 SDK support #6924

Open

wants to merge 12 commits into base: master
1 change: 0 additions & 1 deletion build/BUILD
@@ -25,7 +25,6 @@ sh_binary(
"//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py",
"//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py",
"//tfx/orchestration/experimental/core:component_generated_alert_pb2.py",
"//tfx/orchestration/kubeflow/proto:kubeflow_pb2.py",
"//tfx/proto:bulk_inferrer_pb2.py",
"//tfx/proto:distribution_validator_pb2.py",
"//tfx/proto:evaluator_pb2.py",
4 changes: 2 additions & 2 deletions test_constraints.txt
@@ -12,5 +12,5 @@
Flask-session<0.6.0

#TODO(b/329181965): Remove once we migrate TFX to 2.16.
tensorflow<2.16
tensorflow-text<2.16
tensorflow>=2.15.1,<2.16
tensorflow-text<2.16
18 changes: 7 additions & 11 deletions tfx/dependencies.py
@@ -71,10 +71,8 @@ def make_pipeline_sdk_required_install_packages():
"google-api-python-client>=1.8,<2",
# TODO(b/176812386): Deprecate usage of jinja2 for placeholders.
"jinja2>=2.7.3,<4",
# typing-extensions allows consistent & future-proof interface for typing.
# Since kfp<2 uses typing-extensions<4, lower bound is the latest 3.x, and
# upper bound is <5 as the semver started from 4.0 according to their doc.
"typing-extensions>=3.10.0.2,<5",
# Upper bound is <5 as the semver started from 4.0 according to their doc.
"typing-extensions<5",
]


@@ -90,7 +88,7 @@ def make_required_install_packages():
"google-cloud-bigquery>=3,<4",
"grpcio>=1.28.1,<2",
"keras-tuner>=1.0.4,<2,!=1.4.0,!=1.4.1",
"kubernetes>=10.0.1,<13",
"kubernetes>=10.0.1,<27",
"numpy>=1.16,<2",
"pyarrow>=10,<11",
# TODO: b/358471141 - Orjson 3.10.7 breaks TFX OSS tests.
@@ -148,9 +146,8 @@ def make_extra_packages_airflow():
def make_extra_packages_kfp():
"""Prepare extra packages needed for Kubeflow Pipelines orchestrator."""
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
"kfp>=1.8.14,<2",
"kfp-pipeline-spec>0.1.13,<0.2",
"kfp>=2",
"kfp-pipeline-spec>=0.2.2",
]


@@ -171,9 +168,8 @@ def make_extra_packages_test():
def make_extra_packages_docker_image():
# Packages needed for tfx docker image.
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
"kfp>=1.8.14,<2",
"kfp-pipeline-spec>0.1.13,<0.2",
"kfp>=2",
"kfp-pipeline-spec>=0.2.2",
"mmh>=2.2,<3",
"python-snappy>=0.5,<0.6",
# Required for tfx/examples/penguin/penguin_utils_cloud_tuner.py
@@ -30,7 +30,7 @@ def setUp(self):
self._experimental_root = os.path.dirname(__file__)
self._penguin_root = os.path.dirname(self._experimental_root)

self._pipeline_name = 'sklearn_test'
self._pipeline_name = 'sklearn-test'
self._data_root = os.path.join(self._penguin_root, 'data')
self._trainer_module_file = os.path.join(
self._experimental_root, 'penguin_utils_sklearn.py')
@@ -66,6 +66,8 @@ def testPipelineConstruction(self, resolve_mock):
beam_pipeline_args=[])
self.assertEqual(8, len(logical_pipeline.components))

tfx.orchestration.experimental.KubeflowDagRunner().run(logical_pipeline)
file_path = os.path.join(self.tmp_dir, 'sklearn_test.tar.gz')
tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename='sklearn_test.yaml').run(logical_pipeline)
file_path = os.path.join(self.tmp_dir, 'sklearn_test.yaml')
self.assertTrue(tfx.dsl.io.fileio.exists(file_path))
48 changes: 21 additions & 27 deletions tfx/examples/penguin/penguin_pipeline_kubeflow.py
@@ -501,33 +501,27 @@ def main():
else:
beam_pipeline_args = _beam_pipeline_args_by_runner['DirectRunner']

if use_vertex:
dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=_pipeline_definition_file)
else:
dag_runner = tfx.orchestration.experimental.KubeflowDagRunner(
config=tfx.orchestration.experimental.KubeflowDagRunnerConfig(
kubeflow_metadata_config=tfx.orchestration.experimental
.get_default_kubeflow_metadata_config()))

dag_runner.run(
create_pipeline(
pipeline_name=_pipeline_name,
pipeline_root=_pipeline_root,
data_root=_data_root,
module_file=_module_file,
enable_tuning=False,
enable_cache=True,
user_provided_schema_path=_user_provided_schema,
ai_platform_training_args=_ai_platform_training_args,
ai_platform_serving_args=_ai_platform_serving_args,
beam_pipeline_args=beam_pipeline_args,
use_cloud_component=use_cloud_component,
use_aip=use_aip,
use_vertex=use_vertex,
serving_model_dir=_serving_model_dir,
))
dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=_pipeline_definition_file)

dag_runner.run(
create_pipeline(
pipeline_name=_pipeline_name,
pipeline_root=_pipeline_root,
data_root=_data_root,
module_file=_module_file,
enable_tuning=False,
enable_cache=True,
user_provided_schema_path=_user_provided_schema,
ai_platform_training_args=_ai_platform_training_args,
ai_platform_serving_args=_ai_platform_serving_args,
beam_pipeline_args=beam_pipeline_args,
use_cloud_component=use_cloud_component,
use_aip=use_aip,
use_vertex=use_vertex,
serving_model_dir=_serving_model_dir,
))


# To compile the pipeline:
51 changes: 0 additions & 51 deletions tfx/examples/penguin/penguin_pipeline_kubeflow_e2e_test.py
@@ -18,7 +18,6 @@
from absl.testing import parameterized
from tfx.dsl.io import fileio
from tfx.examples.penguin import penguin_pipeline_kubeflow
from tfx.orchestration.kubeflow import test_utils as kubeflow_test_utils
from tfx.orchestration.kubeflow.v2.e2e_tests import base_test_case
from tfx.utils import io_utils

@@ -80,53 +79,3 @@ def testEndToEndPipelineRun(self, use_pipeline_spec_2_1):
use_pipeline_spec_2_1=use_pipeline_spec_2_1,
)
self.assertTrue(fileio.exists(self._serving_model_dir))

@pytest.mark.e2e
class PenguinPipelineKubeflowTest(kubeflow_test_utils.BaseKubeflowTest):

def setUp(self):
super().setUp()
penguin_examples_dir = os.path.join(self._REPO_BASE, 'tfx', 'examples',
'penguin')
penguin_test_data_root = os.path.join(penguin_examples_dir, 'data')
penguin_test_schema_file = os.path.join(penguin_examples_dir, 'schema',
'user_provided', 'schema.pbtxt')
self._penguin_module_file = os.path.join(penguin_examples_dir,
'penguin_utils_cloud_tuner.py')
self._penguin_data_root = os.path.join(self._test_data_dir, 'data')
self._penguin_schema_file = os.path.join(self._test_data_dir,
'schema.pbtxt')

io_utils.copy_dir(penguin_test_data_root, self._penguin_data_root)
io_utils.copy_file(
penguin_test_schema_file, self._penguin_schema_file, overwrite=True)

def testEndToEndPipelineRun(self):
"""End-to-end test for pipeline with RuntimeParameter."""
pipeline_name = 'kubeflow-v1-e2e-test-{}'.format(self._test_id)
kubeflow_pipeline = penguin_pipeline_kubeflow.create_pipeline(
pipeline_name=pipeline_name,
pipeline_root=self._pipeline_root(pipeline_name),
data_root=self._penguin_data_root,
module_file=self._penguin_module_file,
enable_tuning=False,
enable_cache=True,
user_provided_schema_path=self._penguin_schema_file,
ai_platform_training_args=penguin_pipeline_kubeflow
._ai_platform_training_args,
ai_platform_serving_args=penguin_pipeline_kubeflow
._ai_platform_serving_args,
beam_pipeline_args=penguin_pipeline_kubeflow
._beam_pipeline_args_by_runner['DirectRunner'],
use_cloud_component=False,
use_aip=False,
use_vertex=False,
serving_model_dir=self._serving_model_dir)

parameters = {
'train-args': '{"num_steps": 100}',
'eval-args': '{"num_steps": 50}',
}
self._compile_and_run_pipeline(
pipeline=kubeflow_pipeline, parameters=parameters)
self.assertTrue(fileio.exists(self._serving_model_dir))
25 changes: 8 additions & 17 deletions tfx/examples/penguin/penguin_pipeline_kubeflow_test.py
@@ -63,20 +63,11 @@ def testPenguinPipelineConstructionAndDefinitionFileExists(
serving_model_dir=penguin_pipeline_kubeflow._serving_model_dir)
self.assertLen(kubeflow_pipeline.components, 9)

if use_vertex:
v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner(
config=orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_dir=self.tmp_dir,
output_filename=penguin_pipeline_kubeflow._pipeline_definition_file)
v2_dag_runner.run(kubeflow_pipeline)
file_path = os.path.join(
self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file)
self.assertTrue(fileio.exists(file_path))
else:
v1_dag_runner = orchestration.experimental.KubeflowDagRunner(
config=orchestration.experimental.KubeflowDagRunnerConfig(
kubeflow_metadata_config=orchestration.experimental
.get_default_kubeflow_metadata_config()))
v1_dag_runner.run(kubeflow_pipeline)
file_path = os.path.join(self.tmp_dir, 'penguin-kubeflow.tar.gz')
self.assertTrue(fileio.exists(file_path))
v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner(
config=orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_dir=self.tmp_dir,
output_filename=penguin_pipeline_kubeflow._pipeline_definition_file)
v2_dag_runner.run(kubeflow_pipeline)
file_path = os.path.join(
self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file)
self.assertTrue(fileio.exists(file_path))
145 changes: 0 additions & 145 deletions tfx/experimental/templates/container_based_test_case.py
Member:
It is probably not related to this PR, but the tests (3.10, not e2e, DEFAULT) check is failing in this PR because of a KeyError, and _BASE_CONTAINER_IMAGE = os.environ['KFP_E2E_BASE_CONTAINER_IMAGE'] seems to be the culprit.
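
One way to avoid that import-time failure is to look the variable up lazily (a minimal sketch, assuming the pytest-style setup used elsewhere in this PR; the helper name _get_base_container_image is hypothetical, not code from this PR):

import os

import pytest


def _get_base_container_image() -> str:
  """Reads the e2e base image lazily rather than at module import time.

  Hypothetical guard: resolving the variable inside the test body keeps
  collection of non-e2e tests from raising KeyError when it is unset.
  """
  image = os.environ.get('KFP_E2E_BASE_CONTAINER_IMAGE')
  if image is None:
    pytest.skip('KFP_E2E_BASE_CONTAINER_IMAGE is not set; skipping e2e test.')
  return image

With this shape the e2e job still fails or skips loudly when the variable is missing, while plain unit-test collection succeeds.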

Author:
I think we should update some CI settings and/or some test utility files. Since py_test collects non-e2e tests at runtime, the unit tests are complaining that there are no valid environments.

cc: @peytondmurray
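
On the CI-settings side, one option (a sketch, assuming the tests keep the @pytest.mark.e2e marker used elsewhere in this PR) is to register the marker and deselect it in the unit-test job:

# conftest.py (hypothetical, not part of this PR): registering the marker
# lets the unit-test job deselect end-to-end tests with pytest -m "not e2e".
def pytest_configure(config):
  config.addinivalue_line(
      'markers',
      'e2e: end-to-end tests that need a live KFP/Vertex environment')

The unit-test invocation would then be pytest -m "not e2e", which collects and runs only the non-e2e tests.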

@@ -15,23 +15,16 @@

import datetime
import os
import subprocess
import tarfile

from absl import logging
from google.cloud import aiplatform
import kfp
from tfx.dsl.io import fileio
from tfx.experimental.templates import test_utils
from tfx.orchestration import test_utils as orchestration_test_utils
from tfx.orchestration.kubeflow import test_utils as kubeflow_test_utils
from tfx.orchestration.kubeflow.v2 import vertex_client_utils
from tfx.utils import docker_utils
from tfx.utils import io_utils
from tfx.utils import retry
from tfx.utils import telemetry_utils
from tfx.utils import test_case_utils
import yaml


class BaseContainerBasedEndToEndTest(test_utils.BaseEndToEndTest):
@@ -111,144 +104,6 @@ def _delete_target_container_image(self):
docker_utils.delete_image(self._target_container_image)


class BaseKubeflowEndToEndTest(BaseContainerBasedEndToEndTest):
"""Common utilities for kubeflow engine."""

_RETRY_LIMIT = 3

# This default bucket name is valid for KFP marketplace deployment since KFP
# version 0.5.0.
_BUCKET_NAME = (
BaseContainerBasedEndToEndTest._GCP_PROJECT_ID +
'-kubeflowpipelines-default')

def setUp(self):
super().setUp()
self._namespace = 'kubeflow'
self._endpoint = self._get_endpoint(self._namespace)
self._kfp_client = kfp.Client(host=self._endpoint)
logging.info('ENDPOINT: %s', self._endpoint)
self.enter_context(
test_case_utils.override_env_var(
'KUBEFLOW_HOME', os.path.join(self._temp_dir, 'kubeflow')))

def tearDown(self):
super().tearDown()
self._delete_runs()
self._delete_pipeline()

def _get_endpoint(self, namespace):
cmd = 'kubectl describe configmap inverse-proxy-config -n {}'.format(
namespace)
output = subprocess.check_output(cmd.split())
for line in output.decode('utf-8').split('\n'):
if line.endswith('googleusercontent.com'):
return line

def _get_kfp_runs(self):
# CLI uses experiment_name which is the same as pipeline_name.
experiment_id = self._kfp_client.get_experiment(
experiment_name=self._pipeline_name).id
response = self._kfp_client.list_runs(experiment_id=experiment_id)
return response.runs

@retry.retry(ignore_eventual_failure=True)
def _delete_runs(self):
for run in self._get_kfp_runs():
self._kfp_client._run_api.delete_run(id=run.id) # pylint: disable=protected-access

@retry.retry(ignore_eventual_failure=True)
def _delete_pipeline(self):
self._runCli([
'pipeline', 'delete', '--engine', 'kubeflow', '--pipeline_name',
self._pipeline_name
])

def _parse_run_id(self, output: str):
run_id_lines = [
line for line in output.split('\n')
if '| {} |'.format(self._pipeline_name) in line
]
self.assertLen(run_id_lines, 1)
return run_id_lines[0].split('|')[2].strip()

def _wait_until_completed(self, run_id: str):
end_state = kubeflow_test_utils.poll_kfp_with_retry(
self._endpoint, run_id, self._RETRY_LIMIT, self._TIME_OUT,
self._POLLING_INTERVAL_IN_SECONDS)
self.assertEqual(end_state.lower(), kubeflow_test_utils.KFP_SUCCESS_STATUS)

def _create_pipeline(self):
self._runCli([
'pipeline',
'create',
'--engine',
'kubeflow',
'--pipeline_path',
'kubeflow_runner.py',
'--endpoint',
self._endpoint,
'--build-image',
'--build-base-image',
self._base_container_image,
])

def _compile_pipeline(self):
self._runCli([
'pipeline',
'compile',
'--engine',
'kubeflow',
'--pipeline_path',
'kubeflow_runner.py',
])

def _update_pipeline(self):
self._runCli([
'pipeline',
'update',
'--engine',
'kubeflow',
'--pipeline_path',
'kubeflow_runner.py',
'--endpoint',
self._endpoint,
'--build-image',
])

def _run_pipeline(self):
result = self._runCli([
'run',
'create',
'--engine',
'kubeflow',
'--pipeline_name',
self._pipeline_name,
'--endpoint',
self._endpoint,
])
run_id = self._parse_run_id(result)
self._wait_until_completed(run_id)
kubeflow_test_utils.print_failure_log_for_run(self._endpoint, run_id,
self._namespace)

def _check_telemetry_label(self):
file_path = os.path.join(self._project_dir,
'{}.tar.gz'.format(self._pipeline_name))
self.assertTrue(fileio.exists(file_path))

with tarfile.TarFile.open(file_path).extractfile(
'pipeline.yaml') as pipeline_file:
self.assertIsNotNone(pipeline_file)
pipeline = yaml.safe_load(pipeline_file)
metadata = [
c['metadata'] for c in pipeline['spec']['templates'] if 'dag' not in c
]
for m in metadata:
self.assertEqual('tfx-template',
m['labels'][telemetry_utils.LABEL_KFP_SDK_ENV])


class BaseVertexEndToEndTest(BaseContainerBasedEndToEndTest):
"""Common utilities for vertex engine."""
