diff --git a/implementations/rust/ockam/ockam_api/src/kafka/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/mod.rs index 59470dd55c2..cd5d245e613 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/mod.rs @@ -13,7 +13,7 @@ mod secure_channel_map; pub(crate) use inlet_controller::KafkaInletController; use ockam::identity::Identifier; -use ockam_abac::expr::{eq, ident, str}; +use ockam_abac::expr::{eq, ident, or, str}; use ockam_abac::Expr; use ockam_abac::{ABAC_HAS_CREDENTIAL_KEY, ABAC_IDENTIFIER_KEY, SUBJECT_KEY}; use ockam_core::Address; @@ -35,8 +35,11 @@ pub fn kafka_default_policy_expression() -> Expr { } pub fn kafka_policy_expression(project_identifier: &Identifier) -> Expr { - eq([ - ident(format!("{}.{}", SUBJECT_KEY, ABAC_IDENTIFIER_KEY)), - str(project_identifier.to_string()), + or([ + eq([ + ident(format!("{}.{}", SUBJECT_KEY, ABAC_IDENTIFIER_KEY)), + str(project_identifier.to_string()), + ]), + kafka_default_policy_expression(), ]) } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs index 21722d1b267..c15ea1eef89 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs @@ -90,7 +90,10 @@ impl NodeManagerRelayCreator { relay_service: MultiAddr, alias: String, ) -> Result<()> { - let is_rust = !relay_service.starts_with(Project::CODE); + let is_rust = { + // we might create a relay in the producer passing through a project relay + !(relay_service.starts_with(Project::CODE) && relay_service.len() == 1) + }; let buffer: Vec = context .send_and_receive( diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs index 60c2fb27b68..f63d62e0c27 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs @@ -14,6 +14,7 @@ impl DefaultAddress { pub const OKTA_IDENTITY_PROVIDER: &'static str = "okta"; pub const KAFKA_OUTLET: &'static str = "kafka_outlet"; pub const KAFKA_CONSUMER: &'static str = "kafka_consumer"; + pub const KAFKA_INLET: &'static str = "kafka_inlet"; pub const KAFKA_PRODUCER: &'static str = "kafka_producer"; pub const KAFKA_DIRECT: &'static str = "kafka_direct"; diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs new file mode 100644 index 00000000000..365b447215c --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs @@ -0,0 +1,189 @@ +use async_trait::async_trait; +use std::net::SocketAddr; +use std::sync::Arc; + +use clap::{command, Args}; +use colorful::Colorful; +use miette::miette; +use ockam_api::colors::OckamColor; +use ockam_api::nodes::models::services::{ + StartKafkaDirectRequest, StartKafkaRequest, StartServiceRequest, +}; +use ockam_api::nodes::BackgroundNodeClient; +use ockam_api::{fmt_log, fmt_ok}; +use tokio::sync::Mutex; +use tokio::try_join; + +use ockam_api::port_range::PortRange; +use ockam_core::api::Request; +use ockam_multiaddr::MultiAddr; +use ockam_node::Context; + +use crate::kafka::util::make_brokers_port_range; +use crate::node::util::initialize_default_node; +use crate::service::start::start_service_impl; +use crate::util::process_nodes_multiaddr; +use crate::{ + kafka::{kafka_default_consumer_server, kafka_inlet_default_addr}, + node::NodeOpts, + util::parsers::socket_addr_parser, + Command, CommandGlobalOpts, +}; + +/// Create a new Kafka Inlet. Kafka clients v3.7.0 and earlier are supported. You can find the version you have with 'kafka-topics.sh --version'. +#[derive(Clone, Debug, Args)] +pub struct CreateCommand { + #[command(flatten)] + pub node_opts: NodeOpts, + /// The local address of the service + #[arg(long, default_value_t = kafka_inlet_default_addr())] + pub addr: String, + /// The address where to bind and where the client will connect to alongside its port,
:. + /// In case just a port is specified, the default loopback address (127.0.0.1:4000) will be used + #[arg(long, default_value_t = kafka_default_consumer_server(), value_parser = socket_addr_parser)] + pub from: SocketAddr, + /// The address of the kafka bootstrap broker, conflicts with --to + #[arg(long, required_unless_present = "to")] + pub bootstrap_server: Option, + /// Local port range dynamically allocated to kafka brokers, must not overlap with the + /// bootstrap port + #[arg(long)] + pub brokers_port_range: Option, + /// The route to the Kafka outlet node, either the project in ockam orchestrator or a rust node, expected something like /project/. + /// Conflicts with --bootstrap-server + #[arg(long, required_unless_present = "bootstrap_server", conflicts_with_all = ["bootstrap_server","consumer"])] + pub to: Option, + /// The route to the Kafka consumer node, valid only when --bootstrap-server is specified + #[arg(long, requires = "bootstrap_server")] + pub consumer: Option, +} + +#[async_trait] +impl Command for CreateCommand { + const NAME: &'static str = "kafka-inlet create"; + + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { + let is_finished = Arc::new(Mutex::new(false)); + initialize_default_node(ctx, &opts).await?; + + let brokers_port_range = self + .brokers_port_range + .unwrap_or_else(|| make_brokers_port_range(&self.from)); + let at_node = self.node_opts.at_node.clone(); + let addr = self.addr.clone(); + + let direct_future; + let consumer_future; + if let Some(bootstrap_server) = self.bootstrap_server { + // TODO: allow arbitrary endpoints + let bootstrap_server = bootstrap_server.parse().unwrap(); + let is_finished = is_finished.clone(); + + if self.to.is_some() { + return Err(miette!("Cannot specify both --bootstrap-server and --to").into()); + } + + // Creates a direct kafka service + consumer_future = None; + direct_future = Some(async move { + let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?; + + let payload = StartKafkaDirectRequest::new( + self.from, + bootstrap_server, + brokers_port_range, + self.consumer.clone(), + ); + let payload = StartServiceRequest::new(payload, &addr); + let req = Request::post("/node/services/kafka_direct").body(payload); + start_service_impl(ctx, &node, "KafkaDirect", req).await?; + + *is_finished.lock().await = true; + + Ok(()) + }); + } else if let Some(to) = self.to { + let is_finished = is_finished.clone(); + let to = process_nodes_multiaddr(&to, &opts.state).await?; + let is_finished = is_finished.clone(); + + // Creates a Kafka consumer service (can also be used as a producer) + direct_future = None; + consumer_future = Some(async move { + let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?; + + let payload = StartKafkaRequest::new(self.from, brokers_port_range, to); + let payload = StartServiceRequest::new(payload, &addr); + let req = Request::post("/node/services/kafka_consumer").body(payload); + start_service_impl(ctx, &node, "KafkaConsumer", req).await?; + + *is_finished.lock().await = true; + + Ok(()) + }); + } else { + return Err(miette!("Must specify either --bootstrap-server or --to").into()); + }; + + opts.terminal + .write_line(&fmt_log!("Creating KafkaInlet service...\n"))?; + + let msgs = vec![ + format!( + "Building KafkaInlet service {}", + &self + .addr + .to_string() + .color(OckamColor::PrimaryResource.color()) + ), + format!( + "Creating KafkaInlet service at {}", + &self + .from + .to_string() + .color(OckamColor::PrimaryResource.color()) + ), + format!( + "Setting brokers port range to {}", + &brokers_port_range + .to_string() + .color(OckamColor::PrimaryResource.color()) + ), + ]; + let progress_output = opts.terminal.progress_output(&msgs, &is_finished); + if let Some(direct_future) = direct_future { + let (_, _) = try_join!(direct_future, progress_output)?; + } else { + let (_, _) = try_join!(consumer_future.unwrap(), progress_output)?; + } + + opts.terminal + .stdout() + .plain( + fmt_ok!( + "KafkaInlet service started at {}\n", + &self + .from + .to_string() + .color(OckamColor::PrimaryResource.color()) + ) + &fmt_log!( + "Brokers port range set to {}\n\n", + &brokers_port_range + .to_string() + .color(OckamColor::PrimaryResource.color()) + ) + &fmt_log!( + "{}\n", + "Kafka clients v3.7.0 and earlier are supported." + .color(OckamColor::FmtWARNBackground.color()) + ) + &fmt_log!( + "{}: '{}'.\n", + "You can find the version you have with" + .color(OckamColor::FmtWARNBackground.color()), + "kafka-topics.sh --version".color(OckamColor::Success.color()) + ), + ) + .write_line()?; + + Ok(()) + } +} diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/mod.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/mod.rs new file mode 100644 index 00000000000..87fa0a5587c --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/mod.rs @@ -0,0 +1,33 @@ +use clap::{command, Args, Subcommand}; + +use crate::kafka::inlet::create::CreateCommand; +use crate::{Command, CommandGlobalOpts}; + +pub(crate) mod create; + +/// Manage Kafka Inlets +#[derive(Clone, Debug, Args)] +#[command(arg_required_else_help = true, subcommand_required = true)] +pub struct KafkaInletCommand { + #[command(subcommand)] + pub(crate) subcommand: KafkaInletSubcommand, +} + +#[derive(Clone, Debug, Subcommand)] +pub enum KafkaInletSubcommand { + Create(CreateCommand), +} + +impl KafkaInletCommand { + pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { + match self.subcommand { + KafkaInletSubcommand::Create(c) => c.run(opts), + } + } + + pub fn name(&self) -> String { + match &self.subcommand { + KafkaInletSubcommand::Create(c) => c.name(), + } + } +} diff --git a/implementations/rust/ockam/ockam_command/src/kafka/mod.rs b/implementations/rust/ockam/ockam_command/src/kafka/mod.rs index 2913deb6a2d..5348d5adc14 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/mod.rs @@ -5,6 +5,7 @@ use ockam_multiaddr::MultiAddr; pub(crate) mod consumer; pub(crate) mod direct; +pub(crate) mod inlet; pub(crate) mod outlet; pub(crate) mod producer; pub(crate) mod util; @@ -22,6 +23,10 @@ fn kafka_consumer_default_addr() -> String { DefaultAddress::KAFKA_CONSUMER.to_string() } +fn kafka_inlet_default_addr() -> String { + DefaultAddress::KAFKA_INLET.to_string() +} + fn kafka_direct_default_addr() -> String { DefaultAddress::KAFKA_DIRECT.to_string() } diff --git a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs index fadb8363212..0ecaf1e99ac 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use std::net::SocketAddr; use clap::{command, Args}; @@ -13,39 +14,31 @@ use ockam_api::{fmt_log, fmt_ok}; use ockam_core::api::Request; use crate::node::util::initialize_default_node; -use crate::util::async_cmd; use crate::{ kafka::{kafka_default_outlet_addr, kafka_default_outlet_server}, node::NodeOpts, service::start::start_service_impl, - CommandGlobalOpts, + Command, CommandGlobalOpts, }; /// Create a new Kafka Outlet #[derive(Clone, Debug, Args)] pub struct CreateCommand { #[command(flatten)] - node_opts: NodeOpts, + pub node_opts: NodeOpts, /// The local address of the service #[arg(long, default_value_t = kafka_default_outlet_addr())] - addr: String, + pub addr: String, /// The address of the kafka bootstrap broker #[arg(long, default_value_t = kafka_default_outlet_server())] - bootstrap_server: SocketAddr, + pub bootstrap_server: SocketAddr, } -impl CreateCommand { - pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.async_run(&ctx, opts).await - }) - } - - pub fn name(&self) -> String { - "create kafka outlet".into() - } +#[async_trait] +impl Command for CreateCommand { + const NAME: &'static str = "kafka-outlet create"; - async fn async_run(&self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> { + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; opts.terminal .write_line(&fmt_log!("Creating KafkaOutlet service"))?; diff --git a/implementations/rust/ockam/ockam_command/src/kafka/outlet/mod.rs b/implementations/rust/ockam/ockam_command/src/kafka/outlet/mod.rs index 342ec6a54cc..ac88cf171e9 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/outlet/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/outlet/mod.rs @@ -1,17 +1,15 @@ -use clap::{command, Args, Subcommand}; - -use crate::CommandGlobalOpts; +pub(crate) mod create; use self::create::CreateCommand; - -mod create; +use crate::{Command, CommandGlobalOpts}; +use clap::{command, Args, Subcommand}; /// Manage Kafka Outlets #[derive(Clone, Debug, Args)] #[command(arg_required_else_help = true, subcommand_required = true)] pub struct KafkaOutletCommand { #[command(subcommand)] - subcommand: KafkaOutletSubcommand, + pub(crate) subcommand: KafkaOutletSubcommand, } #[derive(Clone, Debug, Subcommand)] diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index e8e284e63f7..f40e47b5f22 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -53,6 +53,10 @@ pub struct NodeConfig { #[serde(flatten)] pub tcp_inlets: TcpInlets, #[serde(flatten)] + pub kafka_inlet: KafkaInlet, + #[serde(flatten)] + pub kafka_outlet: KafkaOutlet, + #[serde(flatten)] pub relays: Relays, } @@ -123,6 +127,8 @@ impl NodeConfig { self.policies.parse_commands(overrides)?.into(), self.tcp_outlets.parse_commands(overrides)?.into(), self.tcp_inlets.parse_commands(overrides)?.into(), + self.kafka_inlet.parse_commands(overrides)?.into(), + self.kafka_outlet.parse_commands(overrides)?.into(), ]; // Run commands diff --git a/implementations/rust/ockam/ockam_command/src/node/create/demo_config_files/2.kafka.single-machine.yaml b/implementations/rust/ockam/ockam_command/src/node/create/demo_config_files/2.kafka.single-machine.yaml new file mode 100644 index 00000000000..08465917269 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/node/create/demo_config_files/2.kafka.single-machine.yaml @@ -0,0 +1,8 @@ +name: kafka-portal + +kafka-inlet: + from: 127.0.0.1:19092 + to: /secure/api + +kafka-outlet: + bootstrap-server: 127.0.0.1:9092 diff --git a/implementations/rust/ockam/ockam_command/src/run/config.rs b/implementations/rust/ockam/ockam_command/src/run/config.rs index c4554eb442a..34f704b5d47 100644 --- a/implementations/rust/ockam/ockam_command/src/run/config.rs +++ b/implementations/rust/ockam/ockam_command/src/run/config.rs @@ -34,6 +34,10 @@ pub struct Config { #[serde(flatten)] pub tcp_inlets: TcpInlets, #[serde(flatten)] + pub kafka_inlet: KafkaInlet, + #[serde(flatten)] + pub kafka_outlet: KafkaOutlet, + #[serde(flatten)] pub relays: Relays, } @@ -65,6 +69,8 @@ impl Config { self.policies.parse_commands(overrides)?.into(), self.tcp_outlets.parse_commands(overrides)?.into(), self.tcp_inlets.parse_commands(overrides)?.into(), + self.kafka_inlet.parse_commands(overrides)?.into(), + self.kafka_outlet.parse_commands(overrides)?.into(), ]; // Run commands @@ -135,6 +141,16 @@ mod tests { ti2: from: 6061 + kafka-inlet: + from: 9092 + at: n + to: /project/project_name + port-range: 1000-2000 + + kafka-outlet: + bootstrap-server: 192.168.1.1:9092 + at: n + relays: - r1 - r2 @@ -259,6 +275,28 @@ mod tests { .collect::>(), })), }, + kafka_inlet: KafkaInlet { + kafka_inlet: Some(UnnamedResources::Single(Args { + args: vec![ + ("from".to_string(), "9092".into()), + ("at".to_string(), "n".into()), + ("to".to_string(), "/project/project_name".into()), + ("port-range".to_string(), "1000-2000".into()), + ] + .into_iter() + .collect(), + })), + }, + kafka_outlet: KafkaOutlet { + kafka_outlet: Some(UnnamedResources::Single(Args { + args: vec![ + ("bootstrap-server".to_string(), "192.168.1.1:9092".into()), + ("at".to_string(), "n".into()), + ] + .into_iter() + .collect(), + })), + }, relays: Relays { relays: Some(ResourcesContainer::List(vec![ ResourceNameOrMap::Name("r1".to_string()), @@ -303,6 +341,8 @@ mod tests { policies: Policies { policies: None }, tcp_outlets: TcpOutlets { tcp_outlets: None }, tcp_inlets: TcpInlets { tcp_inlets: None }, + kafka_inlet: KafkaInlet { kafka_inlet: None }, + kafka_outlet: KafkaOutlet { kafka_outlet: None }, relays: Relays { relays: None }, }; assert_eq!(expected, parsed); diff --git a/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.inlet.yaml b/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.inlet.yaml new file mode 100644 index 00000000000..363773107e0 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.inlet.yaml @@ -0,0 +1,6 @@ +ticket: kafka.ticket +nodes: kafka_inlet_node + +kafka-inlet: + from: 127.0.0.1:9092 + to: /dnsaddr/ockam-node/tcp/6000/secure/api diff --git a/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.outlet.yaml b/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.outlet.yaml new file mode 100644 index 00000000000..7d24b346b04 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/run/demo_config_files/4.kafka.outlet.yaml @@ -0,0 +1,5 @@ +ticket: kafka.ticket +nodes: kafka + +kafka-outlet: + bootstrap-server: 127.0.0.1:9092 diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs new file mode 100644 index 00000000000..ec89f6ab859 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs @@ -0,0 +1,98 @@ +use crate::kafka::inlet::create::CreateCommand; +use crate::run::parser::building_blocks::{ArgsToCommands, UnnamedResources}; +use crate::run::parser::resource::traits::CommandsParser; +use crate::run::parser::resource::utils::parse_cmd_from_args; +use crate::run::parser::resource::ValuesOverrides; +use crate::{kafka::inlet, Command, OckamSubcommand}; +use async_trait::async_trait; +use miette::{miette, Result}; +use ockam_api::colors::color_primary; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct KafkaInlet { + #[serde(alias = "kafka-inlet")] + pub kafka_inlet: Option, +} + +impl KafkaInlet { + fn get_subcommand(args: &[String]) -> Result { + if let OckamSubcommand::KafkaInlet(cmd) = parse_cmd_from_args(CreateCommand::NAME, args)? { + #[allow(irrefutable_let_patterns)] + if let inlet::KafkaInletSubcommand::Create(c) = cmd.subcommand { + return Ok(c); + } + } + Err(miette!(format!( + "Failed to parse {} command", + color_primary(CreateCommand::NAME) + ))) + } +} + +#[async_trait] +impl CommandsParser for KafkaInlet { + fn parse_commands(self, overrides: &ValuesOverrides) -> Result> { + match self.kafka_inlet { + Some(c) => { + let mut cmds = c.into_commands(Self::get_subcommand)?; + if let Some(node_name) = overrides.override_node_name.as_ref() { + for cmd in cmds.iter_mut() { + cmd.node_opts.at_node = Some(node_name.clone()) + } + } + Ok(cmds) + } + None => Ok(vec![]), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ockam_core::env::FromString; + use ockam_multiaddr::MultiAddr; + use std::net::SocketAddr; + use std::str::FromStr; + + #[test] + fn kafka_inlet_config() { + let named = r#" + kafka-inlet: + from: 127.0.0.1:9092 + to: /project/default + at: node_name + "#; + let parsed: KafkaInlet = serde_yaml::from_str(named).unwrap(); + let cmds = parsed.parse_commands(&ValuesOverrides::default()).unwrap(); + assert_eq!(cmds.len(), 1); + assert_eq!( + cmds[0].from, + SocketAddr::from_str("127.0.0.1:9092").unwrap() + ); + assert_eq!( + cmds[0].to.as_ref().unwrap(), + &MultiAddr::from_string("/project/default").unwrap(), + ); + assert_eq!(cmds[0].node_opts.at_node.as_ref().unwrap(), "node_name"); + + let unnamed = r#" + kafka-inlet: + bootstrap-server: my-kafka.example.com:9092 + consumer: /dnsaddr/kafka-outlet.local/tcp/5000 + "#; + let parsed: KafkaInlet = serde_yaml::from_str(unnamed).unwrap(); + let cmds = parsed.parse_commands(&ValuesOverrides::default()).unwrap(); + assert_eq!(cmds.len(), 1); + assert_eq!( + cmds[0].bootstrap_server.as_ref().unwrap(), + "my-kafka.example.com:9092" + ); + assert_eq!( + cmds[0].consumer.as_ref().unwrap(), + &MultiAddr::from_string("/dnsaddr/kafka-outlet.local/tcp/5000").unwrap(), + ); + assert!(cmds[0].node_opts.at_node.is_none()); + } +} diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs new file mode 100644 index 00000000000..4d63100cdf8 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs @@ -0,0 +1,76 @@ +use async_trait::async_trait; +use miette::{miette, Result}; +use ockam_api::colors::color_primary; +use serde::{Deserialize, Serialize}; + +use crate::kafka::outlet; +use crate::kafka::outlet::create::CreateCommand; +use crate::run::parser::building_blocks::{ArgsToCommands, UnnamedResources}; +use crate::run::parser::resource::traits::CommandsParser; +use crate::run::parser::resource::utils::parse_cmd_from_args; +use crate::run::parser::resource::ValuesOverrides; +use crate::{Command, OckamSubcommand}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct KafkaOutlet { + #[serde(alias = "kafka-outlet")] + pub kafka_outlet: Option, +} + +impl KafkaOutlet { + fn get_subcommand(args: &[String]) -> Result { + if let OckamSubcommand::KafkaOutlet(cmd) = parse_cmd_from_args(CreateCommand::NAME, args)? { + #[allow(irrefutable_let_patterns)] + if let outlet::KafkaOutletSubcommand::Create(c) = cmd.subcommand { + return Ok(c); + } + } + Err(miette!(format!( + "Failed to parse {} command", + color_primary(CreateCommand::NAME) + ))) + } +} + +#[async_trait] +impl CommandsParser for KafkaOutlet { + fn parse_commands(self, overrides: &ValuesOverrides) -> Result> { + match self.kafka_outlet { + Some(c) => { + let mut cmds = c.into_commands(Self::get_subcommand)?; + if let Some(node_name) = overrides.override_node_name.as_ref() { + for cmd in cmds.iter_mut() { + cmd.node_opts.at_node = Some(node_name.clone()) + } + } + Ok(cmds) + } + None => Ok(vec![]), + } + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use std::str::FromStr; + + use super::*; + + #[test] + fn kafka_outlet_config() { + let named = r#" + kafka-outlet: + bootstrap-server: 192.168.0.100:9092 + at: node_name + "#; + let parsed: KafkaOutlet = serde_yaml::from_str(named).unwrap(); + let cmds = parsed.parse_commands(&ValuesOverrides::default()).unwrap(); + assert_eq!(cmds.len(), 1); + assert_eq!( + cmds[0].bootstrap_server, + SocketAddr::from_str("192.168.0.100:9092").unwrap() + ); + assert_eq!(cmds[0].node_opts.at_node.as_ref().unwrap(), "node_name"); + } +} diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/mod.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/mod.rs index 68528b34698..34ac7a1ee9e 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/mod.rs @@ -1,4 +1,6 @@ pub use identities::Identities; +pub use kafka_inlet::KafkaInlet; +pub use kafka_outlet::KafkaOutlet; pub use node::Node; pub use nodes::Nodes; pub use policies::Policies; @@ -10,6 +12,8 @@ pub use traits::*; pub use vaults::Vaults; mod identities; +mod kafka_inlet; +mod kafka_outlet; mod node; mod nodes; mod policies; diff --git a/implementations/rust/ockam/ockam_command/src/subcommand.rs b/implementations/rust/ockam/ockam_command/src/subcommand.rs index 91484415224..4f80676835d 100644 --- a/implementations/rust/ockam/ockam_command/src/subcommand.rs +++ b/implementations/rust/ockam/ockam_command/src/subcommand.rs @@ -23,6 +23,7 @@ use crate::flow_control::FlowControlCommand; use crate::identity::IdentityCommand; use crate::kafka::consumer::KafkaConsumerCommand; use crate::kafka::direct::KafkaDirectCommand; +use crate::kafka::inlet::KafkaInletCommand; use crate::kafka::outlet::KafkaOutletCommand; use crate::kafka::producer::KafkaProducerCommand; use crate::lease::LeaseCommand; @@ -81,7 +82,9 @@ pub enum OckamSubcommand { TcpOutlet(TcpOutletCommand), TcpInlet(TcpInletCommand), + KafkaInlet(KafkaInletCommand), KafkaOutlet(KafkaOutletCommand), + KafkaConsumer(KafkaConsumerCommand), KafkaDirect(KafkaDirectCommand), KafkaProducer(KafkaProducerCommand), @@ -135,6 +138,7 @@ impl OckamSubcommand { OckamSubcommand::TcpOutlet(c) => c.run(opts), OckamSubcommand::TcpInlet(c) => c.run(opts), + OckamSubcommand::KafkaInlet(c) => c.run(opts), OckamSubcommand::KafkaConsumer(c) => c.run(opts), OckamSubcommand::KafkaProducer(c) => c.run(opts), OckamSubcommand::KafkaDirect(c) => c.run(opts), @@ -263,6 +267,7 @@ impl OckamSubcommand { OckamSubcommand::TcpConnection(c) => c.name(), OckamSubcommand::TcpOutlet(c) => c.name(), OckamSubcommand::TcpInlet(c) => c.name(), + OckamSubcommand::KafkaInlet(c) => c.name(), OckamSubcommand::KafkaOutlet(c) => c.name(), OckamSubcommand::KafkaConsumer(c) => c.name(), OckamSubcommand::KafkaDirect(c) => c.name(),