Skip to content

Commit

Permalink
Add timed pipeline operations. Closes #155 (#156)
Browse files Browse the repository at this point in the history
* Add timed pipeline operations. Closes #155

* Add tests for pipeline config
  • Loading branch information
umesh-timalsina authored Aug 2, 2023
1 parent 18d1043 commit 16c5962
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 5 deletions.
17 changes: 12 additions & 5 deletions chimerapy/orchestrator/cli/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import sys
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from pathlib import Path

Expand Down Expand Up @@ -40,11 +41,17 @@ def orchestrate(config: ChimeraPyPipelineConfig):

manager.record().result(timeout=config.timeouts.record_timeout)

# Wail until user stops
while True:
q = input("Stop? (Y/n)")
if q.lower() == "y":
break
# Wait until user stops
if config.runtime is None:
while True:
q = input("Stop? (Y/n)")
if q.lower() == "y":
break
else: # Wait for runtime to elapse
start_time = time.time()
elapsed_time = time.time() - start_time
while elapsed_time < config.runtime:
elapsed_time = time.time() - start_time

manager.stop().result(timeout=config.timeouts.stop_timeout)
manager.collect().result(timeout=config.timeouts.collect_timeout)
Expand Down
5 changes: 5 additions & 0 deletions chimerapy/orchestrator/models/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class ChimeraPyPipelineConfig(BaseModel):
..., description="The nodes in the pipeline_service."
)

runtime: Optional[int] = Field(
default=None,
description="The runtime of the pipeline_service in seconds.",
)

adj: List[Tuple[str, str]] = Field(
..., description="The edge list of the pipeline_service graph."
)
Expand Down
1 change: 1 addition & 0 deletions chimerapy/orchestrator/tests/data/dummy_pipeline.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"runtime": 2000,
"workers": {
"manager_ip": "localhost",
"manager_port": 8000,
Expand Down
5 changes: 5 additions & 0 deletions chimerapy/orchestrator/tests/models/test_pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def dummy_pipeline_config(self) -> ChimeraPyPipelineConfig:
with get_test_file_path("dummy_pipeline.json").open() as f:
return ChimeraPyPipelineConfig.model_validate(json.load(f))

def test_pipeline_config(self, dummy_pipeline_config):
assert dummy_pipeline_config.name == "Pipeline"
assert dummy_pipeline_config.description == "A pipeline"
assert dummy_pipeline_config.runtime == 2000

def test_worker_config(self, dummy_pipeline_config):
assert dummy_pipeline_config.workers.manager_ip == "localhost"
assert dummy_pipeline_config.workers.manager_port == 8000
Expand Down
38 changes: 38 additions & 0 deletions configs/local_camera_2mins.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "webcam-demo",
"description": "A demo of the webcam node and the show window node",
"runtime": 120,
"mode": "record",
"workers": {
"manager_ip": "129.59.104.153",
"manager_port": 9001,
"instances": [
{
"name": "local",
"id": "local",
"remote": false,
"description": "Worker 1 for the webcam demo with a webcam node and a show window node"
}
]
},
"nodes": [
"WebcamNode",
"ShowWindow"
],
"adj": [
[
"WebcamNode",
"ShowWindow"
]
],
"manager_config": {
"logdir": "cp-logs",
"port": 9001
},
"mappings": {
"local": [
"WebcamNode",
"ShowWindow"
]
}
}

0 comments on commit 16c5962

Please sign in to comment.