diff --git a/charts/restate-operator-helm/templates/rbac.yaml b/charts/restate-operator-helm/templates/rbac.yaml index 6b7afe6..19c59b6 100644 --- a/charts/restate-operator-helm/templates/rbac.yaml +++ b/charts/restate-operator-helm/templates/rbac.yaml @@ -103,6 +103,19 @@ rules: apiGroups: - eks.services.k8s.aws {{- end }} + {{- if .Values.gcpConfigConnector }} + - resources: + - iampolicymembers + verbs: + - get + - list + - watch + - create + - patch + - delete + apiGroups: + - iam.cnrm.cloud.google.com + {{- end }} - resources: - configurations - routes diff --git a/charts/restate-operator-helm/values.yaml b/charts/restate-operator-helm/values.yaml index 03aed7b..b17eeb0 100644 --- a/charts/restate-operator-helm/values.yaml +++ b/charts/restate-operator-helm/values.yaml @@ -14,6 +14,7 @@ serviceAccount: podAnnotations: {} awsPodIdentityAssociationCluster: null +gcpConfigConnector: null clusterDns: null # defaults to "cluster.local" in the operator binary podSecurityContext: diff --git a/release-notes/unreleased/gcp-workload-identity.md b/release-notes/unreleased/gcp-workload-identity.md new file mode 100644 index 0000000..d7a8bad --- /dev/null +++ b/release-notes/unreleased/gcp-workload-identity.md @@ -0,0 +1,49 @@ +# Release Notes: GCP Workload Identity automation via Config Connector + +## New Feature + +### What Changed +The operator now automatically creates Config Connector `IAMPolicyMember` +resources to bind Kubernetes service accounts to GCP service accounts via +Workload Identity. This is triggered when a RestateCluster has the +`iam.gke.io/gcp-service-account` annotation in `serviceAccountAnnotations`. + +The GCP project ID is extracted from the service account email +(`name@PROJECT.iam.gserviceaccount.com`), so no additional configuration +is needed beyond the annotation that the control plane already sets. + +A canary job validates that Workload Identity credentials are available +before allowing the StatefulSet to proceed, preventing Restate from starting +without GCS access. + +This mirrors the existing AWS Pod Identity Association pattern. + +### Why This Matters +Previously, the IAM binding for Workload Identity had to be created manually +via `gcloud` commands each time a new environment was provisioned. This +automates that step, bringing GCP to parity with the existing AWS automation. + +### Impact on Users +- **Existing deployments**: No impact. The feature only activates when + `iam.gke.io/gcp-service-account` is present in `serviceAccountAnnotations`. +- **GCP deployments using this annotation**: Config Connector must be installed + on the GKE cluster. If the `IAMPolicyMember` CRD is not available, the + operator sets a `NotReady` status condition with a clear message rather + than crashing. +- **AWS deployments**: No impact. The canary job infrastructure was refactored + to share code between AWS and GCP, but behavior is unchanged. + +### Migration Guidance +No migration needed. To use this feature: + +1. Install Config Connector on the GKE cluster +2. Ensure the Config Connector service account has `roles/iam.serviceAccountAdmin` + on the target GCP service account +3. Set `serviceAccountAnnotations` on the RestateCluster: + +```yaml +spec: + security: + serviceAccountAnnotations: + iam.gke.io/gcp-service-account: restate@my-project.iam.gserviceaccount.com +``` diff --git a/src/controllers/restatecluster/controller.rs b/src/controllers/restatecluster/controller.rs index 48b1af2..c4bed1e 100644 --- a/src/controllers/restatecluster/controller.rs +++ b/src/controllers/restatecluster/controller.rs @@ -35,6 +35,7 @@ use tokio::{sync::RwLock, time::Duration}; use tracing::*; use crate::controllers::{Diagnostics, State}; +use crate::resources::iampolicymembers::IAMPolicyMember; use crate::resources::podidentityassociations::PodIdentityAssociation; use crate::resources::restateclusters::{ RESTATE_CLUSTER_FINALIZER, RestateCluster, RestateClusterCondition, RestateClusterStatus, @@ -396,10 +397,10 @@ pub async fn run(client: Client, metrics: Metrics, state: State) { security_group_policy_installed, pod_identity_association_installed, secret_provider_class_installed, - ) = api_groups - .groups - .iter() - .fold((false, false, false), |(sgp, pia, spc), group| { + iam_policy_member_installed, + ) = api_groups.groups.iter().fold( + (false, false, false, false), + |(sgp, pia, spc, ipm), group| { fn group_matches>(group: &APIGroup) -> bool { group.name == R::group(&()) && group.versions.iter().any(|v| v.version == R::version(&())) @@ -408,8 +409,10 @@ pub async fn run(client: Client, metrics: Metrics, state: State) { sgp || group_matches::(group), pia || group_matches::(group), spc || group_matches::(group), + ipm || group_matches::(group), ) - }); + }, + ); let rc_api = Api::::all(client.clone()); let ns_api = Api::::all(client.clone()); @@ -421,6 +424,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) { let cm_api = Api::::all(client.clone()); let np_api = Api::::all(client.clone()); let pia_api = Api::::all(client.clone()); + let ipm_api = Api::::all(client.clone()); let sgp_api = Api::::all(client.clone()); let spc_api = Api::::all(client.clone()); @@ -551,6 +555,27 @@ pub async fn run(client: Client, metrics: Metrics, state: State) { } else { controller }; + let controller = if iam_policy_member_installed { + let ipm_watcher = watcher(ipm_api, cfg.clone()) + .map(ensure_deletion_change) + .touched_objects() + .predicate_filter(changed_predicate.combine(status_predicate)); + + let wi_job_api = Api::::all(client.clone()); + let wi_job_watcher = metadata_watcher( + wi_job_api, + Config::default().labels("app.kubernetes.io/name=restate-wi-canary"), + ) + .map(ensure_deletion_change) + .touched_objects() + .predicate_filter(changed_predicate); + + controller + .owns_stream(ipm_watcher) + .owns_stream(wi_job_watcher) + } else { + controller + }; controller .run( reconcile, diff --git a/src/controllers/restatecluster/reconcilers/compute.rs b/src/controllers/restatecluster/reconcilers/compute.rs index 6ba25b4..cd28561 100644 --- a/src/controllers/restatecluster/reconcilers/compute.rs +++ b/src/controllers/restatecluster/reconcilers/compute.rs @@ -28,6 +28,9 @@ use tracing::{debug, error, trace, warn}; use crate::Error; use crate::controllers::restatecluster::controller::Context; +use crate::resources::iampolicymembers::{ + IAMPolicyMember, IAMPolicyMemberResourceRef, IAMPolicyMemberSpec, +}; use crate::resources::podidentityassociations::{ PodIdentityAssociation, PodIdentityAssociationSpec, }; @@ -501,6 +504,7 @@ pub async fn reconcile_compute( let job_api: Api = Api::namespaced(ctx.client.clone(), name); let pod_api: Api = Api::namespaced(ctx.client.clone(), name); let sgp_api: Api = Api::namespaced(ctx.client.clone(), name); + let ipm_api: Api = Api::namespaced(ctx.client.clone(), name); let pdb_api: Api = Api::namespaced(ctx.client.clone(), name); apply_service_account( @@ -613,6 +617,72 @@ pub async fn reconcile_compute( None | Some(_) => {} } + // GCP Workload Identity via Config Connector IAMPolicyMember + let gcp_sa_email = spec + .security + .as_ref() + .and_then(|s| s.service_account_annotations.as_ref()) + .and_then(|a| a.get("iam.gke.io/gcp-service-account")); + + if let Some(gcp_sa_email) = gcp_sa_email { + let gcp_project = match parse_gcp_project_from_sa_email(gcp_sa_email) { + Some(project) => project, + None => { + return Err(Error::InvalidRestateConfig(format!( + "Cannot parse GCP project from service account email '{gcp_sa_email}'; expected format: name@PROJECT.iam.gserviceaccount.com" + ))); + } + }; + + let ipm = match apply_iam_policy_member( + name, + &ipm_api, + restate_iam_policy_member(name, base_metadata, gcp_project, gcp_sa_email), + ) + .await + { + Ok(ipm) => ipm, + Err(Error::KubeError(kube::Error::Api(kube::error::ErrorResponse { + code: 404, + .. + }))) => { + return Err(Error::NotReady { + reason: "IAMPolicyMemberCRDNotFound".into(), + message: "IAMPolicyMember CRD not found - is Config Connector installed?" + .into(), + requeue_after: Some(Duration::from_secs(60)), + }); + } + Err(err) => return Err(err), + }; + + if !is_iam_policy_member_ready(&ipm) { + return Err(Error::NotReady { + reason: "IAMPolicyMemberNotReady".into(), + message: "Waiting for Config Connector to provision the IAM policy member binding" + .into(), + requeue_after: None, + }); + } + + check_workload_identity( + name, + base_metadata, + spec.compute.tolerations.as_ref(), + &job_api, + ) + .await?; + + let pod_annotations = pod_annotations.get_or_insert_with(Default::default); + pod_annotations.insert( + "restate.dev/gcp-service-account".into(), + gcp_sa_email.to_owned(), + ); + } else { + delete_iam_policy_member(name, &ipm_api, "restate-workload-identity").await?; + delete_job(name, &job_api, "restate-wi-canary").await?; + } + let restate_service = restate_service( base_metadata, spec.security @@ -741,65 +811,103 @@ async fn apply_pod_identity_association( Ok(pia_api.patch(name, ¶ms, &Patch::Apply(&pia)).await?) } -async fn check_pia( - namespace: &str, +/// Configuration for a canary job that validates cloud credential injection +struct CanaryConfig { + /// Job name, e.g. "restate-pia-canary" + name: &'static str, + /// Command to run in the canary container + command: Vec, + /// Reason prefix for NotReady conditions, e.g. "PodIdentityAssociation" + reason_prefix: &'static str, + /// Human-readable description of what failed, for error messages + failure_message: &'static str, + /// Human-readable description for pending state + pending_message: &'static str, +} + +/// Build a canary Job spec for cloud credential validation. +fn canary_job_spec( base_metadata: &ObjectMeta, tolerations: Option<&Vec>, - job_api: &Api, - pod_api: &Api, -) -> Result<(), Error> { - let name = "restate-pia-canary"; - let params: PatchParams = PatchParams::apply("restate-operator").force(); - - let mut metadata = object_meta(base_metadata, name); + config: &CanaryConfig, +) -> Job { + let mut metadata = object_meta(base_metadata, config.name); let labels = metadata.labels.get_or_insert(Default::default()); if let Some(existing) = labels.get_mut("app.kubernetes.io/name") { - *existing = name.into() + *existing = config.name.into() } else { - labels.insert("app.kubernetes.io/name".into(), name.into()); + labels.insert("app.kubernetes.io/name".into(), config.name.into()); } - debug!( - "Applying PodIdentityAssociation canary Job in namespace {}", - namespace - ); - - let created = match job_api - .patch( - name, - ¶ms, - &Patch::Apply(&Job { - metadata, - spec: Some(JobSpec { - // single-use job that we delete on failuire; don't want to wait 10 seconds for retries - backoff_limit: Some(1), - template: PodTemplateSpec { - metadata: None, - spec: Some(PodSpec { - service_account_name: Some("restate".into()), - containers: vec![Container { - name: "canary".into(), - image: Some("busybox:uclibc".into()), - command: Some(vec![ - "grep".into(), - "-q".into(), - "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(), - "/proc/self/environ".into(), - ]), - ..Default::default() - }], - tolerations: tolerations.cloned(), - restart_policy: Some("Never".into()), - ..Default::default() - }), - }, + Job { + metadata, + spec: Some(JobSpec { + backoff_limit: Some(1), + template: PodTemplateSpec { + metadata: None, + spec: Some(PodSpec { + service_account_name: Some("restate".into()), + containers: vec![Container { + name: "canary".into(), + image: Some("busybox:uclibc".into()), + command: Some(config.command.clone()), + ..Default::default() + }], + tolerations: tolerations.cloned(), + restart_policy: Some("Never".into()), ..Default::default() }), - status: None, - }), - ) - .await - { + }, + ..Default::default() + }), + status: None, + } +} + +/// Result of checking canary job conditions. +enum CanaryResult { + /// Job completed successfully + Complete, + /// Job failed + Failed, + /// Job hasn't completed yet (no terminal condition) + Pending, +} + +/// Check the conditions on a canary Job to determine its state. +fn check_canary_conditions(job: &Job) -> CanaryResult { + if let Some(conditions) = job.status.as_ref().and_then(|s| s.conditions.as_ref()) { + for condition in conditions { + if condition.status != "True" { + continue; + } + match condition.type_.as_str() { + "Complete" => return CanaryResult::Complete, + "Failed" => return CanaryResult::Failed, + _ => {} + } + } + } + CanaryResult::Pending +} + +/// Shared canary job logic for cloud credential validation. +/// Creates a one-shot Job and checks whether credentials are available. +/// Returns the CanaryResult so callers can distinguish Complete from Pending. +async fn run_canary_job( + namespace: &str, + base_metadata: &ObjectMeta, + tolerations: Option<&Vec>, + job_api: &Api, + config: &CanaryConfig, +) -> Result { + let name = config.name; + let params: PatchParams = PatchParams::apply("restate-operator").force(); + let job = canary_job_spec(base_metadata, tolerations, config); + + debug!("Applying canary Job {name} in namespace {namespace}"); + + let created = match job_api.patch(name, ¶ms, &Patch::Apply(&job)).await { Ok(created) => created, Err(kube::Error::Api(kube::error::ErrorResponse { code: 422, @@ -807,64 +915,75 @@ async fn check_pia( message, .. })) if reason == "Invalid" && message.contains("field is immutable") => { - // when tolerations change, the existing job becomes invalid delete_job(namespace, job_api, name).await?; return Err(Error::NotReady { - reason: "PodIdentityAssociationCanaryPending".into(), + reason: format!("{}CanaryPending", config.reason_prefix), message: "Canary Job has not yet succeeded; recreated Job after tolerations change" .into(), - // job watch will cover this requeue_after: None, }); } Err(err) => return Err(err.into()), }; - if let Some(conditions) = created.status.and_then(|s| s.conditions) { - for condition in conditions { - if condition.status != "True" { - continue; - } - match condition.type_.as_str() { - "Complete" => { - debug!( - "PodIdentityAssociation canary check succeeded in namespace {}", - namespace - ); - return Ok(()); - } - "Failed" => { - error!( - "PodIdentityAssociation canary check failed in namespace {}, deleting Job", - namespace - ); - - delete_job(namespace, job_api, name).await?; - - return Err(Error::NotReady { - reason: "PodIdentityAssociationCanaryFailed".into(), - message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), - // job watch will cover this - requeue_after: None, - }); - } - _ => {} - } + let result = check_canary_conditions(&created); + match &result { + CanaryResult::Complete => { + debug!("Canary {name} succeeded in namespace {namespace}"); + } + CanaryResult::Failed => { + error!("Canary {name} failed in namespace {namespace}, deleting Job"); + delete_job(namespace, job_api, name).await?; + + return Err(Error::NotReady { + reason: format!("{}CanaryFailed", config.reason_prefix), + message: config.failure_message.into(), + requeue_after: None, + }); + } + CanaryResult::Pending => { + debug!("Canary {name} not yet succeeded in namespace {namespace}"); } } + Ok(result) +} - // if we are here then the job hasn't succeeded or failed yet; lets try and figure things out a bit quicker - // because it takes times for pods to schedule etc +async fn check_pia( + namespace: &str, + base_metadata: &ObjectMeta, + tolerations: Option<&Vec>, + job_api: &Api, + pod_api: &Api, +) -> Result<(), Error> { + let config = CanaryConfig { + name: "restate-pia-canary", + command: vec![ + "grep".into(), + "-q".into(), + "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(), + "/proc/self/environ".into(), + ], + reason_prefix: "PodIdentityAssociation", + failure_message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up", + pending_message: "Canary Job has not yet succeeded; PIA webhook may need to catch up", + }; - let pods = pod_api - .list(&ListParams::default().labels(&format!( - "batch.kubernetes.io/job-name={name},batch.kubernetes.io/controller-uid={}", - created.metadata.uid.unwrap() - ))) - .await?; + match run_canary_job(namespace, base_metadata, tolerations, job_api, &config).await? { + CanaryResult::Complete => return Ok(()), + CanaryResult::Failed => unreachable!("run_canary_job returns Err for Failed"), + CanaryResult::Pending => {} + } - if let Some(pod) = pods.items.first() { + // job hasn't completed yet - try the pod volume shortcut before declaring pending. + // the eks-pod-identity-token volume is visible immediately at pod creation. + // no need for a controller-uid filter since the job name is unique within the namespace. + let name = config.name; + if let Ok(pods) = pod_api + .list(&ListParams::default().labels(&format!("batch.kubernetes.io/job-name={name}"))) + .await + && let Some(pod) = pods.items.first() + { if pod .spec .as_ref() @@ -873,36 +992,26 @@ async fn check_pia( .unwrap_or(false) { debug!( - "PodIdentityAssociation canary check succeeded via pod lookup in namespace {}", - namespace + "PodIdentityAssociation canary check succeeded via pod lookup in namespace {namespace}" ); return Ok(()); } debug!( - "PodIdentityAssociation canary check failed via pod lookup in namespace {}, deleting Job", - namespace + "PodIdentityAssociation canary check failed via pod lookup in namespace {namespace}, deleting Job" ); delete_job(namespace, job_api, name).await?; return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), - message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), - // job watch will cover this + message: config.failure_message.into(), requeue_after: None, }); } - // no pods; we generally expect this immediately after creating the job - debug!( - "PodIdentityAssociation canary Job not yet succeeded in namespace {}", - namespace - ); - Err(Error::NotReady { reason: "PodIdentityAssociationCanaryPending".into(), - message: "Canary Job has not yet succeeded; PIA webhook may need to catch up".into(), - // job watch will cover this + message: config.pending_message.into(), requeue_after: None, }) } @@ -980,6 +1089,117 @@ async fn delete_security_group_policy( } } +/// Parse the GCP project ID from a service account email. +/// Expected format: `name@PROJECT.iam.gserviceaccount.com` +fn parse_gcp_project_from_sa_email(email: &str) -> Option<&str> { + email + .split('@') + .nth(1)? + .strip_suffix(".iam.gserviceaccount.com") +} + +fn restate_iam_policy_member( + ns: &str, + base_metadata: &ObjectMeta, + gcp_project: &str, + gcp_service_account_email: &str, +) -> IAMPolicyMember { + let mut metadata = object_meta(base_metadata, "restate-workload-identity"); + let annotations = metadata.annotations.get_or_insert_with(Default::default); + annotations.insert( + "cnrm.cloud.google.com/project-id".into(), + gcp_project.into(), + ); + + IAMPolicyMember { + metadata, + spec: IAMPolicyMemberSpec { + member: format!("serviceAccount:{gcp_project}.svc.id.goog[{ns}/restate]"), + role: "roles/iam.workloadIdentityUser".into(), + resource_ref: IAMPolicyMemberResourceRef { + kind: "IAMServiceAccount".into(), + external: Some(format!( + "projects/{gcp_project}/serviceAccounts/{gcp_service_account_email}" + )), + name: None, + namespace: None, + }, + }, + status: None, + } +} + +fn is_iam_policy_member_ready(ipm: &IAMPolicyMember) -> bool { + ipm.status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + .and_then(|cs| cs.iter().find(|c| c.r#type == "Ready")) + .is_some_and(|c| c.status == "True") +} + +async fn apply_iam_policy_member( + namespace: &str, + ipm_api: &Api, + ipm: IAMPolicyMember, +) -> Result { + let name = ipm.metadata.name.as_ref().unwrap(); + let params: PatchParams = PatchParams::apply("restate-operator").force(); + debug!( + "Applying IAMPolicyMember {} in namespace {}", + name, namespace + ); + Ok(ipm_api.patch(name, ¶ms, &Patch::Apply(&ipm)).await?) +} + +async fn delete_iam_policy_member( + namespace: &str, + ipm_api: &Api, + name: &str, +) -> Result<(), Error> { + debug!( + "Ensuring IAMPolicyMember {} in namespace {} does not exist", + name, namespace + ); + match ipm_api.delete(name, &DeleteParams::default()).await { + Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()), + Err(err) => Err(err.into()), + Ok(_) => Ok(()), + } +} + +async fn check_workload_identity( + namespace: &str, + base_metadata: &ObjectMeta, + tolerations: Option<&Vec>, + job_api: &Api, +) -> Result<(), Error> { + let config = CanaryConfig { + name: "restate-wi-canary", + command: vec![ + "wget".into(), + "--header".into(), + "Metadata-Flavor: Google".into(), + "-q".into(), + "-O".into(), + "/dev/null".into(), + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token".into(), + ], + reason_prefix: "WorkloadIdentity", + failure_message: "Canary pod could not obtain a Workload Identity token from the GKE metadata server; check that Workload Identity is enabled and the IAM binding is correct", + pending_message: "Canary Job has not yet succeeded; Workload Identity may need to propagate", + }; + + match run_canary_job(namespace, base_metadata, tolerations, job_api, &config).await? { + CanaryResult::Complete => Ok(()), + CanaryResult::Failed => unreachable!("run_canary_job returns Err for Failed"), + CanaryResult::Pending => Err(Error::NotReady { + reason: "WorkloadIdentityCanaryPending".into(), + message: config.pending_message.into(), + requeue_after: None, + }), + } +} + async fn change_statefulset_storage( namespace: &str, base_metadata: &ObjectMeta, @@ -1180,3 +1400,338 @@ async fn apply_pod_disruption_budget( ); Ok(pdb_api.patch(name, ¶ms, &Patch::Apply(&pdb)).await?) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::resources::iampolicymembers::{IAMPolicyMemberCondition, IAMPolicyMemberStatus}; + use k8s_openapi::api::batch::v1::{JobCondition, JobStatus}; + + #[test] + fn test_parse_gcp_project_from_sa_email() { + assert_eq!( + parse_gcp_project_from_sa_email("restate@my-project.iam.gserviceaccount.com"), + Some("my-project") + ); + } + + #[test] + fn test_parse_gcp_project_from_sa_email_complex_project() { + assert_eq!( + parse_gcp_project_from_sa_email("sa@my-org-prod-123.iam.gserviceaccount.com"), + Some("my-org-prod-123") + ); + } + + #[test] + fn test_parse_gcp_project_from_sa_email_no_at() { + assert_eq!(parse_gcp_project_from_sa_email("not-an-email"), None); + } + + #[test] + fn test_parse_gcp_project_from_sa_email_wrong_suffix() { + assert_eq!( + parse_gcp_project_from_sa_email("sa@project.example.com"), + None + ); + } + + #[test] + fn test_parse_gcp_project_from_sa_email_empty() { + assert_eq!(parse_gcp_project_from_sa_email(""), None); + } + + fn make_ipm_with_conditions( + conditions: Option>, + ) -> IAMPolicyMember { + IAMPolicyMember { + metadata: Default::default(), + spec: IAMPolicyMemberSpec { + member: "test".into(), + role: "test".into(), + resource_ref: IAMPolicyMemberResourceRef { + kind: "IAMServiceAccount".into(), + external: None, + name: None, + namespace: None, + }, + }, + status: Some(IAMPolicyMemberStatus { conditions }), + } + } + + #[test] + fn test_is_iam_policy_member_ready_true() { + let ipm = make_ipm_with_conditions(Some(vec![IAMPolicyMemberCondition { + r#type: "Ready".into(), + status: "True".into(), + message: None, + reason: None, + last_transition_time: None, + }])); + assert!(is_iam_policy_member_ready(&ipm)); + } + + #[test] + fn test_is_iam_policy_member_ready_false() { + let ipm = make_ipm_with_conditions(Some(vec![IAMPolicyMemberCondition { + r#type: "Ready".into(), + status: "False".into(), + message: Some("error".into()), + reason: None, + last_transition_time: None, + }])); + assert!(!is_iam_policy_member_ready(&ipm)); + } + + #[test] + fn test_is_iam_policy_member_ready_no_conditions() { + let ipm = make_ipm_with_conditions(None); + assert!(!is_iam_policy_member_ready(&ipm)); + } + + #[test] + fn test_is_iam_policy_member_ready_no_status() { + let ipm = IAMPolicyMember { + metadata: Default::default(), + spec: IAMPolicyMemberSpec { + member: "test".into(), + role: "test".into(), + resource_ref: IAMPolicyMemberResourceRef { + kind: "IAMServiceAccount".into(), + external: None, + name: None, + namespace: None, + }, + }, + status: None, + }; + assert!(!is_iam_policy_member_ready(&ipm)); + } + + #[test] + fn test_is_iam_policy_member_ready_wrong_condition_type() { + let ipm = make_ipm_with_conditions(Some(vec![IAMPolicyMemberCondition { + r#type: "Reconciling".into(), + status: "True".into(), + message: None, + reason: None, + last_transition_time: None, + }])); + assert!(!is_iam_policy_member_ready(&ipm)); + } + + // restate_iam_policy_member tests + + fn test_base_metadata() -> ObjectMeta { + ObjectMeta { + name: Some("test-cluster".into()), + namespace: Some("test-ns".into()), + uid: Some("test-uid".into()), + ..Default::default() + } + } + + #[test] + fn test_restate_iam_policy_member_member_format() { + let ipm = restate_iam_policy_member( + "my-namespace", + &test_base_metadata(), + "my-project", + "restate@my-project.iam.gserviceaccount.com", + ); + assert_eq!( + ipm.spec.member, + "serviceAccount:my-project.svc.id.goog[my-namespace/restate]" + ); + } + + #[test] + fn test_restate_iam_policy_member_role() { + let ipm = restate_iam_policy_member( + "ns", + &test_base_metadata(), + "proj", + "sa@proj.iam.gserviceaccount.com", + ); + assert_eq!(ipm.spec.role, "roles/iam.workloadIdentityUser"); + } + + #[test] + fn test_restate_iam_policy_member_resource_ref() { + let ipm = restate_iam_policy_member( + "ns", + &test_base_metadata(), + "proj", + "sa@proj.iam.gserviceaccount.com", + ); + assert_eq!(ipm.spec.resource_ref.kind, "IAMServiceAccount"); + assert_eq!( + ipm.spec.resource_ref.external.as_deref(), + Some("projects/proj/serviceAccounts/sa@proj.iam.gserviceaccount.com") + ); + } + + #[test] + fn test_restate_iam_policy_member_project_annotation() { + let ipm = restate_iam_policy_member( + "ns", + &test_base_metadata(), + "my-project", + "sa@my-project.iam.gserviceaccount.com", + ); + assert_eq!( + ipm.metadata + .annotations + .as_ref() + .and_then(|a| a.get("cnrm.cloud.google.com/project-id")) + .map(|s| s.as_str()), + Some("my-project") + ); + } + + // check_canary_conditions tests + + fn make_job_with_conditions(conditions: Option>) -> Job { + Job { + metadata: Default::default(), + spec: None, + status: conditions.map(|c| JobStatus { + conditions: Some(c), + ..Default::default() + }), + } + } + + #[test] + fn test_check_canary_conditions_complete() { + let job = make_job_with_conditions(Some(vec![JobCondition { + type_: "Complete".into(), + status: "True".into(), + ..Default::default() + }])); + assert!(matches!( + check_canary_conditions(&job), + CanaryResult::Complete + )); + } + + #[test] + fn test_check_canary_conditions_failed() { + let job = make_job_with_conditions(Some(vec![JobCondition { + type_: "Failed".into(), + status: "True".into(), + ..Default::default() + }])); + assert!(matches!( + check_canary_conditions(&job), + CanaryResult::Failed + )); + } + + #[test] + fn test_check_canary_conditions_pending_no_conditions() { + let job = make_job_with_conditions(None); + assert!(matches!( + check_canary_conditions(&job), + CanaryResult::Pending + )); + } + + #[test] + fn test_check_canary_conditions_pending_not_true() { + let job = make_job_with_conditions(Some(vec![JobCondition { + type_: "Complete".into(), + status: "False".into(), + ..Default::default() + }])); + assert!(matches!( + check_canary_conditions(&job), + CanaryResult::Pending + )); + } + + #[test] + fn test_check_canary_conditions_no_status() { + let job = Job { + metadata: Default::default(), + spec: None, + status: None, + }; + assert!(matches!( + check_canary_conditions(&job), + CanaryResult::Pending + )); + } + + // canary_job_spec tests + + #[test] + fn test_canary_job_spec_structure() { + let config = CanaryConfig { + name: "test-canary", + command: vec!["echo".into(), "hello".into()], + reason_prefix: "Test", + failure_message: "test failed", + pending_message: "test pending", + }; + let job = canary_job_spec(&test_base_metadata(), None, &config); + + let spec = job.spec.unwrap(); + assert_eq!(spec.backoff_limit, Some(1)); + + let pod_spec = spec.template.spec.unwrap(); + assert_eq!(pod_spec.service_account_name.as_deref(), Some("restate")); + assert_eq!(pod_spec.restart_policy.as_deref(), Some("Never")); + assert_eq!(pod_spec.containers.len(), 1); + + let container = &pod_spec.containers[0]; + assert_eq!(container.name, "canary"); + assert_eq!(container.image.as_deref(), Some("busybox:uclibc")); + assert_eq!( + container.command.as_ref().unwrap(), + &vec!["echo".to_string(), "hello".to_string()] + ); + } + + #[test] + fn test_canary_job_spec_label() { + let config = CanaryConfig { + name: "my-canary", + command: vec!["true".into()], + reason_prefix: "Test", + failure_message: "", + pending_message: "", + }; + let job = canary_job_spec(&test_base_metadata(), None, &config); + + assert_eq!( + job.metadata + .labels + .as_ref() + .and_then(|l| l.get("app.kubernetes.io/name")) + .map(|s| s.as_str()), + Some("my-canary") + ); + } + + #[test] + fn test_canary_job_spec_tolerations() { + let tolerations = vec![Toleration { + key: Some("node-role".into()), + operator: Some("Exists".into()), + ..Default::default() + }]; + let config = CanaryConfig { + name: "test-canary", + command: vec!["true".into()], + reason_prefix: "Test", + failure_message: "", + pending_message: "", + }; + let job = canary_job_spec(&test_base_metadata(), Some(&tolerations), &config); + + let pod_spec = job.spec.unwrap().template.spec.unwrap(); + assert_eq!(pod_spec.tolerations.unwrap().len(), 1); + } +} diff --git a/src/resources/iampolicymembers.rs b/src/resources/iampolicymembers.rs new file mode 100644 index 0000000..cc5c05a --- /dev/null +++ b/src/resources/iampolicymembers.rs @@ -0,0 +1,82 @@ +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// IAMPolicyMemberSpec defines the desired state of an IAM policy binding. +/// This is a minimal representation of the Config Connector IAMPolicyMember CRD, +/// containing only the fields needed by the operator. +#[derive(CustomResource, Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[kube( + group = "iam.cnrm.cloud.google.com", + version = "v1beta1", + kind = "IAMPolicyMember", + plural = "iampolicymembers" +)] +#[kube(namespaced)] +#[kube(status = "IAMPolicyMemberStatus")] +#[serde(rename_all = "camelCase")] +pub struct IAMPolicyMemberSpec { + /// The identity to grant the role to, e.g. + /// `serviceAccount:PROJECT.svc.id.goog[NAMESPACE/KSA_NAME]` + pub member: String, + + /// The IAM role to grant, e.g. `roles/iam.workloadIdentityUser` + pub role: String, + + /// Reference to the GCP resource to apply the IAM binding to + pub resource_ref: IAMPolicyMemberResourceRef, +} + +/// Reference to a GCP resource managed by Config Connector +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct IAMPolicyMemberResourceRef { + /// The Config Connector kind, e.g. `IAMServiceAccount` + pub kind: String, + + /// External reference to a GCP resource not managed by Config Connector, + /// e.g. `sa@project.iam.gserviceaccount.com` + #[serde(default, skip_serializing_if = "Option::is_none")] + pub external: Option, + + /// Name of a Config Connector resource in the same namespace + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name: Option, + + /// Namespace of the referenced Config Connector resource + #[serde(default, skip_serializing_if = "Option::is_none")] + pub namespace: Option, +} + +/// Status of the IAMPolicyMember resource as reported by Config Connector +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Hash)] +#[serde(rename_all = "camelCase")] +pub struct IAMPolicyMemberStatus { + /// Conditions represent the latest available observations of the resource's state + #[serde(default, skip_serializing_if = "Option::is_none")] + pub conditions: Option>, +} + +/// A condition on an IAMPolicyMember resource +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Hash)] +#[serde(rename_all = "camelCase")] +pub struct IAMPolicyMemberCondition { + /// Type of condition, e.g. `Ready` + #[serde(rename = "type")] + pub r#type: String, + + /// Status of the condition: `True`, `False`, or `Unknown` + pub status: String, + + /// Human-readable message indicating details about the condition + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, + + /// Machine-readable reason for the condition's last transition + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, + + /// Last time the condition transitioned from one status to another + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_transition_time: Option, +} diff --git a/src/resources/mod.rs b/src/resources/mod.rs index ccce6bc..9627889 100644 --- a/src/resources/mod.rs +++ b/src/resources/mod.rs @@ -1,3 +1,4 @@ +pub mod iampolicymembers; pub mod knative; pub mod podidentityassociations; pub mod restatecloudenvironments;