Commit 9d60d94

Fix StreamFlow MPI example
Prior to this commit, the MPI example was not working for the following reasons:

- when scheduling a job on multiple locations, the STREAMFLOW_HOSTS variable was not correctly populated;
- the Helm connector had some issues with file transfers and tar streaming;
- the DockerCompose connector had issues with parsing available locations.

All these issues are solved by this fix, which also fixes #82.
1 parent 5961dbb · commit 9d60d94

File tree

8 files changed (+119 -81 lines)


examples/mpi/streamflow.yml

Lines changed: 2 additions & 1 deletion
@@ -28,5 +28,6 @@ deployments:
     type: helm
     config:
       chart: environment/helm/openmpi
-      kubeconfig: /home/glassofwhiskey/.kube/config-streamflow
+      kubeconfig: ~/.kube/config-streamflow
       releaseName: openmpi-rel
+      workdir: /tmp
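Note: the `~` in the new kubeconfig value is not expanded by YAML itself; it relies on the `Path.expanduser()` call added to kubernetes.py below. A minimal sketch of that resolution (the path shown is a hypothetical example, not a real file):

```python
from pathlib import Path

# Resolve a "~"-prefixed kubeconfig value before use, mirroring the
# expanduser() call added in kubernetes.py below.
kubeconfig = str(Path("~/.kube/config-streamflow").expanduser())
print(kubeconfig)  # e.g. /home/<user>/.kube/config-streamflow
```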

streamflow/core/scheduling.py

Lines changed: 3 additions & 3 deletions
@@ -129,13 +129,13 @@ def __init__(
         self,
         job: str,
         target: Target,
-        locations: MutableSequence[Location],
+        locations: MutableSequence[AvailableLocation],
         status: Status,
         hardware: Hardware,
     ):
         self.job: str = job
         self.target: Target = target
-        self.locations: MutableSequence[Location] = locations
+        self.locations: MutableSequence[AvailableLocation] = locations
         self.status: Status = status
         self.hardware: Hardware = hardware

@@ -232,7 +232,7 @@ def get_connector(self, job_name: str) -> Connector | None:

     def get_locations(
         self, job_name: str, statuses: MutableSequence[Status] | None = None
-    ) -> MutableSequence[Location]:
+    ) -> MutableSequence[AvailableLocation]:
         allocation = self.get_allocation(job_name)
         return (
             allocation.locations

streamflow/cwl/command.py

Lines changed: 2 additions & 7 deletions
@@ -778,14 +778,9 @@ async def execute(self, job: Job) -> CWLCommandOutput:
         ]
         # If step is assigned to multiple locations, add the STREAMFLOW_HOSTS environment variable
         if len(locations) > 1:
-            service = self.step.workflow.context.scheduler.get_service(job.name)
-            available_locations = await connector.get_available_locations(
-                service=service
+            parsed_env["STREAMFLOW_HOSTS"] = ",".join(
+                [loc.hostname for loc in locations]
             )
-            hosts = {
-                k: v.hostname for k, v in available_locations.items() if k in locations
-            }
-            parsed_env["STREAMFLOW_HOSTS"] = ",".join(hosts.values())
         # Process streams
         stdin = utils.eval_expression(
             expression=self.stdin,
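Since `JobAllocation.locations` now stores `AvailableLocation` objects (see the scheduling.py change above), each allocated location already carries its hostname, so the extra round-trip through `get_available_locations()` is no longer needed. A runnable sketch of the new logic, with a stand-in dataclass for StreamFlow's `AvailableLocation` and illustrative hostnames:

```python
from dataclasses import dataclass

@dataclass
class AvailableLocation:  # stand-in for streamflow's AvailableLocation
    name: str
    hostname: str

locations = [AvailableLocation("pod-0", "10.0.0.1"),
             AvailableLocation("pod-1", "10.0.0.2")]
parsed_env: dict[str, str] = {}
if len(locations) > 1:
    parsed_env["STREAMFLOW_HOSTS"] = ",".join(loc.hostname for loc in locations)
print(parsed_env["STREAMFLOW_HOSTS"])  # 10.0.0.1,10.0.0.2
```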

streamflow/cwl/main.py

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 def _parse_arg(path: str, context: StreamFlowContext):
     if "://" in path:
         return path
-    elif os.path.isabs(path):
+    elif not os.path.isabs(path):
         return os.path.join(os.path.dirname(context.config["path"]), path)
     else:
         return path
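The original condition was inverted: absolute paths were joined onto the config directory while relative ones passed through untouched. A self-contained sketch of the corrected behavior, with `StreamFlowContext` replaced by a plain config path and hypothetical file names for illustration:

```python
import os

def parse_arg(path: str, config_path: str) -> str:
    # URLs pass through; relative paths resolve against the config directory
    if "://" in path:
        return path
    elif not os.path.isabs(path):
        return os.path.join(os.path.dirname(config_path), path)
    else:
        return path

print(parse_arg("cwl/main.cwl", "/project/streamflow.yml"))   # /project/cwl/main.cwl
print(parse_arg("/abs/main.cwl", "/project/streamflow.yml"))  # /abs/main.cwl
```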

streamflow/cwl/utils.py

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ async def _register_path(
     ):
         data_location = data_locations[0]
     else:
-        raise WorkflowExecutionException(f"Error registering path {path}")
+        return None
     link_location = context.data_manager.register_path(
         location=location,
         path=path,

streamflow/deployment/connector/container.py

Lines changed: 39 additions & 17 deletions
@@ -760,8 +760,16 @@ async def deploy(self, external: bool) -> None:
         )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {deploy_command}")
-        proc = await asyncio.create_subprocess_exec(*shlex.split(deploy_command))
-        await proc.wait()
+        proc = await asyncio.create_subprocess_exec(
+            *shlex.split(deploy_command),
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
+        )
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Deployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )

     @cachedmethod(lambda self: self.locationsCache)
     async def get_available_locations(
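Both `deploy()` and `undeploy()` now capture the child process output (with stderr merged into stdout) and raise on a non-zero exit code instead of silently waiting. The same pattern, distilled into a hypothetical standalone helper; `run_or_raise` is not a StreamFlow function, and `RuntimeError` stands in for `WorkflowExecutionException`:

```python
import asyncio
import shlex

async def run_or_raise(command: str, action: str) -> str:
    # Merge stderr into stdout so a failure message carries everything
    proc = await asyncio.create_subprocess_exec(
        *shlex.split(command),
        stderr=asyncio.subprocess.STDOUT,
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"FAILED {action}:\n\t{stdout.decode().strip()}")
    return stdout.decode()

# Example: asyncio.run(run_or_raise("docker compose version", "version check"))
```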
@@ -771,24 +779,30 @@
         output_directory: str | None = None,
         tmp_directory: str | None = None,
     ) -> MutableMapping[str, AvailableLocation]:
-        ps_command = self.base_command() + "".join(["ps ", service or ""])
+        ps_command = self.base_command() + "".join(
+            ["ps ", "--format ", "json ", service or ""]
+        )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {ps_command}")
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(ps_command),
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.STDOUT,
         )
         stdout, _ = await proc.communicate()
-        lines = (line for line in stdout.decode().strip().split("\n"))
-        locations = {}
-        for line in lines:
-            if line.startswith("---------"):
-                break
-        for line in lines:
-            location_name = line.split()[0].strip()
-            locations[location_name] = await self._get_location(location_name)
-        return locations
+        locations = json.loads(stdout.decode().strip())
+        return {
+            l["Name"]: v
+            for l, v in zip(
+                locations,
+                await asyncio.gather(
+                    *(
+                        asyncio.create_task(self._get_location(loc["Name"]))
+                        for loc in locations
+                    )
+                ),
+            )
+        }

     @classmethod
     def get_schema(cls) -> str:
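Instead of scraping the column-aligned table that `ps` prints (whose header-based parsing broke location discovery), the connector now asks for machine-readable output with `--format json` and reads the `Name` field of each entry. A sketch with made-up sample output, assuming the JSON-array shape the patch expects:

```python
import json

# Sample payload, standing in for `docker compose ps --format json` output;
# the service names are invented for illustration
stdout = b'[{"Name": "openmpi-node-1"}, {"Name": "openmpi-node-2"}]'
locations = json.loads(stdout.decode().strip())
names = [loc["Name"] for loc in locations]
print(names)  # ['openmpi-node-1', 'openmpi-node-2']
```

The dictionary comprehension in the patch additionally resolves all `_get_location()` calls concurrently via `asyncio.gather`, rather than awaiting them one by one.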
@@ -804,8 +818,16 @@ async def undeploy(self, external: bool) -> None:
         )
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(f"EXECUTING command {undeploy_command}")
-        proc = await asyncio.create_subprocess_exec(*shlex.split(undeploy_command))
-        await proc.wait()
+        proc = await asyncio.create_subprocess_exec(
+            *shlex.split(undeploy_command),
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
+        )
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Undeployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )


 class SingularityBaseConnector(ContainerConnector, ABC):
@@ -1073,7 +1095,7 @@ async def undeploy(self, external: bool) -> None:
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(undeploy_command),
             stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.STDOUT,
         )
-        await proc.wait()
+        stdout, _ = await proc.communicate()
         self.instanceNames = []

streamflow/deployment/connector/kubernetes.py

Lines changed: 70 additions & 50 deletions
@@ -64,14 +64,37 @@ async def _get_helm_version():


 class KubernetesResponseWrapper(BaseStreamWrapper):
+
+    def __init__(self, stream):
+        super().__init__(stream)
+        self.msg: bytes = bytes()
+
     async def read(self, size: int | None = None):
-        while not self.stream.closed:
+        if len(self.msg) > 0:
+            if len(self.msg) > size:
+                data = self.msg[0:size]
+                self.msg = self.msg[size:]
+                return data
+            else:
+                data = self.msg
+                size -= len(self.msg)
+                self.msg = bytes()
+        else:
+            data = bytes()
+        while size > 0 and not self.stream.closed:
             async for msg in self.stream:
                 channel = msg.data[0]
-                data = msg.data[1:]
-                if data and channel == ws_client.STDOUT_CHANNEL:
-                    return data
-        return None
+                self.msg = msg.data[1:]
+                if self.msg and channel == ws_client.STDOUT_CHANNEL:
+                    if len(self.msg) > size:
+                        data += self.msg[0:size]
+                        self.msg = self.msg[size:]
+                        return data
+                    else:
+                        data += self.msg
+                        size -= len(self.msg)
+                        self.msg = bytes()
+        return data if len(data) > 0 else None

     async def write(self, data: Any):
         channel_prefix = bytes(chr(ws_client.STDIN_CHANNEL), "ascii")
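The old `read()` returned whole websocket frames and ignored the requested `size`, which broke tar streaming, since it needs exact-sized reads. The wrapper now keeps leftover frame bytes in `self.msg` and serves them before pulling the next frame. A synchronous toy model of the same buffering logic (the class and frame contents are invented for illustration):

```python
class BufferedReader:
    """Toy model of the size-bounded read added to KubernetesResponseWrapper."""

    def __init__(self, frames):
        self.frames = iter(frames)  # stand-in for the websocket stream
        self.msg = b""              # leftover bytes from the last frame

    def read(self, size: int) -> bytes | None:
        data = b""
        while size > 0:
            if not self.msg:
                self.msg = next(self.frames, b"")
                if not self.msg:
                    break  # stream exhausted
            chunk, self.msg = self.msg[:size], self.msg[size:]
            data += chunk
            size -= len(chunk)
        return data or None

reader = BufferedReader([b"hello", b"world"])
print(reader.read(3), reader.read(4), reader.read(10))  # b'hel' b'lowo' b'rld'
```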
@@ -116,7 +139,7 @@ def __init__(
         )
         self.inCluster = inCluster
         self.kubeconfig = (
-            kubeconfig
+            str(Path(kubeconfig).expanduser())
             if kubeconfig is not None
             else os.path.join(str(Path.home()), ".kube", "config")
         )
@@ -207,33 +230,18 @@ async def _copy_local_to_remote_single(
     async def _copy_remote_to_local(
         self, src: str, dst: str, location: Location, read_only: bool = False
     ):
-        pod, container = location.name.split(":")
-        command = ["tar", "chf", "-", "-C", "/", posixpath.relpath(src, "/")]
-        # noinspection PyUnresolvedReferences
-        response = await self.client_ws.connect_get_namespaced_pod_exec(
-            name=pod,
-            namespace=self.namespace or "default",
-            container=container,
-            command=command,
-            stderr=True,
-            stdin=False,
-            stdout=True,
-            tty=False,
-            _preload_content=False,
-        )
-        try:
-            async with aiotarstream.open(
-                stream=KubernetesResponseWrapper(response),
-                mode="r",
-                copybufsize=self.transferBufferSize,
-            ) as tar:
-                await extract_tar_stream(tar, src, dst, self.transferBufferSize)
-        except tarfile.TarError as e:
-            raise WorkflowExecutionException(
-                f"Error copying {src} from location {location} to {dst}: {e}"
-            ) from e
-        finally:
-            await response.close()
+        async with self._get_stream_reader(location, src) as reader:
+            try:
+                async with aiotarstream.open(
+                    stream=reader,
+                    mode="r",
+                    copybufsize=self.transferBufferSize,
+                ) as tar:
+                    await extract_tar_stream(tar, src, dst, self.transferBufferSize)
+            except tarfile.TarError as e:
+                raise WorkflowExecutionException(
+                    f"Error copying {src} from location {location} to {dst}: {e}"
+                ) from e

     async def _copy_remote_to_remote(
         self,
@@ -277,9 +285,9 @@ async def _copy_remote_to_remote(
                 namespace=self.namespace or "default",
                 container=location.name.split(":")[1],
                 command=write_command,
-                stderr=True,
-                stdin=False,
-                stdout=True,
+                stderr=False,
+                stdin=True,
+                stdout=False,
                 tty=False,
                 _preload_content=False,
             ),
@@ -383,9 +391,9 @@ def _get_stream_reader(self, location: Location, src: str) -> StreamWrapperContext:
                 namespace=self.namespace or "default",
                 container=container,
                 command=["tar", "chf", "-", "-C", dirname, basename],
-                stderr=False,
-                stdin=True,
-                stdout=False,
+                stderr=True,
+                stdin=False,
+                stdout=True,
                 tty=False,
                 _preload_content=False,
             ),
@@ -565,20 +573,20 @@ def __init__(
         self.stringValues: MutableSequence[str] | None = stringValues
         self.skipCrds: bool = skipCrds
         self.registryConfig = (
-            registryConfig
+            str(Path(registryConfig).expanduser())
             if registryConfig is not None
             else os.path.join(str(Path.home()), ".config/helm/registry.json")
         )
         self.releaseName: str = (
             releaseName if releaseName is not None else f"release-{uuid.uuid1()}"
         )
         self.repositoryCache = (
-            repositoryCache
+            str(Path(repositoryCache).expanduser())
             if repositoryCache is not None
             else os.path.join(str(Path.home()), ".cache/helm/repository")
         )
         self.repositoryConfig = (
-            repositoryConfig
+            str(Path(repositoryConfig).expanduser())
             if repositoryConfig is not None
             else os.path.join(str(Path.home()), ".config/helm/repositories.yaml")
         )
@@ -589,7 +597,7 @@ def __init__(
         self.chartVersion: str | None = chartVersion
         self.wait: bool = wait

-    def base_command(self) -> str:
+    def _get_base_command(self) -> str:
         return "".join(
             [
                 "helm ",
@@ -616,7 +624,7 @@ async def deploy(self, external: bool) -> None:
                 f"Helm {version} is not compatible with Helm3Connector"
             )
         # Deploy Helm charts
-        deploy_command = self.base_command() + "".join(
+        deploy_command = self._get_base_command() + "".join(
             [
                 "install ",
                 self.get_option("atomic", self.atomic),
@@ -649,10 +657,14 @@ async def deploy(self, external: bool) -> None:
             logger.debug(f"EXECUTING {deploy_command}")
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(deploy_command),
-            stderr=asyncio.subprocess.DEVNULL,
-            stdout=asyncio.subprocess.DEVNULL,
+            stderr=asyncio.subprocess.STDOUT,
+            stdout=asyncio.subprocess.PIPE,
         )
-        await proc.wait()
+        stdout, _ = await proc.communicate()
+        if proc.returncode != 0:
+            raise WorkflowExecutionException(
+                f"FAILED Deployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+            )

     @cachedmethod(lambda self: self.locationsCache)
     async def get_available_locations(
async def get_available_locations(
@@ -698,7 +710,7 @@ def get_schema(cls) -> str:
698710
async def undeploy(self, external: bool) -> None:
699711
if not external:
700712
# Undeploy
701-
undeploy_command = self.base_command() + "".join(
713+
undeploy_command = self._get_base_command() + "".join(
702714
[
703715
"uninstall ",
704716
self.get_option("keep-history", self.keepHistory),
@@ -709,7 +721,15 @@ async def undeploy(self, external: bool) -> None:
             )
             if logger.isEnabledFor(logging.DEBUG):
                 logger.debug(f"EXECUTING {undeploy_command}")
-            proc = await asyncio.create_subprocess_exec(*shlex.split(undeploy_command))
-            await proc.wait()
+            proc = await asyncio.create_subprocess_exec(
+                *shlex.split(undeploy_command),
+                stderr=asyncio.subprocess.STDOUT,
+                stdout=asyncio.subprocess.PIPE,
+            )
+            stdout, _ = await proc.communicate()
+            if proc.returncode != 0:
+                raise WorkflowExecutionException(
+                    f"FAILED Undeployment of {self.deployment_name} environment:\n\t{stdout.decode().strip()}"
+                )
         # Close connections
         await super().undeploy(external)

streamflow/scheduling/scheduler.py

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ def _allocate_job(
         else:
             logger.debug(
                 f"Job {job.name} allocated on locations "
-                ", ".join([str(loc) for loc in selected_locations])
+                f"{', '.join([str(loc) for loc in selected_locations])}"
            )
         self.job_allocations[job.name] = JobAllocation(
             job=job.name,
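The second literal was missing its `f` prefix, so Python's implicit string concatenation glued it to the preceding literal and then called `.join()` on the merged string, garbling the log message. A minimal reproduction, with invented location names:

```python
selected_locations = ["loc1", "loc2"]
# Missing f-prefix: the two adjacent literals concatenate first, then
# .join() runs on the merged string instead of on ", "
broken = "Job a allocated on locations " ", ".join(selected_locations)
print(broken)  # loc1Job a allocated on locations , loc2
fixed = f"Job a allocated on locations {', '.join(selected_locations)}"
print(fixed)   # Job a allocated on locations loc1, loc2
```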
