Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions clarifai/cli/templates/pipeline_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ def get_pipeline_config_template(
- name: sequence
steps:
{steps_yaml}
# Optional: Define secrets for pipeline steps
# config:
# step_version_secrets:
# step-0:
# API_KEY: users/{user_id}/secrets/my-api-key
# DB_PASSWORD: users/{user_id}/secrets/db-secret
# step-1:
# EMAIL_TOKEN: users/{user_id}/secrets/email-token
"""


Expand Down
162 changes: 162 additions & 0 deletions clarifai/client/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,165 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1)
logger.debug(f"Error fetching logs: {e}")
# Return current page on error to retry the same page next fetch
return current_page

def get_pipeline_version(self, pipeline_version_id: Optional[str] = None) -> Dict:
"""Get pipeline version details including step secrets.

Args:
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Returns:
Dict: Pipeline version information including step_version_secrets if configured.
"""
version_id = pipeline_version_id or self.pipeline_version_id
if not version_id:
raise UserError("pipeline_version_id is required")

request = service_pb2.GetPipelineVersionRequest()
request.user_app_id.CopyFrom(self.user_app_id)
request.pipeline_id = self.pipeline_id
request.pipeline_version_id = version_id

response = self.STUB.GetPipelineVersion(request, metadata=self.auth_helper.metadata)

if response.status.code != status_code_pb2.StatusCode.SUCCESS:
raise UserError(
f"Failed to get pipeline version: {response.status.description}. "
f"Details: {response.status.details}"
)

return json_format.MessageToDict(
response.pipeline_version, preserving_proto_field_name=True
)

def create_pipeline_version(
self,
orchestration_spec: Dict,
step_version_secrets: Optional[Dict[str, Dict[str, str]]] = None,
description: Optional[str] = None,
) -> str:
"""Create a new pipeline version with optional step secrets.

Note: This creates a new version by patching the pipeline with a new version.

Args:
orchestration_spec (Dict): The orchestration specification for the pipeline.
step_version_secrets (Optional[Dict[str, Dict[str, str]]]): Map of step references to their secrets.
Format: {step_ref: {secret_name: secret_path}}
Example: {"step-0": {"API_KEY": "users/user123/secrets/my-api-key"}}
description (Optional[str]): Description for the pipeline version.

Returns:
str: The created pipeline version ID.
"""
pipeline_version = resources_pb2.PipelineVersion()
if description:
pipeline_version.description = description

# Set orchestration spec
if "argo_orchestration_spec" in orchestration_spec:
argo_spec_str = orchestration_spec["argo_orchestration_spec"]
import yaml

argo_spec = yaml.safe_load(argo_spec_str)
api_version = argo_spec.get("apiVersion", "argoproj.io/v1alpha1")

orchestration_spec_proto = resources_pb2.OrchestrationSpec()
argo_orchestration_spec_proto = resources_pb2.ArgoOrchestrationSpec()
argo_orchestration_spec_proto.api_version = api_version
import json

argo_orchestration_spec_proto.spec_json = json.dumps(argo_spec)

orchestration_spec_proto.argo_orchestration_spec.CopyFrom(
argo_orchestration_spec_proto
)
pipeline_version.orchestration_spec.CopyFrom(orchestration_spec_proto)

# Add step_version_secrets if provided
if step_version_secrets:
for step_ref, secrets in step_version_secrets.items():
if not secrets:
continue
step_secret_config = resources_pb2.StepSecretConfig()
for secret_name, secret_path in secrets.items():
step_secret_config.secrets[secret_name] = secret_path
pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)

# Make the API call using PatchPipelineVersions
# This creates a new version for an existing pipeline
request = service_pb2.PatchPipelineVersionsRequest()
request.user_app_id.CopyFrom(self.user_app_id)
request.pipeline_id = self.pipeline_id
request.pipeline_versions.append(pipeline_version)
request.action = "overwrite" # Create a new version

response = self.STUB.PatchPipelineVersions(request, metadata=self.auth_helper.metadata)

if response.status.code != status_code_pb2.StatusCode.SUCCESS:
raise UserError(
f"Failed to create pipeline version: {response.status.description}. "
f"Details: {response.status.details}"
)

if not response.pipeline_versions:
raise UserError("No pipeline version was created")

created_version = response.pipeline_versions[0]
logger.info(f"Created pipeline version: {created_version.id}")
return created_version.id

def add_step_secret(
self,
step_ref: str,
secret_name: str,
secret_ref: str,
pipeline_version_id: Optional[str] = None,
) -> None:
"""Add a secret to a specific pipeline step.

Args:
step_ref (str): The step reference (e.g., "step-0", "step-1").
secret_name (str): The name of the secret environment variable.
secret_ref (str): The secret reference path (e.g., "users/user123/secrets/my-api-key").
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Note:
This is a convenience method. For production use, manage secrets via the config.yaml
orchestration spec and use the pipeline upload command.
"""
raise NotImplementedError(
"Adding secrets to existing pipeline versions is not supported. "
"Please define step secrets in your config.yaml orchestration spec "
"and use 'clarifai pipeline upload' to create a new pipeline version."
)

def list_step_secrets(
self, step_ref: Optional[str] = None, pipeline_version_id: Optional[str] = None
) -> Dict[str, Dict[str, str]]:
"""List secrets configured for pipeline steps.

Args:
step_ref (Optional[str]): If provided, only return secrets for this step.
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Returns:
Dict[str, Dict[str, str]]: Map of step references to their secrets.
Format: {step_ref: {secret_name: secret_path}}
"""
version_data = self.get_pipeline_version(pipeline_version_id)
config = version_data.get("config", {})
step_version_secrets = config.get("step_version_secrets", {})

if step_ref:
# Return only the specified step's secrets
# Proto response has nested 'secrets' field in StepSecretConfig
return {step_ref: step_version_secrets.get(step_ref, {}).get("secrets", {})}

# Return all step secrets
# Extract the 'secrets' dict from each step's StepSecretConfig
result = {}
for step, step_config in step_version_secrets.items():
result[step] = step_config.get("secrets", {})
return result
return result
49 changes: 49 additions & 0 deletions clarifai/runners/pipelines/pipeline_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@
}
}

# Include step_version_secrets if present in pipeline config (not orchestration_spec)
step_version_secrets = pipeline_config.get("config", {}).get("step_version_secrets", {})
if step_version_secrets:
if "config" not in lockfile_data["pipeline"]:
lockfile_data["pipeline"]["config"] = {}
lockfile_data["pipeline"]["config"]["step_version_secrets"] = step_version_secrets

return lockfile_data

def update_lockfile_with_pipeline_info(
Expand Down Expand Up @@ -246,6 +253,13 @@
}
}

# Include step_version_secrets if present in pipeline config (not orchestration_spec)
step_version_secrets = pipeline_config.get("config", {}).get("step_version_secrets", {})
if step_version_secrets:
if "config" not in lockfile_data["pipeline"]:
lockfile_data["pipeline"]["config"] = {}
lockfile_data["pipeline"]["config"]["step_version_secrets"] = step_version_secrets

return lockfile_data

def save_lockfile(self, lockfile_data: Dict[str, Any], lockfile_path: str = None) -> None:
Expand Down Expand Up @@ -362,6 +376,34 @@

return None

def _add_step_version_secrets(
self, pipeline_version: resources_pb2.PipelineVersion, step_version_secrets: Dict[str, Any]
) -> None:
"""Add step_version_secrets to the pipeline version config.

Args:
pipeline_version: The PipelineVersion proto to update
step_version_secrets: Dictionary mapping step references to their secret configs
Format: {step_ref: {secret_name: secret_path}}
"""
logger.debug(f"Processing step version secrets for {len(step_version_secrets)} steps")

for step_ref, step_config in step_version_secrets.items():
# Note: 'step_config' contains the secret mappings directly (not nested under 'secrets')
# Secret references are like "users/user123/secrets/my-api-key"
if not step_config:
logger.debug(f"No secret references found for step {step_ref}, skipping")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.

Copilot Autofix

AI 2 days ago

To fix this problem, avoid logging potentially sensitive information such as secret references or their paths. Instead of logging the entire step_ref, use a generic message or obfuscate the reference (e.g., only log the existence of a skipped step, log a hash, or mask the reference). Change line 395 inside method _add_step_version_secrets to avoid outputting the value of step_ref in cleartext. For example, replace the message with a generic "A step contains no secret references and will be skipped" or, if logging is required, sufficiently redact or hash the reference string. No additional imports are required—the logging library and framework is already set up.

Suggested changeset 1
clarifai/runners/pipelines/pipeline_builder.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/clarifai/runners/pipelines/pipeline_builder.py b/clarifai/runners/pipelines/pipeline_builder.py
--- a/clarifai/runners/pipelines/pipeline_builder.py
+++ b/clarifai/runners/pipelines/pipeline_builder.py
@@ -392,7 +392,7 @@
             # Note: 'step_config' contains the secret mappings directly (not nested under 'secrets')
             # Secret references are like "users/user123/secrets/my-api-key"
             if not step_config:
-                logger.debug(f"No secret references found for step {step_ref}, skipping")
+                logger.debug("No secret references found for a step, skipping")
                 continue
 
             # Create StepSecretConfig proto
EOF
@@ -392,7 +392,7 @@
# Note: 'step_config' contains the secret mappings directly (not nested under 'secrets')
# Secret references are like "users/user123/secrets/my-api-key"
if not step_config:
logger.debug(f"No secret references found for step {step_ref}, skipping")
logger.debug("No secret references found for a step, skipping")
continue

# Create StepSecretConfig proto
Copilot is powered by AI and may make mistakes. Always verify output.
continue

# Create StepSecretConfig proto
step_secret_config = resources_pb2.StepSecretConfig()
for secret_name, secret_ref in step_config.items():
step_secret_config.secrets[secret_name] = secret_ref

# Add to pipeline version config
pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)
logger.debug(f"Configured secret references for step {step_ref}")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.

Copilot Autofix

AI 2 days ago

The best way to fix this issue is to prevent logging of secret references in clear text. In particular, at line 405, we should avoid including step_ref verbatim as part of the debug message. Instead, we can safely log a generic message, such as "Configured secret references for a step" or, if needed for debugging, log only non-sensitive information (e.g., the number of configured secrets, or step indices).

Changes to make:

  • Update line 405 in clarifai/runners/pipelines/pipeline_builder.py to remove or redact the logging of step_ref.
  • If you must log info for debugging, use a generic message (without referencing the specific step_ref value).
  • No new imports or dependencies are needed.

Suggested changeset 1
clarifai/runners/pipelines/pipeline_builder.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/clarifai/runners/pipelines/pipeline_builder.py b/clarifai/runners/pipelines/pipeline_builder.py
--- a/clarifai/runners/pipelines/pipeline_builder.py
+++ b/clarifai/runners/pipelines/pipeline_builder.py
@@ -402,7 +402,7 @@
 
             # Add to pipeline version config
             pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)
-            logger.debug(f"Configured secret references for step {step_ref}")
+            logger.debug("Configured secret references for a step (step reference redacted)")
 
     def create_pipeline(self) -> tuple[bool, str]:
         """Create the pipeline using PostPipelines RPC.
EOF
@@ -402,7 +402,7 @@

# Add to pipeline version config
pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)
logger.debug(f"Configured secret references for step {step_ref}")
logger.debug("Configured secret references for a step (step reference redacted)")

def create_pipeline(self) -> tuple[bool, str]:
"""Create the pipeline using PostPipelines RPC.
Copilot is powered by AI and may make mistakes. Always verify output.

def create_pipeline(self) -> tuple[bool, str]:
"""Create the pipeline using PostPipelines RPC.

Expand Down Expand Up @@ -404,6 +446,13 @@
)
pipeline_version.orchestration_spec.CopyFrom(orchestration_spec_proto)

# Add step_version_secrets if present in pipeline config (not orchestration_spec)
step_version_secrets = pipeline_config.get("config", {}).get(
"step_version_secrets", {}
)
if step_version_secrets:
self._add_step_version_secrets(pipeline_version, step_version_secrets)

pipeline.pipeline_version.CopyFrom(pipeline_version)

# Make the RPC call
Expand Down
Loading
Loading