From 00bc95f293cbc6d02e87e0326c5cd73667ae4b3d Mon Sep 17 00:00:00 2001 From: Joonas Bergius Date: Sat, 29 Jun 2024 19:49:35 -0500 Subject: [PATCH] chore: Adapt to the new wadm_events stream hierarchy Signed-off-by: Joonas Bergius --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/services.rs | 15 +++++++-------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2aef72b..d59fae1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3354,9 +3354,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "wadm" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53c29efcb1a7d257c49cfc34db0d81c4e18f75f3317ac10d0c0ba4cc64545fba" +checksum = "2d20c671aabb8fee1d7a688a81be4ebe14cbc4ae3a01db5dd62202ad39538fa5" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 73a9340..00e4a0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,7 @@ tracing-opentelemetry = "0.20" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } utoipa = { version = "4.1", features = ["axum_extras"] } uuid = { version = "1", features = ["v5"] } -wadm = "0.12.1" +wadm = "0.12.2" wadm-client = "0.1.2" wadm-types = "0.1.0" wasmcloud-operator-types = { version = "*", path = "./crates/types" } diff --git a/src/services.rs b/src/services.rs index 5247377..6a89d20 100644 --- a/src/services.rs +++ b/src/services.rs @@ -37,8 +37,9 @@ use crate::controller::{ const CONSUMER_PREFIX: &str = "wasmcloud_operator_service"; // This should probably be exposed by wadm somewhere -const WADM_EVT_SUBJECT: &str = "wadm.evt"; +const WADM_EVENT_STREAM_NAME: &str = "wadm_events"; const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events"; +const OPERATOR_STREAM_SUBJECT: &str = "wasmcloud_operator_events.*.>"; /// Commands that can be sent to the watcher to trigger an update or removal of a service. #[derive(Clone, Debug)] @@ -305,8 +306,6 @@ impl ServiceWatcher { } let js = jetstream::new(client.clone()); - let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone()); - let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone()); // Should we also be doing this when we first create the ServiceWatcher? let stream = js @@ -321,10 +320,10 @@ impl ServiceWatcher { allow_rollup: false, num_replicas: self.stream_replicas as usize, mirror: Some(Source { - name: "wadm_events".to_string(), + name: WADM_EVENT_STREAM_NAME.to_string(), subject_transforms: vec![SubjectTransform { - source: source_subject, - destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()), + source: wadm::DEFAULT_WADM_EVENTS_TOPIC.to_string(), + destination: OPERATOR_STREAM_SUBJECT.replacen('*', "{{wildcard(1)}}", 1), }], ..Default::default() }), @@ -343,7 +342,7 @@ impl ServiceWatcher { ack_wait: std::time::Duration::from_secs(2), max_deliver: 3, deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All, - filter_subject: destination_subject.clone(), + filter_subject: OPERATOR_STREAM_SUBJECT.replacen('*', &lattice_id, 1), ..Default::default() }, ) @@ -623,7 +622,7 @@ fn http_server_component(manifest: &Manifest) -> Option { for p in props.source_config.iter() { if let Some(config_props) = &p.properties { if let Some(addr) = config_props.get("address") { - details.address = addr.clone(); + details.address.clone_from(addr); should_create_service = true; }; }