Commit 632b42a

Fix StreamFlow MPI example
Prior to this commit, the MPI example was not working, for three reasons:

- when scheduling a job on multiple locations, the STREAMFLOW_HOSTS environment variable was not populated correctly;
- the Helm connector had issues with file transfers and tar streaming;
- the DockerCompose connector had issues with parsing available locations.

All these issues are solved by this commit, which also closes #82.
1 parent 5961dbb commit 632b42a

8 files changed, +118 -81 lines

examples/mpi/streamflow.yml

Lines changed: 2 additions & 1 deletion
@@ -28,5 +28,6 @@ deployments:
     type: helm
     config:
       chart: environment/helm/openmpi
-      kubeconfig: /home/glassofwhiskey/.kube/config-streamflow
+      kubeconfig: ~/.kube/config-streamflow
       releaseName: openmpi-rel
+      workdir: /tmp
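The portable ~ in the kubeconfig path works because the Kubernetes connector now expands it explicitly (see the kubernetes.py hunk below). A minimal sketch of that expansion, using only the standard library; the printed path is illustrative:

from pathlib import Path

# "~" is resolved against the current user's home directory,
# so the same streamflow.yml works for any user.
kubeconfig = str(Path("~/.kube/config-streamflow").expanduser())
print(kubeconfig)  # e.g. /home/alice/.kube/config-streamflow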

streamflow/core/scheduling.py

Lines changed: 3 additions & 3 deletions
@@ -129,13 +129,13 @@ def __init__(
         self,
         job: str,
         target: Target,
-        locations: MutableSequence[Location],
+        locations: MutableSequence[AvailableLocation],
         status: Status,
         hardware: Hardware,
     ):
         self.job: str = job
         self.target: Target = target
-        self.locations: MutableSequence[Location] = locations
+        self.locations: MutableSequence[AvailableLocation] = locations
         self.status: Status = status
         self.hardware: Hardware = hardware

@@ -232,7 +232,7 @@ def get_connector(self, job_name: str) -> Connector | None:

     def get_locations(
         self, job_name: str, statuses: MutableSequence[Status] | None = None
-    ) -> MutableSequence[Location]:
+    ) -> MutableSequence[AvailableLocation]:
         allocation = self.get_allocation(job_name)
         return (
             allocation.locations
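This type change is what enables the STREAMFLOW_HOSTS fix below: an AvailableLocation, unlike a plain Location, carries the hostname resolved by the connector, so it can be read back at execution time without querying the connector again. A rough sketch of the distinction, with hypothetical, trimmed-down field sets (the real classes live in streamflow.core.scheduling and carry more state):

from dataclasses import dataclass

@dataclass
class Location:
    name: str        # connector-level identifier, e.g. a pod or container name
    deployment: str  # deployment this location belongs to

@dataclass
class AvailableLocation(Location):
    hostname: str    # resolved network hostname, used to build STREAMFLOW_HOSTS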

streamflow/cwl/command.py

Lines changed: 2 additions & 7 deletions
@@ -778,14 +778,9 @@ async def execute(self, job: Job) -> CWLCommandOutput:
         ]
         # If step is assigned to multiple locations, add the STREAMFLOW_HOSTS environment variable
         if len(locations) > 1:
-            service = self.step.workflow.context.scheduler.get_service(job.name)
-            available_locations = await connector.get_available_locations(
-                service=service
+            parsed_env["STREAMFLOW_HOSTS"] = ",".join(
+                [loc.hostname for loc in locations]
             )
-            hosts = {
-                k: v.hostname for k, v in available_locations.items() if k in locations
-            }
-            parsed_env["STREAMFLOW_HOSTS"] = ",".join(hosts.values())
         # Process streams
         stdin = utils.eval_expression(
             expression=self.stdin,
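Because JobAllocation now stores AvailableLocation objects, the hostnames can be joined directly instead of re-querying the connector at execution time. A toy illustration with stand-in objects (the hostnames are made up):

from types import SimpleNamespace

# Stand-ins for the AvailableLocation objects held by the scheduler.
locations = [
    SimpleNamespace(hostname="openmpi-rel-0"),
    SimpleNamespace(hostname="openmpi-rel-1"),
]

parsed_env = {}
if len(locations) > 1:
    parsed_env["STREAMFLOW_HOSTS"] = ",".join([loc.hostname for loc in locations])

print(parsed_env["STREAMFLOW_HOSTS"])  # openmpi-rel-0,openmpi-rel-1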

streamflow/cwl/main.py

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 def _parse_arg(path: str, context: StreamFlowContext):
     if "://" in path:
         return path
-    elif os.path.isabs(path):
+    elif not os.path.isabs(path):
         return os.path.join(os.path.dirname(context.config["path"]), path)
     else:
         return path
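The original condition was inverted: absolute paths got re-joined against the streamflow.yml directory, while relative ones fell through to the else branch unchanged. A quick check of the fixed logic, assuming context.config["path"] points at the StreamFlow file (here the context is reduced to a plain argument):

import os

def parse_arg(path: str, config_path: str) -> str:
    if "://" in path:              # URLs pass through untouched
        return path
    elif not os.path.isabs(path):  # relative: resolve against the directory
        return os.path.join(       # containing the StreamFlow file
            os.path.dirname(config_path), path
        )
    else:                          # absolute: keep as-is
        return path

print(parse_arg("cwl/main.cwl", "/work/streamflow.yml"))    # /work/cwl/main.cwl
print(parse_arg("/data/main.cwl", "/work/streamflow.yml"))  # /data/main.cwl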

streamflow/cwl/utils.py

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ async def _register_path(
     ):
         data_location = data_locations[0]
     else:
-        raise WorkflowExecutionException(f"Error registering path {path}")
+        return None
     link_location = context.data_manager.register_path(
         location=location,
         path=path,

streamflow/deployment/connector/container.py

Lines changed: 39 additions & 17 deletions
@@ -760,8 +760,16 @@ async def deploy(self, external: bool) -> None:
         )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {deploy_command}")
-        proc = await asyncio.create_subprocess_exec(*shlex.split(deploy_command))
-        await proc.wait()
+        proc = await asyncio.create_subprocess_exec(
+            *shlex.split(deploy_command),
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
+        )
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Deployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )

     @cachedmethod(lambda self: self.locationsCache)
     async def get_available_locations(
@@ -771,24 +779,30 @@ async def get_available_locations(
         output_directory: str | None = None,
         tmp_directory: str | None = None,
     ) -> MutableMapping[str, AvailableLocation]:
-        ps_command = self.base_command() + "".join(["ps ", service or ""])
+        ps_command = self.base_command() + "".join(
+            ["ps ", "--format ", "json ", service or ""]
+        )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {ps_command}")
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(ps_command),
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.STDOUT,
         )
         stdout, _ = await proc.communicate()
-        lines = (line for line in stdout.decode().strip().split("\n"))
-        locations = {}
-        for line in lines:
-            if line.startswith("---------"):
-                break
-        for line in lines:
-            location_name = line.split()[0].strip()
-            locations[location_name] = await self._get_location(location_name)
-        return locations
+        locations = json.loads(stdout.decode().strip())
+        return {
+            loc["Name"]: v
+            for loc, v in zip(
+                locations,
+                await asyncio.gather(
+                    *(
+                        asyncio.create_task(self._get_location(location["Name"]))
+                        for location in locations
+                    )
+                ),
+            )
+        }

     @classmethod
     def get_schema(cls) -> str:
@@ -804,8 +818,16 @@ async def undeploy(self, external: bool) -> None:
         )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {undeploy_command}")
-        proc = await asyncio.create_subprocess_exec(*shlex.split(undeploy_command))
-        await proc.wait()
+        proc = await asyncio.create_subprocess_exec(
+            *shlex.split(undeploy_command),
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
+        )
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Undeployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )


 class SingularityBaseConnector(ContainerConnector, ABC):
@@ -1073,7 +1095,7 @@ async def undeploy(self, external: bool) -> None:
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(undeploy_command),
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.STDOUT,
         )
-        await proc.wait()
+        stdout, _ = await proc.communicate()
         self.instanceNames = []
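The location-parsing fix leans on docker compose ps --format json, which emits machine-readable output instead of the old column layout whose dashed header row the previous code tried to skip past. A minimal sketch of the new parsing path, assuming the array-style JSON output of Compose v2 (some newer Compose releases emit one JSON object per line instead, which would require per-line parsing); the service names are made up:

import json

# Example stdout of `docker compose ps --format json` with two services.
stdout = '[{"Name": "mpi-node-1", "State": "running"}, {"Name": "mpi-node-2", "State": "running"}]'

locations = json.loads(stdout.strip())
print([loc["Name"] for loc in locations])  # ['mpi-node-1', 'mpi-node-2']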

streamflow/deployment/connector/kubernetes.py

Lines changed: 69 additions & 50 deletions
@@ -64,14 +64,36 @@ async def _get_helm_version():


 class KubernetesResponseWrapper(BaseStreamWrapper):
+    def __init__(self, stream):
+        super().__init__(stream)
+        self.msg: bytes = bytes()
+
     async def read(self, size: int | None = None):
-        while not self.stream.closed:
+        if len(self.msg) > 0:
+            if len(self.msg) > size:
+                data = self.msg[0:size]
+                self.msg = self.msg[size:]
+                return data
+            else:
+                data = self.msg
+                size -= len(self.msg)
+                self.msg = bytes()
+        else:
+            data = bytes()
+        while size > 0 and not self.stream.closed:
             async for msg in self.stream:
                 channel = msg.data[0]
-                data = msg.data[1:]
-                if data and channel == ws_client.STDOUT_CHANNEL:
-                    return data
-        return None
+                self.msg = msg.data[1:]
+                if self.msg and channel == ws_client.STDOUT_CHANNEL:
+                    if len(self.msg) > size:
+                        data += self.msg[0:size]
+                        self.msg = self.msg[size:]
+                        return data
+                    else:
+                        data += self.msg
+                        size -= len(self.msg)
+                        self.msg = bytes()
+        return data if len(data) > 0 else None

     async def write(self, data: Any):
         channel_prefix = bytes(chr(ws_client.STDIN_CHANNEL), "ascii")
@@ -116,7 +138,7 @@ def __init__(
         )
         self.inCluster = inCluster
         self.kubeconfig = (
-            kubeconfig
+            str(Path(kubeconfig).expanduser())
             if kubeconfig is not None
             else os.path.join(str(Path.home()), ".kube", "config")
         )
@@ -207,33 +229,18 @@ async def _copy_local_to_remote_single(
     async def _copy_remote_to_local(
         self, src: str, dst: str, location: Location, read_only: bool = False
     ):
-        pod, container = location.name.split(":")
-        command = ["tar", "chf", "-", "-C", "/", posixpath.relpath(src, "/")]
-        # noinspection PyUnresolvedReferences
-        response = await self.client_ws.connect_get_namespaced_pod_exec(
-            name=pod,
-            namespace=self.namespace or "default",
-            container=container,
-            command=command,
-            stderr=True,
-            stdin=False,
-            stdout=True,
-            tty=False,
-            _preload_content=False,
-        )
-        try:
-            async with aiotarstream.open(
-                stream=KubernetesResponseWrapper(response),
-                mode="r",
-                copybufsize=self.transferBufferSize,
-            ) as tar:
-                await extract_tar_stream(tar, src, dst, self.transferBufferSize)
-        except tarfile.TarError as e:
-            raise WorkflowExecutionException(
-                f"Error copying {src} from location {location} to {dst}: {e}"
-            ) from e
-        finally:
-            await response.close()
+        async with self._get_stream_reader(location, src) as reader:
+            try:
+                async with aiotarstream.open(
+                    stream=reader,
+                    mode="r",
+                    copybufsize=self.transferBufferSize,
+                ) as tar:
+                    await extract_tar_stream(tar, src, dst, self.transferBufferSize)
+            except tarfile.TarError as e:
+                raise WorkflowExecutionException(
+                    f"Error copying {src} from location {location} to {dst}: {e}"
+                ) from e

     async def _copy_remote_to_remote(
         self,
@@ -277,9 +284,9 @@ async def _copy_remote_to_remote(
             namespace=self.namespace or "default",
             container=location.name.split(":")[1],
             command=write_command,
-            stderr=True,
-            stdin=False,
-            stdout=True,
+            stderr=False,
+            stdin=True,
+            stdout=False,
             tty=False,
             _preload_content=False,
         ),
@@ -383,9 +390,9 @@ def _get_stream_reader(self, location: Location, src: str) -> StreamWrapperContext:
             namespace=self.namespace or "default",
             container=container,
             command=["tar", "chf", "-", "-C", dirname, basename],
-            stderr=False,
-            stdin=True,
-            stdout=False,
+            stderr=True,
+            stdin=False,
+            stdout=True,
             tty=False,
             _preload_content=False,
         ),
@@ -565,20 +572,20 @@ def __init__(
         self.stringValues: MutableSequence[str] | None = stringValues
         self.skipCrds: bool = skipCrds
         self.registryConfig = (
-            registryConfig
+            str(Path(registryConfig).expanduser())
             if registryConfig is not None
             else os.path.join(str(Path.home()), ".config/helm/registry.json")
         )
         self.releaseName: str = (
             releaseName if releaseName is not None else f"release-{uuid.uuid1()}"
         )
         self.repositoryCache = (
-            repositoryCache
+            str(Path(repositoryCache).expanduser())
             if repositoryCache is not None
             else os.path.join(str(Path.home()), ".cache/helm/repository")
         )
         self.repositoryConfig = (
-            repositoryConfig
+            str(Path(repositoryConfig).expanduser())
            if repositoryConfig is not None
             else os.path.join(str(Path.home()), ".config/helm/repositories.yaml")
         )
@@ -589,7 +596,7 @@ def __init__(
         self.chartVersion: str | None = chartVersion
         self.wait: bool = wait

-    def base_command(self) -> str:
+    def _get_base_command(self) -> str:
         return "".join(
             [
                 "helm ",
@@ -616,7 +623,7 @@ async def deploy(self, external: bool) -> None:
                 f"Helm {version} is not compatible with Helm3Connector"
             )
         # Deploy Helm charts
-        deploy_command = self.base_command() + "".join(
+        deploy_command = self._get_base_command() + "".join(
             [
                 "install ",
                 self.get_option("atomic", self.atomic),
@@ -649,10 +656,14 @@ async def deploy(self, external: bool) -> None:
             logger.debug(f"EXECUTING {deploy_command}")
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(deploy_command),
-            stderr=asyncio.subprocess.DEVNULL,
-            stdout=asyncio.subprocess.DEVNULL,
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
         )
-        await proc.wait()
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Deployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )

     @cachedmethod(lambda self: self.locationsCache)
     async def get_available_locations(
@@ -698,7 +709,7 @@ def get_schema(cls) -> str:
     async def undeploy(self, external: bool) -> None:
         if not external:
             # Undeploy
-            undeploy_command = self.base_command() + "".join(
+            undeploy_command = self._get_base_command() + "".join(
                 [
                     "uninstall ",
                     self.get_option("keep-history", self.keepHistory),
@@ -709,7 +720,15 @@ async def undeploy(self, external: bool) -> None:
             )
             if logger.isEnabledFor(logging.DEBUG):
                 logger.debug(f"EXECUTING {undeploy_command}")
-            proc = await asyncio.create_subprocess_exec(*shlex.split(undeploy_command))
-            await proc.wait()
+            proc = await asyncio.create_subprocess_exec(
+                *shlex.split(undeploy_command),
+                stderr=asyncio.subprocess.STDOUT,
+                stdout=asyncio.subprocess.PIPE,
+            )
+            stdout, _ = await proc.communicate()
+            if proc.returncode != 0:
+                raise WorkflowExecutionException(
+                    f"FAILED Undeployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+                )
        # Close connections
        await super().undeploy(external)
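The KubernetesResponseWrapper rewrite is the heart of the tar-streaming fix: the old read() ignored the requested size and returned whichever WebSocket frame arrived next, while the tar extractor expects at most size bytes per call and the remainder to be kept for later; the new version buffers the surplus in self.msg. The swapped stderr/stdin/stdout flags fix a related inversion: the tar-reading exec needs stdout=True and the tar-writing exec needs stdin=True, and the two call sites had each other's flags. A simplified synchronous sketch of the same buffering idea over an iterator of frames (a hypothetical helper, not the StreamFlow class itself):

class BufferedFrameReader:
    # Serve size-bounded reads from variable-size frames, keeping leftovers.
    def __init__(self, frames):
        self._frames = iter(frames)
        self._buf = b""

    def read(self, size: int) -> bytes:
        data = b""
        while len(data) < size:
            if not self._buf:
                try:
                    self._buf = next(self._frames)
                except StopIteration:
                    break  # stream exhausted: return what we have
            take = size - len(data)
            data += self._buf[:take]
            self._buf = self._buf[take:]
        return data

reader = BufferedFrameReader([b"abcde", b"fg", b"hij"])
print(reader.read(4))  # b'abcd'
print(reader.read(4))  # b'efgh'
print(reader.read(4))  # b'ij'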

streamflow/scheduling/scheduler.py

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ def _allocate_job(
         else:
             logger.debug(
                 f"Job {job.name} allocated on locations "
-                ", ".join([str(loc) for loc in selected_locations])
+                f"{', '.join([str(loc) for loc in selected_locations])}"
             )
         self.job_allocations[job.name] = JobAllocation(
             job=job.name,
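The bug here is subtle: without an f prefix, the two adjacent string literals are concatenated first, so the whole log prefix becomes the separator passed to join. A quick demonstration:

locs = ["node-1", "node-2"]

# Broken: "...locations " and ", " merge into one literal, which is then
# used as the join separator.
broken = "Job allocated on locations " ", ".join(locs)
print(broken)  # node-1Job allocated on locations , node-2

# Fixed: the second literal is its own f-string expression.
fixed = "Job allocated on locations " f"{', '.join(locs)}"
print(fixed)   # Job allocated on locations node-1, node-2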
