Skip to content

Commit

Permalink
Merge branch 'main' into TEM-2735
Browse files Browse the repository at this point in the history
  • Loading branch information
shahadarsh authored Feb 1, 2024
2 parents 60200cb + 763aca7 commit c342eae
Show file tree
Hide file tree
Showing 25 changed files with 176 additions and 115 deletions.
2 changes: 1 addition & 1 deletion charts/tembo-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: tembo-operator
description: 'Helm chart to deploy the tembo-operator'
type: application
icon: https://cloud.tembo.io/images/TemboElephant.png
version: 0.2.4
version: 0.2.5
home: https://tembo.io
sources:
- https://github.com/tembo-io/tembo-stacks
Expand Down
2 changes: 1 addition & 1 deletion charts/tembo-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,7 @@ spec:
nullable: true
type: array
image:
default: quay.io/tembo/standard-cnpg:15.3.0-1-0c19c7e
default: quay.io/tembo/standard-cnpg:15.3.0-1-3d9a853
description: |-
The Postgres image to use for the CoreDB instance deployment. This should be a valid Postgres image that is compatible with the [https://tembo.io](https://tembo.io) platform. For more information please visit our [tembo-images](https://github.com/tembo-io/tembo-images) repository.
Expand Down
2 changes: 1 addition & 1 deletion conductor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion conductor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ pub async fn delete(client: Client, namespace: &str, name: &str) -> Result<(), C
let coredb_api: Api<CoreDB> = Api::namespaced(client, namespace);
let params = DeleteParams::default();
info!("\nDeleting CoreDB: {}", name);
let _o = coredb_api
let _ = coredb_api
.delete(name, &params)
.await
.map_err(ConductorError::KubeError);

Ok(())
}

Expand Down Expand Up @@ -189,6 +190,7 @@ pub async fn delete_namespace(client: Client, name: &str) -> Result<(), Conducto
.delete(name, &params)
.await
.map_err(ConductorError::KubeError);

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion conductor/testdata/operator-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ controller:
image:
tag: latest
extraEnv:
- name: ENABLE_INITIAL_BACKUP
- name: ENABLE_BACKUP
value: "false"
- name: RUST_LOG
value: info,kube=info,controller=info
Expand Down
126 changes: 74 additions & 52 deletions conductor/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#[cfg(test)]
mod test {
use chrono::Utc;
use k8s_openapi::{
api::{core::v1::Namespace, core::v1::PersistentVolumeClaim, core::v1::Pod},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
Expand All @@ -22,10 +21,8 @@ mod test {
};
use pgmq::{Message, PGMQueueExt};

use conductor::{
get_coredb_error_without_status, restart_coredb,
types::{self, StateToControlPlane},
};
use conductor::get_coredb_error_without_status;
use conductor::types::{self, StateToControlPlane};
use controller::extensions::types::{Extension, ExtensionInstallLocation};
use controller::{
apis::coredb_types::{CoreDB, CoreDBSpec},
Expand Down Expand Up @@ -154,7 +151,7 @@ mod test {
image: "default-image-value".to_string()
})
});
let spec: CoreDBSpec = serde_json::from_value(spec_js).unwrap();
let mut spec: CoreDBSpec = serde_json::from_value(spec_js).unwrap();

