Skip to content

Commit

Permalink
feat(rust): added kafka-inlet command and relative config side
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Apr 12, 2024
1 parent 8aa187e commit 12fe562
Show file tree
Hide file tree
Showing 17 changed files with 500 additions and 27 deletions.
11 changes: 7 additions & 4 deletions implementations/rust/ockam/ockam_api/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod secure_channel_map;

pub(crate) use inlet_controller::KafkaInletController;
use ockam::identity::Identifier;
use ockam_abac::expr::{eq, ident, str};
use ockam_abac::expr::{eq, ident, or, str};
use ockam_abac::Expr;
use ockam_abac::{ABAC_HAS_CREDENTIAL_KEY, ABAC_IDENTIFIER_KEY, SUBJECT_KEY};
use ockam_core::Address;
Expand All @@ -35,8 +35,11 @@ pub fn kafka_default_policy_expression() -> Expr {
}

pub fn kafka_policy_expression(project_identifier: &Identifier) -> Expr {
eq([
ident(format!("{}.{}", SUBJECT_KEY, ABAC_IDENTIFIER_KEY)),
str(project_identifier.to_string()),
or([
eq([
ident(format!("{}.{}", SUBJECT_KEY, ABAC_IDENTIFIER_KEY)),
str(project_identifier.to_string()),
]),
kafka_default_policy_expression(),
])
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ impl NodeManagerRelayCreator {
relay_service: MultiAddr,
alias: String,
) -> Result<()> {
let is_rust = !relay_service.starts_with(Project::CODE);
let is_rust = {
// we might create a relay in the producer passing through a project relay
!(relay_service.starts_with(Project::CODE) && relay_service.len() == 1)
};

let buffer: Vec<u8> = context
.send_and_receive(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ impl DefaultAddress {
pub const OKTA_IDENTITY_PROVIDER: &'static str = "okta";
pub const KAFKA_OUTLET: &'static str = "kafka_outlet";
pub const KAFKA_CONSUMER: &'static str = "kafka_consumer";
pub const KAFKA_INLET: &'static str = "kafka_inlet";
pub const KAFKA_PRODUCER: &'static str = "kafka_producer";
pub const KAFKA_DIRECT: &'static str = "kafka_direct";

Expand Down
189 changes: 189 additions & 0 deletions implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use async_trait::async_trait;
use std::net::SocketAddr;
use std::sync::Arc;

use clap::{command, Args};
use colorful::Colorful;
use miette::miette;
use ockam_api::colors::OckamColor;
use ockam_api::nodes::models::services::{
StartKafkaDirectRequest, StartKafkaRequest, StartServiceRequest,
};
use ockam_api::nodes::BackgroundNodeClient;
use ockam_api::{fmt_log, fmt_ok};
use tokio::sync::Mutex;
use tokio::try_join;

use ockam_api::port_range::PortRange;
use ockam_core::api::Request;
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;

use crate::kafka::util::make_brokers_port_range;
use crate::node::util::initialize_default_node;
use crate::service::start::start_service_impl;
use crate::util::process_nodes_multiaddr;
use crate::{
kafka::{kafka_default_consumer_server, kafka_inlet_default_addr},
node::NodeOpts,
util::parsers::socket_addr_parser,
Command, CommandGlobalOpts,
};

/// Create a new Kafka Inlet. Kafka clients v3.7.0 and earlier are supported. You can find the version you have with 'kafka-topics.sh --version'.
#[derive(Clone, Debug, Args)]
pub struct CreateCommand {
#[command(flatten)]
pub node_opts: NodeOpts,
/// The local address of the service
#[arg(long, default_value_t = kafka_inlet_default_addr())]
pub addr: String,
/// The address where to bind and where the client will connect to alongside its port, <address>:<port>.
/// In case just a port is specified, the default loopback address (127.0.0.1:4000) will be used
#[arg(long, default_value_t = kafka_default_consumer_server(), value_parser = socket_addr_parser)]
pub from: SocketAddr,
/// The address of the kafka bootstrap broker, conflicts with --to
#[arg(long, required_unless_present = "to")]
pub bootstrap_server: Option<String>,
/// Local port range dynamically allocated to kafka brokers, must not overlap with the
/// bootstrap port
#[arg(long)]
pub brokers_port_range: Option<PortRange>,
/// The route to the Kafka outlet node, either the project in ockam orchestrator or a rust node, expected something like /project/<name>.
/// Conflicts with --bootstrap-server
#[arg(long, required_unless_present = "bootstrap_server", conflicts_with_all = ["bootstrap_server","consumer"])]
pub to: Option<MultiAddr>,
/// The route to the Kafka consumer node, valid only when --bootstrap-server is specified
#[arg(long, requires = "bootstrap_server")]
pub consumer: Option<MultiAddr>,
}

#[async_trait]
impl Command for CreateCommand {
const NAME: &'static str = "kafka-inlet create";

async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
let is_finished = Arc::new(Mutex::new(false));
initialize_default_node(ctx, &opts).await?;

let brokers_port_range = self
.brokers_port_range
.unwrap_or_else(|| make_brokers_port_range(&self.from));
let at_node = self.node_opts.at_node.clone();
let addr = self.addr.clone();

let direct_future;
let consumer_future;
if let Some(bootstrap_server) = self.bootstrap_server {
// TODO: allow arbitrary endpoints
let bootstrap_server = bootstrap_server.parse().unwrap();
let is_finished = is_finished.clone();

if self.to.is_some() {
return Err(miette!("Cannot specify both --bootstrap-server and --to").into());
}

// Creates a direct kafka service
consumer_future = None;
direct_future = Some(async move {
let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?;

let payload = StartKafkaDirectRequest::new(
self.from,
bootstrap_server,
brokers_port_range,
self.consumer.clone(),
);
let payload = StartServiceRequest::new(payload, &addr);
let req = Request::post("/node/services/kafka_direct").body(payload);
start_service_impl(ctx, &node, "KafkaDirect", req).await?;

*is_finished.lock().await = true;

Ok(())
});
} else if let Some(to) = self.to {
let is_finished = is_finished.clone();
let to = process_nodes_multiaddr(&to, &opts.state).await?;
let is_finished = is_finished.clone();

// Creates a Kafka consumer service (can also be used as a producer)
direct_future = None;
consumer_future = Some(async move {
let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?;

let payload = StartKafkaRequest::new(self.from, brokers_port_range, to);
let payload = StartServiceRequest::new(payload, &addr);
let req = Request::post("/node/services/kafka_consumer").body(payload);
start_service_impl(ctx, &node, "KafkaConsumer", req).await?;

*is_finished.lock().await = true;

Ok(())
});
} else {
return Err(miette!("Must specify either --bootstrap-server or --to").into());
};

opts.terminal
.write_line(&fmt_log!("Creating KafkaInlet service...\n"))?;

let msgs = vec![
format!(
"Building KafkaInlet service {}",
&self
.addr
.to_string()
.color(OckamColor::PrimaryResource.color())
),
format!(
"Creating KafkaInlet service at {}",
&self
.from
.to_string()
.color(OckamColor::PrimaryResource.color())
),
format!(
"Setting brokers port range to {}",
&brokers_port_range
.to_string()
.color(OckamColor::PrimaryResource.color())
),
];
let progress_output = opts.terminal.progress_output(&msgs, &is_finished);
if let Some(direct_future) = direct_future {
let (_, _) = try_join!(direct_future, progress_output)?;
} else {
let (_, _) = try_join!(consumer_future.unwrap(), progress_output)?;
}

opts.terminal
.stdout()
.plain(
fmt_ok!(
"KafkaInlet service started at {}\n",
&self
.from
.to_string()
.color(OckamColor::PrimaryResource.color())
) + &fmt_log!(
"Brokers port range set to {}\n\n",
&brokers_port_range
.to_string()
.color(OckamColor::PrimaryResource.color())
) + &fmt_log!(
"{}\n",
"Kafka clients v3.7.0 and earlier are supported."
.color(OckamColor::FmtWARNBackground.color())
) + &fmt_log!(
"{}: '{}'.\n",
"You can find the version you have with"
.color(OckamColor::FmtWARNBackground.color()),
"kafka-topics.sh --version".color(OckamColor::Success.color())
),
)
.write_line()?;

Ok(())
}
}
33 changes: 33 additions & 0 deletions implementations/rust/ockam/ockam_command/src/kafka/inlet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use clap::{command, Args, Subcommand};

use crate::kafka::inlet::create::CreateCommand;
use crate::{Command, CommandGlobalOpts};

pub(crate) mod create;

/// Manage Kafka Inlets
#[derive(Clone, Debug, Args)]
#[command(arg_required_else_help = true, subcommand_required = true)]
pub struct KafkaInletCommand {
#[command(subcommand)]
pub(crate) subcommand: KafkaInletSubcommand,
}

#[derive(Clone, Debug, Subcommand)]
pub enum KafkaInletSubcommand {
Create(CreateCommand),
}

impl KafkaInletCommand {
pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
match self.subcommand {
KafkaInletSubcommand::Create(c) => c.run(opts),
}
}

pub fn name(&self) -> String {
match &self.subcommand {
KafkaInletSubcommand::Create(c) => c.name(),
}
}
}
5 changes: 5 additions & 0 deletions implementations/rust/ockam/ockam_command/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ockam_multiaddr::MultiAddr;

pub(crate) mod consumer;
pub(crate) mod direct;
pub(crate) mod inlet;
pub(crate) mod outlet;
pub(crate) mod producer;
pub(crate) mod util;
Expand All @@ -22,6 +23,10 @@ fn kafka_consumer_default_addr() -> String {
DefaultAddress::KAFKA_CONSUMER.to_string()
}

fn kafka_inlet_default_addr() -> String {
DefaultAddress::KAFKA_INLET.to_string()
}

fn kafka_direct_default_addr() -> String {
DefaultAddress::KAFKA_DIRECT.to_string()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use std::net::SocketAddr;

use clap::{command, Args};
Expand All @@ -13,39 +14,31 @@ use ockam_api::{fmt_log, fmt_ok};
use ockam_core::api::Request;

use crate::node::util::initialize_default_node;
use crate::util::async_cmd;
use crate::{
kafka::{kafka_default_outlet_addr, kafka_default_outlet_server},
node::NodeOpts,
service::start::start_service_impl,
CommandGlobalOpts,
Command, CommandGlobalOpts,
};

/// Create a new Kafka Outlet
#[derive(Clone, Debug, Args)]
pub struct CreateCommand {
#[command(flatten)]
node_opts: NodeOpts,
pub node_opts: NodeOpts,
/// The local address of the service
#[arg(long, default_value_t = kafka_default_outlet_addr())]
addr: String,
pub addr: String,
/// The address of the kafka bootstrap broker
#[arg(long, default_value_t = kafka_default_outlet_server())]
bootstrap_server: SocketAddr,
pub bootstrap_server: SocketAddr,
}

impl CreateCommand {
pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
async_cmd(&self.name(), opts.clone(), |ctx| async move {
self.async_run(&ctx, opts).await
})
}

pub fn name(&self) -> String {
"create kafka outlet".into()
}
#[async_trait]
impl Command for CreateCommand {
const NAME: &'static str = "kafka-outlet create";

async fn async_run(&self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
initialize_default_node(ctx, &opts).await?;
opts.terminal
.write_line(&fmt_log!("Creating KafkaOutlet service"))?;
Expand Down
10 changes: 4 additions & 6 deletions implementations/rust/ockam/ockam_command/src/kafka/outlet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use clap::{command, Args, Subcommand};

use crate::CommandGlobalOpts;
pub(crate) mod create;

use self::create::CreateCommand;

mod create;
use crate::{Command, CommandGlobalOpts};
use clap::{command, Args, Subcommand};

/// Manage Kafka Outlets
#[derive(Clone, Debug, Args)]
#[command(arg_required_else_help = true, subcommand_required = true)]
pub struct KafkaOutletCommand {
#[command(subcommand)]
subcommand: KafkaOutletSubcommand,
pub(crate) subcommand: KafkaOutletSubcommand,
}

#[derive(Clone, Debug, Subcommand)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub struct NodeConfig {
#[serde(flatten)]
pub tcp_inlets: TcpInlets,
#[serde(flatten)]
pub kafka_inlet: KafkaInlet,
#[serde(flatten)]
pub kafka_outlet: KafkaOutlet,
#[serde(flatten)]
pub relays: Relays,
}

Expand Down Expand Up @@ -123,6 +127,8 @@ impl NodeConfig {
self.policies.parse_commands(overrides)?.into(),
self.tcp_outlets.parse_commands(overrides)?.into(),
self.tcp_inlets.parse_commands(overrides)?.into(),
self.kafka_inlet.parse_commands(overrides)?.into(),
self.kafka_outlet.parse_commands(overrides)?.into(),
];

// Run commands
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: kafka-portal

kafka-inlet:
from: 127.0.0.1:19092
to: /secure/api

kafka-outlet:
bootstrap-server: 127.0.0.1:9092
Loading

0 comments on commit 12fe562

Please sign in to comment.