diff --git a/tembo-operator/src/cloudnativepg/hibernate.rs b/tembo-operator/src/cloudnativepg/hibernate.rs index c7a8b279e..c49df7f43 100644 --- a/tembo-operator/src/cloudnativepg/hibernate.rs +++ b/tembo-operator/src/cloudnativepg/hibernate.rs @@ -3,7 +3,6 @@ use crate::cloudnativepg::clusters::{ClusterStatusConditions, ClusterStatusCondi use crate::cloudnativepg::cnpg::{get_cluster, get_pooler, get_scheduled_backups}; use crate::cloudnativepg::poolers::Pooler; use crate::cloudnativepg::scheduledbackups::ScheduledBackup; -use crate::ingress::{delete_ingress_route, delete_ingress_route_tcp}; use crate::Error; use crate::{patch_cdb_status_merge, requeue_normal_with_jitter, Context}; @@ -14,17 +13,17 @@ use serde_json::json; use k8s_openapi::api::apps::v1::Deployment; -use super::clusters::Cluster; use crate::app_service::manager::get_appservice_deployment_objects; use crate::cloudnativepg::cnpg_utils::{ get_pooler_instances, patch_cluster_merge, patch_pooler_merge, patch_scheduled_backup_merge, removed_stalled_backups, }; -use crate::ingress_route_crd::IngressRoute; use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info, warn}; +use super::clusters::Cluster; + /// Resolves hibernation in the Cluster and related services of the CoreDB /// /// If the cluster is in spec.stop state, this will activate the CNPG hibernation @@ -146,90 +145,6 @@ pub async fn reconcile_cluster_hibernation(cdb: &CoreDB, ctx: &Arc) -> } } - // Remove IngressRoutes for stopped instances - let ingress_route_api: Api = Api::namespaced(ctx.client.clone(), &namespace); - if let Err(err) = delete_ingress_route(ingress_route_api.clone(), &namespace, &name).await { - warn!( - "Error deleting app service IngressRoute for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - - let metrics_ing_route_name = format!("{}-metrics", cdb.name_any().as_str()); - if let Err(err) = delete_ingress_route( - ingress_route_api.clone(), - &namespace, - &metrics_ing_route_name, - ) - .await - { - warn!( - "Error deleting metrics IngressRoute for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - - // Remove IngressRouteTCP route for stopped instances - let ingress_route_tcp_api = Api::namespaced(ctx.client.clone(), &namespace); - let prefix_read_only = format!("{}-ro-0", cdb.name_any().as_str()); - if let Err(err) = - delete_ingress_route_tcp(ingress_route_tcp_api.clone(), &namespace, &prefix_read_only).await - { - warn!( - "Error deleting postgres IngressRouteTCP for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - - let prefix_read_write = format!("{}-rw-0", cdb.name_any().as_str()); - if let Err(err) = delete_ingress_route_tcp( - ingress_route_tcp_api.clone(), - &namespace, - &prefix_read_write, - ) - .await - { - warn!( - "Error deleting postgres IngressRouteTCP for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - - let prefix_pooler = format!("{}-pooler-0", cdb.name_any().as_str()); - if let Err(err) = - delete_ingress_route_tcp(ingress_route_tcp_api.clone(), &namespace, &prefix_pooler).await - { - warn!( - "Error deleting postgres IngressRouteTCP for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - - let extra_domain_names = cdb.spec.extra_domains_rw.clone().unwrap_or_default(); - if !extra_domain_names.is_empty() { - let prefix_extra = format!("extra-{}-rw", cdb.name_any().as_str()); - if let Err(err) = - delete_ingress_route_tcp(ingress_route_tcp_api.clone(), &namespace, &prefix_extra).await - { - warn!( - "Error deleting extra postgres IngressRouteTCP for {}: {}", - cdb.name_any(), - err - ); - return Err(Action::requeue(Duration::from_secs(300))); - } - } - // Stop CNPG reconciliation for hibernated instances. // We should not stop CNPG reconciliation until hibernation is fully completed, // as the instance may not finish hibernating otherwise. diff --git a/tembo-operator/src/ingress.rs b/tembo-operator/src/ingress.rs index 0568c8e6f..1d7f67128 100644 --- a/tembo-operator/src/ingress.rs +++ b/tembo-operator/src/ingress.rs @@ -13,7 +13,6 @@ use kube::{ use regex::Regex; use std::sync::Arc; -use crate::ingress_route_crd::IngressRoute; use crate::{ apis::coredb_types::CoreDB, errors::OperatorError, @@ -197,48 +196,7 @@ async fn apply_ingress_route_tcp( Ok(()) } -pub async fn delete_ingress_route( - ingress_route_api: Api, - namespace: &str, - ingress_route_name: &String, -) -> Result<(), OperatorError> { - // Check if the resource exists - if ingress_route_api - .get(&ingress_route_name.clone()) - .await - .is_ok() - { - // If it exists, proceed with the deletion - let delete_parameters = DeleteParams::default(); - match ingress_route_api - .delete(&ingress_route_name.clone(), &delete_parameters) - .await - { - Ok(_) => { - info!( - "Deleted IngressRoute {}.{}", - ingress_route_name.clone(), - namespace - ); - } - Err(e) => { - error!( - "Failed to delete IngressRoute {}.{}: {}", - ingress_route_name, namespace, e - ); - return Err(OperatorError::IngressRouteError); - } - } - } else { - debug!( - "IngressRoute {}.{} was not found. Assuming it's already deleted.", - ingress_route_name, namespace - ); - } - Ok(()) -} - -pub async fn delete_ingress_route_tcp( +async fn delete_ingress_route_tcp( ingress_route_tcp_api: Api, namespace: &str, ingress_route_tcp_name: &String, diff --git a/tembo-operator/tests/integration_tests.rs b/tembo-operator/tests/integration_tests.rs index b429eed05..53f00a9d0 100644 --- a/tembo-operator/tests/integration_tests.rs +++ b/tembo-operator/tests/integration_tests.rs @@ -697,7 +697,6 @@ mod test { client: Client, cdb_name: &str, namespace: &str, - list_params: &ListParams, num_expected: usize, ) -> Result, errors::OperatorError> where @@ -710,22 +709,22 @@ mod test { R::DynamicType: Default, { let api: Api = Api::namespaced(client, namespace); + let lp = ListParams::default().labels(format!("coredb.io/name={}", cdb_name).as_str()); let retry = 15; let mut passed_retry = false; let mut resource_list: Vec = Vec::new(); for _ in 0..retry { - let resources = api.list(&list_params).await?; + let resources = api.list(&lp).await?; if resources.items.len() == num_expected { resource_list.extend(resources.items); passed_retry = true; break; } else { println!( - "ns:{}.cdb:{} Found {} {}, expected {}", + "ns:{}.cdb:{} Found {}, expected {}", namespace, cdb_name, resources.items.len(), - std::any::type_name::(), num_expected ); } @@ -3916,20 +3915,16 @@ mod test { let _coredb_resource = coredbs.patch(cdb_name, ¶ms, &patch).await.unwrap(); // assert we created three Deployments, with the names we provided - let default_params = ListParams::default(); let deployment_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &default_params, 3) + list_resources(client.clone(), cdb_name, &namespace, 3) .await .unwrap(); // three AppService deployments. the postgres exporter is disabled assert!(deployment_items.len() == 3); - let cdb_params = - ListParams::default().labels(format!("coredb.io/name={}", cdb_name).as_str()); - let service_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &cdb_params, 2) - .await - .unwrap(); + let service_items: Vec = list_resources(client.clone(), cdb_name, &namespace, 2) + .await + .unwrap(); // two AppService Services, because two of the AppServices expose ports assert!(service_items.len() == 2); @@ -4023,7 +4018,7 @@ mod test { assert!(found); let ingresses: Result, errors::OperatorError> = - list_resources(client.clone(), cdb_name, &namespace, &cdb_params, 1).await; + list_resources(client.clone(), cdb_name, &namespace, 1).await; let ingress = ingresses.unwrap(); assert_eq!(ingress.len(), 1); let ingress_route = ingress[0].clone(); @@ -4043,10 +4038,8 @@ mod test { // Check for IngressRouteTCP let ing_name = format!("{cdb_name}-ferretdb"); - let ing_params = - ListParams::default().labels(format!("coredb.io/name={}", ing_name).as_str()); let ingresses_tcp: Result, errors::OperatorError> = - list_resources(client.clone(), &ing_name, &namespace, &ing_params, 1).await; + list_resources(client.clone(), &ing_name, &namespace, 1).await; let ingress_tcp = ingresses_tcp.unwrap(); assert_eq!(ingress_tcp.len(), 1); let ingress_route_tcp = ingress_tcp[0].clone(); @@ -4152,7 +4145,7 @@ mod test { coredbs.patch(cdb_name, ¶ms, &patch).await.unwrap(); let deployment_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &default_params, 1) + list_resources(client.clone(), cdb_name, &namespace, 1) .await .unwrap(); assert!(deployment_items.len() == 1); @@ -4163,10 +4156,9 @@ mod test { ); // should still be just one Service - let service_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &cdb_params, 1) - .await - .unwrap(); + let service_items: Vec = list_resources(client.clone(), cdb_name, &namespace, 1) + .await + .unwrap(); // One appService Services assert!(service_items.len() == 1); @@ -4357,28 +4349,26 @@ CREATE EVENT TRIGGER pgrst_watch let patch = Patch::Apply(&coredb_json); coredbs.patch(cdb_name, ¶ms, &patch).await.unwrap(); let deployment_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &default_params, 0) + list_resources(client.clone(), cdb_name, &namespace, 0) .await .unwrap(); assert!(deployment_items.is_empty()); - let service_items: Vec = - list_resources(client.clone(), cdb_name, &namespace, &cdb_params, 0) - .await - .unwrap(); + let service_items: Vec = list_resources(client.clone(), cdb_name, &namespace, 0) + .await + .unwrap(); assert!(service_items.is_empty()); // should be no Services // ingress must be gone - let ingresses: Vec = - list_resources(client.clone(), cdb_name, &namespace, &default_params, 0) - .await - .unwrap(); + let ingresses: Vec = list_resources(client.clone(), cdb_name, &namespace, 0) + .await + .unwrap(); assert_eq!(ingresses.len(), 0); // Assert IngressRouteTCP is gone let ingresses_tcp: Vec = - list_resources(client.clone(), cdb_name, &namespace, &default_params, 0) + list_resources(client.clone(), cdb_name, &namespace, 0) .await .unwrap(); assert_eq!(ingresses_tcp.len(), 0); @@ -5818,8 +5808,6 @@ CREATE EVENT TRIGGER pgrst_watch let test = TestCore::new(test_name).await; let name = test.name.clone(); let pooler_name = format!("{}-pooler", name); - let namespace = test.namespace.clone(); - let default_params = ListParams::default(); // Generate very simple CoreDB JSON definitions. The first will be for // initializing and starting the cluster, and the second for stopping @@ -5862,185 +5850,6 @@ CREATE EVENT TRIGGER pgrst_watch .await .not()); - // Assert that IngressRouteTCPs are removed after hibernation - let client = test.client.clone(); - let ingresses_tcp: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 0) - .await - .unwrap(); - assert_eq!( - ingresses_tcp.len(), - 0, - "IngressRouteTCPs should be removed after hibernation" - ); - - // Assert that IngressRoutes are removed after hibernation - let client = test.client.clone(); - let ingress_routes: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 0) - .await - .unwrap(); - assert_eq!( - ingress_routes.len(), - 0, - "IngressRoutes should be removed after hibernation" - ); - - // Patch the cluster to start it up again, then check to ensure it - // actually did so. This proves hibernation can be reversed. - - println!("Starting cluster after hibernation"); - println!("CoreDB: {}", cluster_start); - let _ = test.set_cluster_def(&cluster_start).await; - assert!(status_running(&test.coredbs, &name).await); - assert!(pooler_status_running(&test.poolers, &pooler_name).await); - - // Assert there are 2 IngressRouteTCPs created after starting. 1 for postgres and 1 for the pooler - let ingress_tcps: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 2) - .await - .unwrap(); - assert_eq!( - ingress_tcps.len(), - 2, - "IngressRouteTCPs should be created after starting" - ); - - // Assert there is 1 IngressRoute created after starting. This is the IngressRoute for metrics - let ingress_routes: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 1) - .await - .unwrap(); - assert_eq!( - ingress_routes.len(), - 1, - "IngressRoutes should be created after starting" - ); - - test.teardown().await; - } - - #[tokio::test] - #[ignore] - async fn functional_test_hibernate_with_app_service() { - let test_name = "test-hibernate-cnpg-with-app-service"; - let test = TestCore::new(test_name).await; - let name = test.name.clone(); - let pooler_name = format!("{}-pooler", name); - let namespace = test.namespace.clone(); - let default_params = ListParams::default(); - - // Generate very simple CoreDB JSON definitions. The first will be for - // initializing and starting the cluster with an app service, and the second for stopping - // it. We'll use a single replica to ensure _all_ parts of the cluster - // are affected by hibernate. - - let cluster_start = serde_json::json!({ - "apiVersion": API_VERSION, - "kind": "CoreDB", - "metadata": { - "name": name - }, - "spec": { - "replicas": 1, - "stop": false, - "connectionPooler": { - "enabled": true - }, - "appServices": [ - { - "name": "postgrest", - "image": "postgrest/postgrest:v10.0.0", - "env": [ - { - "name": "PGRST_DB_URI", - "valueFromPlatform": "ReadWriteConnection" - }, - { - "name": "PGRST_DB_SCHEMA", - "value": "public" - }, - { - "name": "PGRST_DB_ANON_ROLE", - "value": "postgres" - } - ], - "routing": [ - { - "port": 3000, - "ingressPath": "/", - "ingressType": "http" - } - ], - "resources": { - "requests": { - "cpu": "100m", - "memory": "256Mi" - }, - "limits": { - "cpu": "100m", - "memory": "256Mi" - } - } - } - ] - } - }); - - let mut cluster_stop = cluster_start.clone(); - cluster_stop["spec"]["stop"] = serde_json::json!(true); - - // Begin by starting the cluster and validating that it worked. - // We need this here as initial iterations of the hibernate code - // prevented the cluster from starting. - - let _ = test.set_cluster_def(&cluster_start).await; - assert!(status_running(&test.coredbs, &name).await); - assert!(pooler_status_running(&test.poolers, &pooler_name).await); - - // Assert there are 3 pods running: 1 for postgres, 1 for the pooler, and 1 for the app service - let pods: Api = Api::namespaced(test.client.clone(), &namespace); - let pods_list = pods.list(&Default::default()).await.unwrap(); - assert_eq!(pods_list.items.len(), 3); - - // Stop the cluster and check to make sure it's not running to ensure - // hibernate is doing its job. - - let _ = test.set_cluster_def(&cluster_stop).await; - let _ = wait_until_status_not_running(&test.coredbs, &name).await; - assert!(status_running(&test.coredbs, &name).await.not()); - assert!(pooler_status_running(&test.poolers, &pooler_name) - .await - .not()); - - // Assert there are no pods running - let pods_list = pods.list(&Default::default()).await.unwrap(); - assert_eq!(pods_list.items.len(), 0); - - // Assert that IngressRouteTCPs are removed after hibernation - let client = test.client.clone(); - let ingresses_tcp: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 0) - .await - .unwrap(); - assert_eq!( - ingresses_tcp.len(), - 0, - "IngressRouteTCPs should be removed after hibernation" - ); - - // Assert that IngressRoutes are removed after hibernation - let client = test.client.clone(); - let ingress_routes: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 0) - .await - .unwrap(); - assert_eq!( - ingress_routes.len(), - 0, - "IngressRoutes should be removed after hibernation" - ); - // Patch the cluster to start it up again, then check to ensure it // actually did so. This proves hibernation can be reversed. @@ -6050,32 +5859,6 @@ CREATE EVENT TRIGGER pgrst_watch assert!(status_running(&test.coredbs, &name).await); assert!(pooler_status_running(&test.poolers, &pooler_name).await); - // Assert there are 3 pods running: 1 for postgres, 1 for the pooler, and 1 for the app service - let pods_list = pods.list(&Default::default()).await.unwrap(); - assert_eq!(pods_list.items.len(), 3); - - // Assert there are 2 IngressRouteTCPs created after starting. 1 for postgres and 1 for the pooler - let ingress_tcps: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 2) - .await - .unwrap(); - assert_eq!( - ingress_tcps.len(), - 2, - "IngressRouteTCPs should be created after starting" - ); - - // Assert there are 2 IngressRoutes created after starting. 1 for metrics and 1 for the app service - let ingress_routes: Vec = - list_resources(client.clone(), &name, &namespace, &default_params, 2) - .await - .unwrap(); - assert_eq!( - ingress_routes.len(), - 2, - "IngressRoutes should be created after starting" - ); - test.teardown().await; }