let msg = types::CRUDevent {
organization_name: org_name.clone(),
Expand All @@ -163,9 +160,11 @@ mod test {
inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
event_type: types::Event::Create,
dbname: dbname.clone(),
spec: Some(spec),
spec: Some(spec.clone()),
};

// println!("Message: {:?}", msg);

let msg_id = queue.send(&myqueue, &msg).await;
println!("Create msg_id: {msg_id:?}");

Expand Down Expand Up @@ -201,11 +200,11 @@ mod test {
.await
.expect("error deleting message");

let spec = msg.message.spec.expect("No spec found in message");
let passed_spec = msg.message.spec.expect("No spec found in message");

// assert that the message returned by Conductor includes the new metrics values in the spec
//println!("spec: {:?}", spec);
assert!(spec
// println!("spec: {:?}", passed_spec);
assert!(passed_spec
.metrics
.expect("no metrics in data-plane-event message")
.queries
Expand All @@ -214,10 +213,10 @@ mod test {
.contains_key("pg_postmaster"));

assert!(
!spec.extensions.is_empty(),
!passed_spec.extensions.is_empty(),
"Extension object missing from spec"
);
let extensions = spec.extensions;
let extensions = passed_spec.extensions.clone();
assert!(
!extensions.is_empty(),
"Expected at least one extension: {:?}",
Expand All @@ -234,7 +233,7 @@ mod test {
// ADD AN EXTENSION - ASSERT IT MAKES IT TO STATUS.EXTENSIONS
// conductor receives a CRUDevent from control plane
// take note of number of extensions at this point in time
let mut extensions_add = extensions.clone();
// let mut extensions_add = extensions.clone();
let _install_location = ExtensionInstallLocation::default();
let install_location = ExtensionInstallLocation {
enabled: true,
Expand All @@ -243,24 +242,21 @@ mod test {
..ExtensionInstallLocation::default()
};
let install_location = install_location.clone();
extensions_add.push(Extension {
spec.extensions.push(Extension {
name: "pg_jsonschema".to_owned(),
description: Some("fake description".to_string()),
locations: vec![install_location],
});
let num_expected_extensions = extensions_add.len();
let spec_js = serde_json::json!({
"extensions": extensions_add,
});
let spec: CoreDBSpec = serde_json::from_value(spec_js).unwrap();
let num_expected_extensions = spec.extensions.len();
// println!("Updated spec: {:?}", spec.clone());
let msg = types::CRUDevent {
organization_name: org_name.clone(),
data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
event_type: types::Event::Update,
dbname: dbname.clone(),
spec: Some(spec),
spec: Some(spec.clone()),
};
let msg_id = queue.send(&myqueue, &msg).await;
println!("Update msg_id: {msg_id:?}");
Expand All @@ -269,7 +265,7 @@ mod test {
let mut extensions: Vec<Extension> = vec![];
while num_expected_extensions != extensions.len() {
let msg = get_dataplane_message(retries, retry_delay, &queue).await;
//println!("msg: {:?}", msg);
// println!("Update msg: {:?}", msg);
queue
.archive("myqueue_data_plane", msg.msg_id)
.await
Expand All @@ -284,10 +280,6 @@ mod test {
// we added an extension, so it should be +1 now
assert_eq!(num_expected_extensions, extensions.len());

// Installing the new extensions will cause the pods to restart, but it can take a few
// seconds for that to happen. For now lets just wait and see if that fixes the problem.
thread::sleep(time::Duration::from_secs(40));

pod_ready_and_running(pods.clone(), pod_name.clone()).await;

// Get the last time the pod was started
Expand All @@ -311,20 +303,28 @@ mod test {

println!("start_time: {:?}", stdout);

// Once CNPG is running we want to restart
let cluster_name = namespace.clone();
// Lets now test sending an Event::Restart to the queue and see if the
// pod restarts correctly.

restart_coredb(client.clone(), &namespace, &cluster_name, Utc::now())
.await
.expect("failed restarting cnpg pod");
let msg = types::CRUDevent {
organization_name: org_name.clone(),
data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
event_type: types::Event::Restart,
dbname: dbname.clone(),
spec: Some(spec.clone()),
};
let msg_id = queue.send(&myqueue, &msg).await;
println!("Restart msg_id: {:?}", msg_id);

let mut is_ready = false;
let mut current_iteration = 0;
while !is_ready {
if current_iteration > 30 {
panic!("CNPG pod did not restart after about 150 seconds");
panic!("CNPG pod did not restart after about 300 seconds");
}
thread::sleep(time::Duration::from_secs(5));
thread::sleep(time::Duration::from_secs(10));
let current_coredb = get_coredb_error_without_status(client.clone(), &namespace)
.await
.unwrap();
Expand Down Expand Up @@ -365,6 +365,7 @@ mod test {
dbname: dbname.clone(),
spec: None,
};
// println!("DELETE msg: {:?}", msg);
let msg_id = queue.send(&myqueue, &msg).await;
println!("Delete msg_id: {msg_id:?}");

Expand Down Expand Up @@ -395,26 +396,10 @@ mod test {
let region = Region::new(aws_region);
let aws_config_state = AWSConfigState::new(region.clone()).await;
let stack_name = format!("org-{}-inst-{}-cf", org_name, dbname);
// let dcf = aws_config_state
// .delete_cloudformation_stack(&stack_name)
// .await;
// assert!(dcf);
// let exists = aws_config_state.does_stack_exist(&stack_name).await;
// assert!(!exists, "CF stack was not deleted");
match aws_config_state
.delete_cloudformation_stack(&stack_name)
.await
{
Ok(_) => {
// If deletion was successful, check if the stack still exists
let stack_exists = aws_config_state.does_stack_exist(&stack_name).await;
assert!(!stack_exists, "CloudFormation stack was not deleted");
}
Err(e) => {
// If there was an error deleting the stack, fail the test
panic!("Failed to delete CloudFormation stack: {:?}", e);
}
}

// Check to see if the cloudformation stack exists
let cf_stack_deleted = check_cf_stack_deletion(&aws_config_state, &stack_name).await;
assert!(cf_stack_deleted, "CF stack was deleted");
}

async fn kube_client() -> kube::Client {
Expand Down Expand Up @@ -513,4 +498,41 @@ mod test {
}
false
}

use conductor::aws::cloudformation::AWSConfigState;
async fn check_cf_stack_deletion(acs: &AWSConfigState, stack_name: &str) -> bool {
let max_duration = Duration::from_secs(5 * 60); // 5 minutes
let check_interval = Duration::from_secs(30); // Check every 30 seconds

let start_time = tokio::time::Instant::now();
while tokio::time::Instant::now() - start_time < max_duration {
let exists = acs.does_stack_exist(stack_name).await;
println!("Checking if CF stack {} exists: {}", stack_name, exists);

if !exists {
println!(
"CF stack {} does not exist, we assume it's deleted",
stack_name
);
return true;
} else {
match acs.delete_cloudformation_stack(stack_name).await {
Ok(_) => {
println!("CF stack {} was deleted", stack_name);
}
Err(e) => {
panic!("Failed to delete CloudFormation stack: {:?}", e);
}
}
}

tokio::time::sleep(check_interval).await;
}

println!(
"CF stack {} was not deleted within the expected time.",
stack_name
);
false
}
}
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.31.8"
version = "0.32.0"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion tembo-operator/src/apis/coredb_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ pub struct CoreDBSpec {
/// please visit our [tembo-images](https://github.com/tembo-io/tembo-images) repository.
///
/// **Default**: quay.io/tembo/standard-cnpg:15.3.0-1-0c19c7e
#[serde(default = "defaults::default_image")]
#[serde(default = "defaults::default_image_uri")]
pub image: String,

/// **DEPRECATED** The postgres-exporter image you want to use for the postgres-exporter deployment.
Expand Down
7 changes: 4 additions & 3 deletions tembo-operator/src/cloudnativepg/cnpg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
},
config::Config,
configmap::custom_metrics_configmap_settings,
defaults::{default_image, default_llm_image},
defaults::{default_dw_image_uri, default_image_uri, default_llm_image_uri},
errors::ValueError,
is_postgres_ready, patch_cdb_status_merge,
postgres_exporter::EXPORTER_CONFIGMAP_PREFIX,
Expand Down Expand Up @@ -600,8 +600,9 @@ pub fn cnpg_cluster_from_cdb(
// Check if the cdb.spec.image is set, if not then figure out which image to use.
let image = if cdb.spec.image.is_empty() {
match cdb.spec.stack.as_ref().map(|s| s.name.to_lowercase()) {
Some(ref name) if name == "machinelearning" => default_llm_image(),
_ => default_image(),
Some(ref name) if name == "machinelearning" => default_llm_image_uri(),
Some(ref name) if name == "datawarehouse" => default_dw_image_uri(),
_ => default_image_uri(),
}
} else {
cdb.spec.image.clone()
Expand Down
Loading

0 comments on commit c342eae

Please sign in to comment.