Skip to content

Commit

Permalink
Merge pull request #51 from joonas/chore/adapt-to-new-wadm_events-str…
Browse files Browse the repository at this point in the history
…eam-hierarchy

chore: Adapt to the new wadm_events stream hierarchy
  • Loading branch information
joonas committed Jul 2, 2024
2 parents bf8a74a + 00bc95f commit d0f810c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ tracing-opentelemetry = "0.20"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
utoipa = { version = "4.1", features = ["axum_extras"] }
uuid = { version = "1", features = ["v5"] }
wadm = "0.12.1"
wadm = "0.12.2"
wadm-client = "0.1.2"
wadm-types = "0.1.0"
wasmcloud-operator-types = { version = "*", path = "./crates/types" }
Expand Down
15 changes: 7 additions & 8 deletions src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ use crate::controller::{

const CONSUMER_PREFIX: &str = "wasmcloud_operator_service";
// This should probably be exposed by wadm somewhere
const WADM_EVT_SUBJECT: &str = "wadm.evt";
const WADM_EVENT_STREAM_NAME: &str = "wadm_events";
const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events";
const OPERATOR_STREAM_SUBJECT: &str = "wasmcloud_operator_events.*.>";

/// Commands that can be sent to the watcher to trigger an update or removal of a service.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -305,8 +306,6 @@ impl ServiceWatcher {
}

let js = jetstream::new(client.clone());
let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone());
let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone());

// Should we also be doing this when we first create the ServiceWatcher?
let stream = js
Expand All @@ -321,10 +320,10 @@ impl ServiceWatcher {
allow_rollup: false,
num_replicas: self.stream_replicas as usize,
mirror: Some(Source {
name: "wadm_events".to_string(),
name: WADM_EVENT_STREAM_NAME.to_string(),
subject_transforms: vec![SubjectTransform {
source: source_subject,
destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()),
source: wadm::DEFAULT_WADM_EVENTS_TOPIC.to_string(),
destination: OPERATOR_STREAM_SUBJECT.replacen('*', "{{wildcard(1)}}", 1),
}],
..Default::default()
}),
Expand All @@ -343,7 +342,7 @@ impl ServiceWatcher {
ack_wait: std::time::Duration::from_secs(2),
max_deliver: 3,
deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All,
filter_subject: destination_subject.clone(),
filter_subject: OPERATOR_STREAM_SUBJECT.replacen('*', &lattice_id, 1),
..Default::default()
},
)
Expand Down Expand Up @@ -623,7 +622,7 @@ fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
for p in props.source_config.iter() {
if let Some(config_props) = &p.properties {
if let Some(addr) = config_props.get("address") {
details.address = addr.clone();
details.address.clone_from(addr);
should_create_service = true;
};
}
Expand Down

0 comments on commit d0f810c

Please sign in to comment.