-
Notifications
You must be signed in to change notification settings - Fork 323
[test] Add integration test for accessing sd attr in dc #2969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
d5ae883
e8abb4a
7937169
fbfac6e
865c390
b640eb6
c27c7f3
a02ea53
83975d0
3fc02ea
838153d
be68c2d
148599b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -14,7 +14,7 @@ | |||||||||||||||||||||||
from urllib.parse import urlparse | ||||||||||||||||||||||||
import uuid | ||||||||||||||||||||||||
import pytest | ||||||||||||||||||||||||
from mock import mock, patch | ||||||||||||||||||||||||
import mock | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase | ||||||||||||||||||||||||
from flytekit.configuration import Config, ImageConfig, SerializationSettings | ||||||||||||||||||||||||
|
@@ -28,6 +28,10 @@ | |||||||||||||||||||||||
from flytekit.types.schema import FlyteSchema | ||||||||||||||||||||||||
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient | ||||||||||||||||||||||||
from flytekit.configuration import PlatformConfig | ||||||||||||||||||||||||
from botocore.client import BaseClient | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
from tests.flytekit.integration.remote.utils import SimpleFileTransfer | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
# Directory of the basic example workflows, resolved relative to this test file.
MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
# Flyte config file: taken from $FLYTECTL_CONFIG, falling back to ~/.flyte/config-sandbox.yaml.
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
|
@@ -799,3 +803,21 @@ def test_get_control_plane_version(): | |||||||||||||||||||||||
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True)) | ||||||||||||||||||||||||
version = client.get_control_plane_version() | ||||||||||||||||||||||||
assert version == "unknown" or version.startswith("v") | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
def test_attr_access_sd():
    """Test accessing StructuredDataset attribute from a dataclass.

    Uploads a parquet file, runs the ``wf`` workflow from ``attr_access_sd.py``
    against it, and asserts that the execution succeeds. The uploaded file is
    removed afterwards regardless of the outcome.
    """
    # Upload a file to minio s3 bucket.
    # NOTE(review): a reviewer suggested error handling around the upload. An
    # upload failure is deliberately left to propagate so the test fails
    # loudly; nothing has been uploaded yet, so there is nothing to clean up.
    file_transfer = SimpleFileTransfer()
    remote_file_path = file_transfer.upload_file(file_type="parquet")

    try:
        execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
        remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
        execution = remote.fetch_execution(name=execution_id)
        execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
        assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
    finally:
        # Delete the remote file to free the space. Running this in `finally`
        # ensures the object is removed even when the run or the assertion
        # above fails, so failed test runs do not leak files in the bucket.
        url = urlparse(remote_file_path)
        bucket, key = url.netloc, url.path.lstrip("/")
        file_transfer.delete_file(bucket=bucket, key=key)
