Skip to content

Commit 7481a7c

Browse files
Merge pull request #46 from glassflow/feature/add-status-to-pipeline
Feature/add status to pipeline
2 parents 55983b9 + 529a8bd commit 7481a7c

File tree

7 files changed

+38
-9
lines changed

7 files changed

+38
-9
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.0.1
1+
3.0.2

src/glassflow/etl/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
JoinSourceConfigPatch,
99
JoinType,
1010
)
11-
from .pipeline import PipelineConfig, PipelineConfigPatch
11+
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
1212
from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping
1313
from .source import (
1414
ConsumerGroupOffset,
@@ -40,6 +40,7 @@
4040
"JoinType",
4141
"PipelineConfig",
4242
"PipelineConfigPatch",
43+
"PipelineStatus",
4344
"SinkConfig",
4445
"SinkType",
4546
"TableMapping",

src/glassflow/etl/models/pipeline.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,24 @@
44
from pydantic import BaseModel, Field, field_validator, model_validator
55

66
from ..errors import InvalidDataTypeMappingError
7+
from .base import CaseInsensitiveStrEnum
78
from .data_types import kafka_to_clickhouse_data_type_mappings
89
from .join import JoinConfig, JoinConfigPatch
910
from .sink import SinkConfig, SinkConfigPatch
1011
from .source import SourceConfig, SourceConfigPatch
1112

1213

14+
class PipelineStatus(CaseInsensitiveStrEnum):
15+
CREATED = "Created"
16+
RUNNING = "Running"
17+
PAUSING = "Pausing"
18+
PAUSED = "Paused"
19+
RESUMING = "Resuming"
20+
TERMINATING = "Terminating"
21+
TERMINATED = "Terminated"
22+
FAILED = "Failed"
23+
24+
1325
class PipelineConfig(BaseModel):
1426
pipeline_id: str
1527
name: Optional[str] = Field(default=None)

src/glassflow/etl/pipeline.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def __init__(
5252
self.config = None
5353

5454
self._dlq = DLQ(pipeline_id=self.pipeline_id, host=host)
55+
self.status: models.PipelineStatus | None = None
5556

5657
def get(self) -> Pipeline:
5758
"""Fetch a pipeline by its ID.
@@ -67,6 +68,7 @@ def get(self) -> Pipeline:
6768
"GET", f"{self.ENDPOINT}/{self.pipeline_id}", event_name="PipelineGet"
6869
)
6970
self.config = models.PipelineConfig.model_validate(response.json())
71+
self.status = models.PipelineStatus(response.json()["status"])
7072
self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host)
7173
return self
7274

@@ -94,6 +96,7 @@ def create(self) -> Pipeline:
9496
),
9597
event_name="PipelineCreated",
9698
)
99+
self.status = models.PipelineStatus.CREATED
97100
return self
98101

99102
except errors.ForbiddenError as e:
@@ -160,6 +163,7 @@ def delete(self, terminate: bool = True) -> None:
160163
self.get()
161164
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
162165
self._request("DELETE", endpoint, event_name="PipelineDeleted")
166+
self.status = models.PipelineStatus.TERMINATING
163167

164168
def pause(self) -> Pipeline:
165169
"""Pauses the pipeline with the given ID.
@@ -191,11 +195,13 @@ def health(self) -> dict[str, Any]:
191195
Returns:
192196
dict: Pipeline health
193197
"""
194-
return self._request(
198+
response = self._request(
195199
"GET",
196200
f"{self.ENDPOINT}/{self.pipeline_id}/health",
197201
event_name="PipelineHealth",
198202
).json()
203+
self.status = models.PipelineStatus(response["overall_status"])
204+
return response
199205

200206
def to_dict(self) -> dict[str, Any]:
201207
"""Convert the pipeline configuration to a dictionary.

tests/conftest.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ def valid_config() -> dict:
2121
return pipeline_configs.get_valid_pipeline_config()
2222

2323

24+
@pytest.fixture
25+
def get_pipeline_response(valid_config) -> dict:
26+
"""Fixture for a valid pipeline configuration with status."""
27+
config = valid_config
28+
config["status"] = "Running"
29+
return config
30+
31+
2432
@pytest.fixture
2533
def valid_config_without_joins() -> dict:
2634
"""Fixture for a valid pipeline configuration without joins."""
@@ -88,11 +96,11 @@ def mock_connection_error():
8896

8997

9098
@pytest.fixture
91-
def mock_success_get_pipeline(valid_config):
99+
def mock_success_get_pipeline(get_pipeline_response):
92100
"""Fixture for a successful GET pipeline response."""
93101
return mock_responses.create_mock_response_factory()(
94102
status_code=200,
95-
json_data=valid_config,
103+
json_data=get_pipeline_response,
96104
)
97105

98106

tests/test_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ def test_client_init(self):
1818
assert client.host == "https://example.com"
1919
assert client.http_client.base_url == "https://example.com"
2020

21-
def test_client_get_pipeline_success(self, valid_config, mock_success_response):
21+
def test_client_get_pipeline_success(
22+
self, get_pipeline_response, mock_success_response
23+
):
2224
"""Test successful pipeline retrieval by ID."""
2325
client = Client()
2426
pipeline_id = "test-pipeline-id"
2527

26-
mock_success_response.json.return_value = valid_config
28+
mock_success_response.json.return_value = get_pipeline_response
2729

2830
with patch(
2931
"httpx.Client.request", return_value=mock_success_response

tests/test_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,14 @@ def test_lifecycle_operations(
9191
method,
9292
endpoint,
9393
params,
94-
valid_config,
94+
get_pipeline_response,
9595
):
9696
"""Test common pipeline lifecycle operations."""
9797
with patch(
9898
"httpx.Client.request", return_value=mock_success_response
9999
) as mock_request:
100100
if method == "GET":
101-
mock_request.return_value.json.return_value = valid_config
101+
mock_request.return_value.json.return_value = get_pipeline_response
102102
result = getattr(pipeline, operation)(**params)
103103
expected_endpoint = f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}"
104104
mock_request.assert_called_once_with(method, expected_endpoint)

0 commit comments

Comments
 (0)