Skip to content

Commit

Permalink
Fix cluster init job and bookkeeper metadata prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
visortelle committed Apr 25, 2024
1 parent 662ee25 commit 844ca84
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 41 deletions.
88 changes: 56 additions & 32 deletions src/docker_compose/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,22 @@ pub struct ClusterOutput {
impl PrintInfo for ClusterOutput {
fn print_info(&self) {
println!("Cluster index: {}", self.cluster_index);
println!("Pulsar broker service URL: {}", self.broker_service_host_url.clone().unwrap_or("-".to_string()));
println!("Pulsar web service URL: {}", self.web_service_host_url.clone().unwrap_or("-".to_string()));
println!(
"Pulsar broker service URL: {}",
self.broker_service_host_url
.clone()
.unwrap_or("-".to_string())
);
println!(
"Pulsar web service URL: {}",
self.web_service_host_url.clone().unwrap_or("-".to_string())
);

if self.dekaf_host_url.is_some() {
println!("Dekaf management UI is available URL: {:?}", self.dekaf_host_url);
println!(
"Dekaf management UI is available URL: {:?}",
self.dekaf_host_url
);
}
}
}
Expand Down Expand Up @@ -213,7 +224,8 @@ pub fn generate_cluster(
None
};

let dekaf_template = dekaf_output.clone()
let dekaf_template = dekaf_output
.clone()
.map(|dekaf| dekaf.docker_compose_template)
.unwrap_or("".to_string());

Expand Down Expand Up @@ -255,7 +267,8 @@ pub fn generate_pulsar_proxy(
) -> Result<PulsarProxyOutput> {
let pulsar_version = instance_config.pulsar_version;

let depends_on_broker_template = format!("████████████broker-{cluster_name}-0:\n████████████████condition: service_healthy");
let depends_on_broker_template =
format!("████████████broker-{cluster_name}-0:\n████████████████condition: service_healthy");

let metadata_store_url = (0..instance_config.num_zookeepers)
.map(|i| format!("zk:zookeeper-{i}:2181"))
Expand Down Expand Up @@ -295,7 +308,6 @@ pub fn generate_pulsar_proxy(
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '0.5'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
Expand Down Expand Up @@ -352,7 +364,6 @@ pub fn generate_zookeeper_template(
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
Expand All @@ -375,20 +386,32 @@ pub fn generate_pulsar_init_job_template(
.collect::<Vec<String>>()
.join("\n");

let init_pulsar_cluster_script = format!("bin/pulsar initialize-cluster-metadata --cluster {cluster_name} --metadata-store zk:zookeeper-0:2181/{cluster_name} --configuration-metadata-store zk:zookeeper-0:2181/{cluster_name} --web-service-url {web_service_url} --broker-service-url {broker_service_url};");
let init_bookkeeper_cluster_script = "bin/bookkeeper shell initnewcluster;".to_string();

let metadata_service_uri = format!(
"zk+hierarchical://{}",
(0..zookeepers_per_cluster)
.map(|i| format!("zookeeper-{i}:2181"))
.collect::<Vec<String>>()
.join(";")
) + &format!("/bookkeeper-{cluster_name}");

format! {"
████# Pulsar init job for cluster {cluster_name}
████pulsar-init-job-{cluster_name}:
████████image: apachepulsar/pulsar:{pulsar_version}
████████user: pulsar
████████command: bash -c \"bin/apply-config-from-env.py conf/pulsar_env.sh; bin/pulsar initialize-cluster-metadata --cluster {cluster_name} --metadata-store zk:zookeeper-0:2181/{cluster_name} --configuration-metadata-store zk:zookeeper-0:2181/{cluster_name} --web-service-url {web_service_url} --broker-service-url {broker_service_url}\"
████████command: bash -c \"bin/apply-config-from-env.py conf/pulsar_env.sh; bin/apply-config-from-env.py conf/bookkeeper.conf; {init_pulsar_cluster_script} {init_bookkeeper_cluster_script} \"
████████environment:
████████████- clusterName={cluster_name}
████████████- metadataServiceUri={metadata_service_uri}
████████████- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m -XX:+ExitOnOutOfMemoryError
████████depends_on:
{depends_on_zookeeper_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 512M
████████networks:
████████████- pulsar-net-{instance_name}
Expand Down Expand Up @@ -450,7 +473,6 @@ pub fn generate_post_cluster_create_job_template(
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 512M
████████networks:
████████████- pulsar-net-{instance_name}
Expand Down Expand Up @@ -490,7 +512,6 @@ pub fn generate_dekaf(instance_name: String, cluster_index: u32) -> Result<Dekaf
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '1'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
Expand Down Expand Up @@ -522,13 +543,24 @@ pub fn generate_broker_template(
.collect::<Vec<String>>()
.join("\n");

let depends_on_bookie_template = format!("████████████bookie-{cluster_name}-0:\n████████████████condition: service_healthy");
let depends_on_bookies_template = (0..instance_config.num_bookies)
.map(|i| format!("████████████bookie-{cluster_name}-{i}:\n████████████████condition: service_healthy"))
.collect::<Vec<String>>()
.join("\n");

let metadata_store_url = (0..zookeepers_per_cluster)
.map(|i| format!("zk:zookeeper-{i}:2181"))
.collect::<Vec<String>>()
.join(",");

let bookkeeper_metadata_service_uri = format!(
"zk://{}",
(0..zookeepers_per_cluster)
.map(|i| format!("zookeeper-{i}:2181"))
.collect::<Vec<String>>()
.join(";")
) + &format!("/bookkeeper-{cluster_name}");

format! {"
████# Pulsar broker for cluster {cluster_name}
████broker-{cluster_name}-{broker_index}:
Expand All @@ -539,6 +571,7 @@ pub fn generate_broker_template(
████████████- clusterName={cluster_name}
████████████- metadataStoreUrl={metadata_store_url}
████████████- configurationMetadataStoreUrl={metadata_store_url}
████████████- bookkeeperMetadataServiceUri={bookkeeper_metadata_service_uri}
████████████- managedLedgerDefaultEnsembleSize={managed_ledger_default_ensemble_size}
████████████- managedLedgerDefaultWriteQuorum={managed_ledger_default_write_quorum}
████████████- managedLedgerDefaultAckQuorum={managed_ledger_default_ack_quorum}
Expand All @@ -552,11 +585,10 @@ pub fn generate_broker_template(
████████████retries: 20
████████depends_on:
{depends_on_zookeeper_template}
{depends_on_bookie_template}
{depends_on_bookies_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 512M
████████networks:
████████████- pulsar-net-{instance_name}
Expand All @@ -572,18 +604,6 @@ pub fn generate_bookie_template(
bookie_index: u32,
) -> String {
let pulsar_version = instance_config.pulsar_version;
let depends_on_bookie: Option<u32> = if bookie_index == 0 {
None
} else {
Some(bookie_index - 1)
};

let depends_on_bookies_template = match depends_on_bookie {
Some(i) => format!(
"████████████bookie-{cluster_name}-{i}:\n████████████████condition: service_healthy"
),
None => "".to_string(),
};

let zookeepers_per_cluster = instance_config.num_zookeepers;

Expand All @@ -593,12 +613,12 @@ pub fn generate_bookie_template(
.join("\n");

let metadata_service_uri = format!(
"zk://{}",
"zk+hierarchical://{}",
(0..zookeepers_per_cluster)
.map(|i| format!("zookeeper-{i}:2181"))
.collect::<Vec<String>>()
.join(";")
) + "/ledgers";
) + &format!("/bookkeeper-{cluster_name}");

format! {"
████# Bookie for cluster {cluster_name}
Expand All @@ -614,24 +634,28 @@ pub fn generate_bookie_template(
████████████- BOOKIE_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m
████████████- dbStorage_writeCacheMaxSizeMb=16
████████████- dbStorage_readAheadCacheMaxSizeMb=16
████████████- dbStorage_rocksDB_writeBufferSizeMB=4
████████████- dbStorage_rocksDB_sstSizeInMB=4
████████████- dbStorage_rocksDB_blockSize=4096
████████████- dbStorage_rocksDB_bloomFilterBitsPerKey=10
████████████- dbStorage_rocksDB_numLevels=-1
████████████- dbStorage_rocksDB_numFilesInLevel0=4
████████████- dbStorage_rocksDB_maxSizeInLevel1MB=256
████████depends_on:
████████████pulsar-init-job-{cluster_name}:
████████████████condition: service_completed_successfully
{depends_on_zookeeper_template}
{depends_on_bookies_template}
████████command: bash -c \"set -e; bin/apply-config-from-env.py conf/bookkeeper.conf; bin/apply-config-from-env.py conf/pulsar_env.sh; if bin/bookkeeper shell whatisinstanceid; then echo bookkeeper_cluster_already_initialized; else echo init_new_bookkeeper_cluster_start; bin/bookkeeper shell initnewcluster; echo init_new_bookkeeper_cluster_end; fi; exec bin/pulsar bookie\"
████████command: bash -c \"bin/apply-config-from-env.py conf/bookkeeper.conf; bin/apply-config-from-env.py conf/pulsar_env.sh; exec bin/pulsar bookie\"
████████volumes:
████████████- bookie-data-{cluster_name}-{bookie_index}:/pulsar/data
████████healthcheck:
████████████test: [\"CMD\", \"/pulsar/bin/bookkeeper\", \"shell\", \"bookiesanity\"]
████████████interval: 10s
████████████timeout: 30s
████████████retries: 30
████████████start_period: 60s
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 512M
████████networks:
████████████- pulsar-net-{instance_name}
Expand Down
34 changes: 25 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ pub struct LogsCommandArgs {
pub struct StartCommandArgs {
pub instance_name: Option<String>,

/// Follow container logs and not detach the process
/// Follow container logs
#[arg(long, default_value_t = false)]
pub debug: bool,
pub follow: bool,

/// Keep containers running even if instance start failed
#[arg(long, default_value_t = false)]
pub no_kill: bool,

/// Disable opening the browser after starting the instance
#[arg(long, default_value_t = false)]
Expand Down Expand Up @@ -631,8 +635,10 @@ fn start_cmd(args: StartCommandArgs) -> Result<InstanceOutput> {
"always",
];

if !args.debug {
if !args.follow {
docker_compose_args.push("--wait");
docker_compose_args.push("--wait-timeout");
docker_compose_args.push("600");
docker_compose_args.push("--detach");
}

Expand All @@ -653,10 +659,12 @@ fn start_cmd(args: StartCommandArgs) -> Result<InstanceOutput> {
println!("Pulsar instance \"{instance_name}\" {event_name} in {seconds_elapsed} seconds");

if !exit_status.success() {
stop_cmd(StopCommandArgs {
instance_name: Some(instance_name.clone()),
all: false,
})?;
if !args.no_kill {
stop_cmd(StopCommandArgs {
instance_name: Some(instance_name.clone()),
all: false,
})?;
}

println!();
println!("- If you see that some container in the \"Error\" state, check the logs using `docker logs <container_name>`");
Expand Down Expand Up @@ -883,13 +891,21 @@ fn main() -> Result<()> {
if !args.no_open_browser {
for cluster_output in instance_output.clusters {
if let Some(url) = cluster_output.dekaf_host_url.clone() {
println!("Opening Dekaf UI: {}", cluster_output.dekaf_host_url.clone().unwrap_or("".to_string()));
println!(
"Opening Dekaf UI: {}",
cluster_output
.dekaf_host_url
.clone()
.unwrap_or("".to_string())
);
webbrowser::open(&url).unwrap();
}
}
}

println!("See the `puls describe` command to display instance information again");
println!(
"See the `puls describe` command to display instance information again"
);
}
Err(err) => {
println!("{}", err);
Expand Down

0 comments on commit 844ca84

Please sign in to comment.