Comment on lines
+850
to
+853
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consider handling file deletion errors
Consider adding error handling around the file deletion operation. Code suggestion: check the AI-generated fix before applying.
Suggested change
Code Review Run #4d8bc5 Is this a valid issue, or was it incorrectly flagged by the Agent?
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,101 @@ | ||||||||
""" | ||||||||
Common utilities for flyte remote runs in integration tests. | ||||||||
""" | ||||||||
import os | ||||||||
import json | ||||||||
import tempfile | ||||||||
import pathlib | ||||||||
|
||||||||
import botocore.session | ||||||||
from botocore.client import BaseClient | ||||||||
from flytekit.configuration import Config | ||||||||
from flytekit.remote.remote import FlyteRemote | ||||||||
|
||||||||
|
||||||||
# Constants shared by the integration-test helpers in this module.
# Flyte config file: taken from $FLYTECTL_CONFIG, falling back to ~/.flyte/config-sandbox.yaml.
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
# Project and domain handed to FlyteRemote and used as the upload target.
PROJECT = "flytesnacks"
DOMAIN = "development"
|
||||||||
|
||||||||
class SimpleFileTransfer:
    """Utilities for file transfer to minio s3 bucket.

    Supports uploading a single generated (or bundled) test file through
    ``FlyteRemote`` and deleting it again through a botocore s3 client.
    """

    def __init__(self) -> None:
        """Create the ``FlyteRemote`` and the s3 client used for transfers."""
        self._remote = FlyteRemote(
            config=Config.auto(config_file=CONFIG),
            default_project=PROJECT,
            default_domain=DOMAIN,
        )
        self._s3_client = self._get_minio_s3_client(self._remote)

    def _get_minio_s3_client(self, remote: FlyteRemote) -> BaseClient:
        """Create a botocore client.

        Args:
            remote: Remote whose ``file_access.data_config.s3`` settings
                (endpoint and credentials) are used to build the client.

        Returns:
            A botocore "s3" client.
        """
        minio_s3_config = remote.file_access.data_config.s3
        sess = botocore.session.get_session()

        return sess.create_client(
            "s3",
            endpoint_url=minio_s3_config.endpoint,
            aws_access_key_id=minio_s3_config.access_key_id,
            aws_secret_access_key=minio_s3_config.secret_access_key,
        )

    def upload_file(self, file_type: str) -> str:
        """Upload a single file to minio s3 bucket.

        Args:
            file_type: File type. Support "txt", "json" and "parquet".

        Returns:
            remote_file_path: Remote file path.

        Raises:
            ValueError: If ``file_type`` is not one of the supported types.
        """
        # The temporary directory only needs to live until the upload is done.
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_file_path = self._dump_tmp_file(file_type, tmp_dir)

            # Upload to minio s3 bucket
            _, remote_file_path = self._remote.upload_file(
                to_upload=local_file_path,
                project=PROJECT,
                domain=DOMAIN,
            )

        return remote_file_path

    def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> pathlib.Path:
        """Generate and dump a temporary file locally.

        For "txt" and "json" a small file is written into ``tmp_dir``. For
        "parquet" no file is written; the path of a data file stored next to
        this module is returned instead.

        Args:
            file_type: File type. One of "txt", "json" or "parquet".
            tmp_dir: Temporary directory.

        Returns:
            tmp_file_path: Local path of the file to upload.

        Raises:
            ValueError: If ``file_type`` is not supported.
        """
        if file_type == "txt":
            tmp_file_path = pathlib.Path(tmp_dir) / "test.txt"
            with open(tmp_file_path, "w") as f:
                f.write("Hello World!")
        elif file_type == "json":
            d = {"name": "john", "height": 190}
            tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
            with open(tmp_file_path, "w") as f:
                json.dump(d, f)
        elif file_type == "parquet":
            # Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
            # NOTE(review): a reviewer suggested making this hardcoded
            # test-data path configurable.
            tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
        else:
            # Fail with a clear message instead of an UnboundLocalError below.
            raise ValueError(
                f"Unsupported file type: {file_type!r}. Supported types are 'txt', 'json' and 'parquet'."
            )

        return tmp_file_path

    def delete_file(self, bucket: str, key: str) -> None:
        """Delete the remote file from minio s3 bucket to free the space.

        Args:
            bucket: s3 bucket name.
            key: Key name of the object.

        Raises:
            AssertionError: If the delete response status code is not 204.
        """
        res = self._s3_client.delete_object(Bucket=bucket, Key=key)
        status_code = res["ResponseMetadata"]["HTTPStatusCode"]
        assert status_code == 204, f"Failed to delete s3://{bucket}/{key}: HTTP status {status_code}"
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
""" | ||
Test accessing StructuredDataset attribute from a dataclass. | ||
""" | ||
from dataclasses import dataclass | ||
|
||
import pandas as pd | ||
from flytekit import task, workflow | ||
from flytekit.types.structured import StructuredDataset | ||
|
||
|
||
@dataclass
class DC:
    """Dataclass holding a single StructuredDataset attribute."""

    # Structured dataset that the workflow reads through attribute access (`dc.sd`).
    sd: StructuredDataset
|
||
|
||
@task
def create_dc(uri: str) -> DC:
    """Wrap the dataset at ``uri`` in a ``DC`` dataclass.

    Args:
        uri: File URI.

    Returns:
        A ``DC`` whose ``sd`` attribute is a parquet StructuredDataset for ``uri``.
    """
    dataset = StructuredDataset(uri=uri, file_format="parquet")
    return DC(sd=dataset)
|
||
|
||
@task
def read_sd(sd: StructuredDataset) -> StructuredDataset:
    """Load the input StructuredDataset as a DataFrame, print it, and return the input."""
    frame = sd.open(pd.DataFrame).all()
    print("sd:", frame)
    return sd
|
||
|
||
@workflow
def wf(uri: str) -> None:
    # Build the dataclass from the URI, then hand only its `sd` attribute to
    # the reader task; attribute access on the task output is what is under test.
    wrapped = create_dc(uri=uri)
    read_sd(sd=wrapped.sd)
|
||
|
||
if __name__ == "__main__":
    # Script entry point: call the workflow directly on the sample parquet data.
    # NOTE(review): the relative path presumably assumes the repository root as
    # the working directory — confirm.
    wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
Uh oh!
There was an error while loading. Please reload this page.