diff --git a/seqerakit/seqeraplatform.py b/seqerakit/seqeraplatform.py index 2372fd8..1ee5d45 100644 --- a/seqerakit/seqeraplatform.py +++ b/seqerakit/seqeraplatform.py @@ -1,8 +1,9 @@ -import json +import os +import shlex import logging import subprocess -import shlex import re +import json logging.basicConfig(level=logging.DEBUG) @@ -24,90 +25,85 @@ def __init__(self, tw_instance, cmd): self.cmd = cmd def __call__(self, *args, **kwargs): - command = ["tw"] - - # Prepend the CLI args if present command = self.cmd.split() command.extend(args) return self.tw_instance._tw_run(command, **kwargs) # Constructs a new SeqeraPlatform instance def __init__(self, cli_args=None, dryrun=False): - if cli_args and "--verbose" in cli_args: # TODO: remove this + if cli_args and "--verbose" in cli_args: raise ValueError( "--verbose is not supported as a CLI argument to seqerakit." ) - self.cli_args = cli_args if cli_args else [] + self.cli_args = cli_args or [] self.dryrun = dryrun - # Executes a 'tw' command in a subprocess and returns the output. - def _tw_run(self, cmd, *args, **kwargs): - """ - Run a tw command with supplied commands - """ - command = ["tw"] - - # Prepend the CLI args if present - command.extend(self.cli_args) + def _construct_command(self, cmd, *args, **kwargs): + command = ["tw"] + self.cli_args if kwargs.get("to_json"): - to_json = True command.extend(["-o", "json"]) - else: - to_json = False + command.extend(cmd) command.extend(args) - if kwargs.get("config") is not None: - config_path = kwargs["config"] - command.append(f"--config={config_path}") - if "params_file" in kwargs: - params_path = kwargs["params_file"] - command.append(f"--params-file={params_path}") - - full_cmd = " ".join(arg if "$" in arg else shlex.quote(arg) for arg in command) + if kwargs.get("config"): + command.append(f"--config={kwargs['config']}") - # Skip if --dryrun - if self.dryrun: - logging.debug(f" DRYRUN: Running command: {full_cmd}\n") - return - else: - logging.debug(f" Running command: {full_cmd}\n") + if "params_file" in kwargs: + command.append(f"--params-file={kwargs['params_file']}") + + return self._check_env_vars(command) + + # Checks environment variables to see that they are set accordingly + def _check_env_vars(self, command): + full_cmd_parts = [] + for arg in command: + if arg.startswith("$"): + env_var = arg[1:] + if env_var not in os.environ: + logging.error(f" Environment variable '{env_var}' is not set.") + return None # handle as desired + full_cmd_parts.append(os.environ[env_var]) + else: + full_cmd_parts.append(shlex.quote(arg)) + return " ".join(full_cmd_parts) - # Run the command and return the stdout + # Executes a 'tw' command in a subprocess and returns the output. + def _execute_command(self, full_cmd, to_json=False): + logging.debug(f" Running command: {full_cmd}") process = subprocess.Popen( full_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True ) stdout, _ = process.communicate() stdout = stdout.decode("utf-8").strip() - # Error handling for stdout - if stdout: - if re.search( - r"ERROR: .*already (exists|a participant)", stdout, flags=re.IGNORECASE - ): - raise ResourceExistsError( - " Resource already exists and cannot be created." - " Please delete first or if using a YAML, set 'overwrite: true'\n" - ) - elif re.search(r"ERROR: .*", stdout): - raise ResourceCreationError( - f" Resource creation failed with the following error: '{stdout}'.\n" - "Please check your config file and try again.\n" - ) - elif to_json is True: - return json.loads(stdout) - else: - return stdout + if "ERROR: " in stdout: + self._handle_command_errors(stdout) + + return json.loads(stdout) if to_json else stdout + + def _handle_command_errors(self, stdout): + if re.search( + r"ERROR: .*already (exists|a participant)", stdout, flags=re.IGNORECASE + ): + raise ResourceExistsError( + " Resource already exists. Please delete first or set 'overwrite: true'" + ) + raise ResourceCreationError( + f"Resource creation failed: '{stdout}'. Check your config and try again." + ) + + def _tw_run(self, cmd, *args, **kwargs): + full_cmd = self._construct_command(cmd, *args, **kwargs) + if not full_cmd or self.dryrun: + logging.debug(f"DRYRUN: Running command {full_cmd}") + return + return self._execute_command(full_cmd, kwargs.get("to_json")) # Allow any 'tw' subcommand to be called as a method. def __getattr__(self, cmd): - """ - Magic method to allow any 'tw' subcommand to be called as a method. - Returns a TwCommand object that can be called with arguments. - """ - cmd = cmd.replace("_", "-") # replace underscores with hyphens - return self.TwCommand(self, cmd) + return self.TwCommand(self, cmd.replace("_", "-")) class ResourceExistsError(Exception):