Skip to content

Commit

Permalink
feat: Refactor configuration into a module
Browse files Browse the repository at this point in the history
Also add support for setting the number of stream replicas when creating
the stream mirror for the operator.
  • Loading branch information
protochron committed Apr 9, 2024
1 parent b933320 commit 557e05d
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 18 deletions.
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ axum-server = {workspace = true}
anyhow = {workspace = true}
ctrlc = {workspace = true}
cloudevents-sdk = {workspace = true}
config = {workspace = true}
futures = {workspace = true}
handlebars = {workspace = true}
json-patch = {workspace = true}
Expand Down Expand Up @@ -58,6 +59,7 @@ async-nats = "0.33"
axum = { version = "0.6", features = ["headers"] }
axum-server = { version = "0.4", features = ["tls-rustls"] }
anyhow = "1"
config = {version = "0.14", default-features = false, features = ["convert-case", "async"]}
cloudevents-sdk = "0.7"
ctrlc = "3"
futures = "0.3"
Expand Down
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

/// Configuration for the operator. If you are configuring the operator using environment variables
/// then all values need to be prefixed with "WASMCLOUD_OPERATOR".
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct OperatorConfig {
#[serde(default = "default_stream_replicas")]
pub stream_replicas: u16,
}

fn default_stream_replicas() -> u16 {
1
}
16 changes: 6 additions & 10 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
docker_secret::DockerConfigJson, resources::application::get_client, services::ServiceWatcher,
Error, Result,
config::OperatorConfig, docker_secret::DockerConfigJson, resources::application::get_client,
services::ServiceWatcher, Error, Result,
};
use anyhow::bail;
use futures::StreamExt;
Expand All @@ -24,7 +24,6 @@ use kube::{
Resource, ResourceExt,
};
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::{BTreeMap, HashMap};
use std::str::from_utf8;
Expand All @@ -50,7 +49,7 @@ pub const WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT: &str =

pub struct Context {
pub client: Client,
pub wasmcloud_config: WasmcloudConfig,
pub wasmcloud_config: OperatorConfig,
pub nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
service_watcher: ServiceWatcher,
}
Expand Down Expand Up @@ -90,9 +89,6 @@ impl Secrets {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct WasmcloudConfig {}

pub async fn reconcile(cluster: Arc<WasmCloudHostConfig>, ctx: Arc<Context>) -> Result<Action> {
let cluster_configs: Api<WasmCloudHostConfig> =
Api::namespaced(ctx.client.clone(), &cluster.namespace().unwrap());
Expand Down Expand Up @@ -854,11 +850,11 @@ impl NameNamespace {
#[derive(Clone, Default)]
pub struct State {
pub nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
pub config: WasmcloudConfig,
pub config: OperatorConfig,
}

impl State {
pub fn new(config: WasmcloudConfig) -> Self {
pub fn new(config: OperatorConfig) -> Self {
Self {
config,
..Default::default()
Expand All @@ -880,7 +876,7 @@ pub async fn run(state: State) -> anyhow::Result<()> {
let services = Api::<Service>::all(client.clone());
let pods = Api::<Pod>::all(client.clone());

let watcher = ServiceWatcher::new(client.clone());
let watcher = ServiceWatcher::new(client.clone(), state.config.stream_replicas);
let config = Config::default();
let ctx = Context {
client,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl IntoResponse for Error {
}
}

pub mod config;
pub mod controller;
pub mod discovery;
pub mod docker_secret;
Expand Down
15 changes: 11 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{anyhow, Result};
use axum_server::{tls_rustls::RustlsConfig, Handle};
use controller::{State, WasmcloudConfig};
use controller::{config::OperatorConfig, State};

use config::Config;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::kube_aggregator::pkg::apis::apiregistration::v1::{
Expand Down Expand Up @@ -31,9 +32,15 @@ async fn main() -> Result<()> {
error!("Failed to configure tracing: {}", e);
e
})?;
info!("Starting controller");

let config = WasmcloudConfig {};
info!("Starting operator");

let cfg = Config::builder()
.add_source(config::Environment::with_prefix("WASMCLOUD_OPERATOR"))
.build()
.map_err(|e| anyhow!("Failed to build config: {}", e))?;
let config: OperatorConfig = cfg
.try_deserialize()
.map_err(|e| anyhow!("Failed to parse config: {}", e))?;

let client = Client::try_default().await?;
install_crd(&client).await?;
Expand Down
8 changes: 5 additions & 3 deletions src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Watcher {
let manifest = mp.manifest;
if let Some(httpserver_service) = http_server_component(&manifest) {
if let Some(address) = find_address(&manifest, httpserver_service.name.as_str()) {
debug!(address = address, "Found address");
debug!(address, "Found address");
if let Ok(addr) = address.parse::<SocketAddr>() {
debug!("Upserting service for manifest: {}", manifest.metadata.name);
self.tx
Expand Down Expand Up @@ -224,11 +224,12 @@ impl Watcher {
pub struct ServiceWatcher {
watchers: Arc<RwLock<HashMap<String, Watcher>>>,
sender: mpsc::UnboundedSender<WatcherCommand>,
stream_replicas: u16,
}

impl ServiceWatcher {
/// Creates a new service watcher.
pub fn new(k8s_client: KubeClient) -> Self {
pub fn new(k8s_client: KubeClient, stream_replicas: u16) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel::<WatcherCommand>();

let client = k8s_client.clone();
Expand Down Expand Up @@ -264,6 +265,7 @@ impl ServiceWatcher {
Self {
watchers: Arc::new(RwLock::new(HashMap::new())),
sender: tx,
stream_replicas,
}
}

Expand Down Expand Up @@ -324,7 +326,7 @@ impl ServiceWatcher {
retention: RetentionPolicy::WorkQueue,
storage: StorageType::File,
allow_rollup: false,
num_replicas: 1,
num_replicas: self.stream_replicas as usize,
mirror: Some(Source {
name: "wadm_events".to_string(),
subject_transforms: vec![SubjectTransform {
Expand Down

0 comments on commit 557e05d

Please sign in to comment.