Skip to content

Commit

Permalink
Fix container tests with integration-harness feature for tremor-conne…
Browse files Browse the repository at this point in the history
…ctors

Signed-off-by: Darach Ennis <darach@gmail.com>
  • Loading branch information
darach committed May 14, 2024
1 parent e476b14 commit 4623e12
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 67 deletions.
15 changes: 5 additions & 10 deletions tremor-connectors/tests/clickhouse/more_complex_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ use chrono::DateTime;
use chrono_tz::Tz;
use clickhouse_rs::Pool;
use log::error;
use testcontainers::runners::AsyncRunner;
use testcontainers::{GenericImage, RunnableImage};
use tremor_common::ports::IN;
use tremor_connectors::{harness::Harness, impls::clickhouse};
use tremor_connectors_test_helpers::free_port;

use tremor_system::{
controlplane::CbAction,
event::{Event, EventId},
Expand All @@ -83,22 +83,17 @@ macro_rules! assert_row_equals {
#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread")]
async fn test() -> Result<()> {
let docker = clients::Cli::default();

// The following lines spin up a regular ClickHouse container and wait for
// the database to be up and running.

let image = GenericImage::new(utils::CONTAINER_NAME, utils::CONTAINER_VERSION);
// We want to access the container from the host, so we need to make the
// corresponding port available.
let local = free_port::find_free_tcp_port().await?;
let port_to_expose = Port {
internal: utils::SERVER_PORT,
local,
};
let port_to_expose = (local, utils::SERVER_PORT);
let image = RunnableImage::from(image).with_mapped_port(port_to_expose);
let container = docker.run(image);
let port = container.get_host_port_ipv4(9000);
let container = image.start().await;
let port = container.get_host_port_ipv4(9000).await;
utils::wait_for_ok(port).await?;

// Once the database is available, we use the regular client to create the
Expand Down Expand Up @@ -360,7 +355,7 @@ async fn test() -> Result<()> {
);
}

container.stop();
container.stop().await;
Ok(())
}

Expand Down
14 changes: 5 additions & 9 deletions tremor-connectors/tests/clickhouse/simple_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::utils;
use clickhouse_rs::Pool;
use log::error;
use std::time::{Duration, Instant};
use testcontainers::runners::AsyncRunner;
use testcontainers::{GenericImage, RunnableImage};
use tremor_common::ports::IN;
use tremor_connectors::{harness::Harness, impls::clickhouse};
Expand All @@ -35,22 +36,17 @@ use tremor_value::literal;
// ensure that all the data was actually written.
#[tokio::test(flavor = "multi_thread")]
async fn simple_insertion() -> anyhow::Result<()> {
let docker = clients::Cli::default();

// The following lines spin up a regular ClickHouse container and wait for
// the database to be up and running.

let image = GenericImage::new(utils::CONTAINER_NAME, utils::CONTAINER_VERSION);
// We want to access the container from the host, so we need to make the
// corresponding port available.
let local = free_port::find_free_tcp_port().await?;
let port_to_expose = Port {
internal: utils::SERVER_PORT,
local,
};
let port_to_expose = (local, utils::SERVER_PORT);
let image = RunnableImage::from(image).with_mapped_port(port_to_expose);
let container = docker.run(image);
let port = container.get_host_port_ipv4(9000);
let container = image.start().await;
let port = container.get_host_port_ipv4(9000).await;
utils::wait_for_ok(port).await?;

// Once the database is available, we use the regular client to create the
Expand Down Expand Up @@ -174,7 +170,7 @@ async fn simple_insertion() -> anyhow::Result<()> {

assert_eq!(ages, [42, 101]);

container.stop();
container.stop().await;

Ok(())
}
Expand Down
40 changes: 18 additions & 22 deletions tremor-connectors/tests/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use log::error;
use serial_test::serial;
use std::path::Path;
use std::time::{Duration, Instant};
use testcontainers::core::Mount;
use testcontainers::runners::AsyncRunner;
use testcontainers::{core::WaitFor, GenericImage, RunnableImage};
use tokio::process;
use tremor_common::ports::IN;
Expand Down Expand Up @@ -62,7 +64,6 @@ fn default_image() -> GenericImage {
#[tokio::test(flavor = "multi_thread")]
#[serial(elastic)]
async fn connector_elastic() -> anyhow::Result<()> {
let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let image = RunnableImage::from(
default_image()
Expand All @@ -71,8 +72,8 @@ async fn connector_elastic() -> anyhow::Result<()> {
)
.with_mapped_port((port, 9200_u16));

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;

// wait for the image to be reachable
let elastic = Elasticsearch::new(Transport::single_node(
Expand Down Expand Up @@ -423,7 +424,7 @@ async fn connector_elastic() -> anyhow::Result<()> {
);

// check what happens when ES isnt reachable
container.stop();
container.stop().await;

let event = Event {
id: EventId::new(0, 0, 2, 2),
Expand Down Expand Up @@ -477,7 +478,6 @@ async fn connector_elastic() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(elastic)]
async fn elastic_routing() -> anyhow::Result<()> {
let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let image = RunnableImage::from(
default_image()
Expand All @@ -486,8 +486,8 @@ async fn elastic_routing() -> anyhow::Result<()> {
)
.with_mapped_port((port, 9200_u16));

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;

// wait for the image to be reachable
let elastic = Elasticsearch::new(Transport::single_node(
Expand Down Expand Up @@ -793,7 +793,6 @@ async fn elastic_routing() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(elastic)]
async fn auth_basic() -> anyhow::Result<()> {
let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
Expand All @@ -803,8 +802,8 @@ async fn auth_basic() -> anyhow::Result<()> {
)
.with_mapped_port((port, 9200_u16));

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;
let conn_pool = SingleNodeConnectionPool::new(format!("http://127.0.0.1:{port}").parse()?);
let transport = TransportBuilder::new(conn_pool).auth(Credentials::Basic(
"elastic".to_string(),
Expand Down Expand Up @@ -849,7 +848,6 @@ async fn auth_basic() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(elastic)]
async fn auth_api_key() -> anyhow::Result<()> {
let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
Expand All @@ -860,8 +858,8 @@ async fn auth_api_key() -> anyhow::Result<()> {
)
.with_mapped_port((port, 9200_u16));

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;
let conn_pool = SingleNodeConnectionPool::new(format!("http://127.0.0.1:{port}").parse()?);
let transport = TransportBuilder::new(conn_pool).auth(Credentials::Basic(
"elastic".to_string(),
Expand Down Expand Up @@ -933,7 +931,6 @@ async fn auth_client_cert() -> anyhow::Result<()> {
tmp.canonicalize()?
};

let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
Expand All @@ -955,18 +952,18 @@ async fn auth_client_cert() -> anyhow::Result<()> {
"/usr/share/elasticsearch/config/certificates/localhost.cert",
),
)
.with_volume((
.with_mount(Mount::bind_mount(
tests_dir.display().to_string(),
"/usr/share/elasticsearch/config/certificates",
"/usr/share/elasticsearch/config/certificates".to_string(),
))
.with_mapped_port((port, 9200_u16));
let mut cafile = tests_dir.clone();
cafile.push("localhost.cert");
let mut keyfile = tests_dir.clone();
keyfile.push("localhost.key");

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;
let conn_pool = SingleNodeConnectionPool::new(format!("https://localhost:{port}").parse()?);
let ca = tokio::fs::read_to_string(&cafile).await?;
let mut cert = tokio::fs::read(&cafile).await?;
Expand Down Expand Up @@ -1065,7 +1062,6 @@ async fn elastic_https() -> anyhow::Result<()> {
tmp.canonicalize()?
};

let docker = clients::Cli::default();
let port = find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
Expand All @@ -1089,16 +1085,16 @@ async fn elastic_https() -> anyhow::Result<()> {
// .with_wait_for(WaitFor::message_on_stdout("[YELLOW] to [GREEN]")),
.with_wait_for(WaitFor::message_on_stdout("license mode is")),
)
.with_volume((
.with_mount(Mount::bind_mount(
tests_dir.display().to_string(),
"/usr/share/elasticsearch/config/certificates",
))
.with_mapped_port((port, 9200_u16));
let mut cafile = tests_dir.clone();
cafile.push("localhost.cert");

let container = docker.run(image);
let port = container.get_host_port_ipv4(9200);
let container = image.start().await;
let port = container.get_host_port_ipv4(9200).await;
let conn_pool = SingleNodeConnectionPool::new(format!("https://localhost:{port}").parse()?);
let ca = tokio::fs::read_to_string(&cafile).await?;
let transport = TransportBuilder::new(conn_pool).cert_validation(CertificateValidation::Full(
Expand Down
7 changes: 4 additions & 3 deletions tremor-connectors/tests/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ mod kafka {
mod producer;
}
use std::time::Duration;
use testcontainers::{core::WaitFor, Cli as DockerCli, GenericImage, RunnableImage};
use testcontainers::runners::AsyncRunner;
use testcontainers::{core::WaitFor, ContainerAsync, GenericImage, RunnableImage};
use tremor_connectors_test_helpers::free_port::find_free_tcp_port;

const IMAGE: &str = "vectorized/redpanda";
const VERSION: &str = "v22.1.7";
pub(crate) const PRODUCE_TIMEOUT: Duration = Duration::from_secs(5);

async fn redpanda_container(docker: &DockerCli) -> anyhow::Result<Container<GenericImage>> {
async fn redpanda_container() -> anyhow::Result<ContainerAsync<GenericImage>> {
let kafka_port = find_free_tcp_port().await?;
let args = vec![
"redpanda",
Expand Down Expand Up @@ -59,5 +60,5 @@ async fn redpanda_container(docker: &DockerCli) -> anyhow::Result<Container<Gene
// .with_mapped_port((free_port::find_free_tcp_port().await?, 8081_u16))
// .with_mapped_port((free_port::find_free_tcp_port().await?, 8082_u16))
.with_mapped_port((kafka_port, 9092_u16));
Ok(docker.run(image))
Ok(image.start().await)
}
31 changes: 12 additions & 19 deletions tremor-connectors/tests/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use rdkafka::{
use serial_test::serial;
use std::collections::HashMap;
use std::time::Duration;
use testcontainers::Cli as DockerCli;
use tokio::time::timeout;
use tremor_connectors::{harness::Harness, impls::kafka};
use tremor_connectors_test_helpers::free_port;
Expand All @@ -40,10 +39,9 @@ use crate::{redpanda_container, PRODUCE_TIMEOUT};
#[tokio::test(flavor = "multi_thread")]
// #[serial(kafka)]
async fn transactional_retry() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;

let port = container.get_host_port_ipv4(9092);
let port = container.get_host_port_ipv4(9092).await;
let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test";
let group_id = "transactional_retry";
Expand Down Expand Up @@ -266,10 +264,9 @@ async fn transactional_retry() -> anyhow::Result<()> {
#[serial(kafka)]

async fn custom_no_retry() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;

let port = container.get_host_port_ipv4(9092);
let port = container.get_host_port_ipv4(9092).await;
let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test_no_retry";
let group_id = "test1";
Expand Down Expand Up @@ -476,10 +473,9 @@ async fn custom_no_retry() -> anyhow::Result<()> {
#[serial(kafka)]

async fn performance() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;
let port = container.get_host_port_ipv4(9092).await;

let port = container.get_host_port_ipv4(9092);
let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test_no_retry";
let group_id = "group123";
Expand Down Expand Up @@ -767,10 +763,9 @@ async fn invalid_rdkafka_options() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(kafka)]
async fn connector_kafka_consumer_pause_resume() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;

let port = container.get_host_port_ipv4(9092);
let port = container.get_host_port_ipv4(9092).await;

let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test_pause_resume";
Expand Down Expand Up @@ -861,10 +856,9 @@ async fn connector_kafka_consumer_pause_resume() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(kafka)]
async fn transactional_store_offset_handling() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;

let port = container.get_host_port_ipv4(9092);
let port = container.get_host_port_ipv4(9092).await;

let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test_store_offsets";
Expand Down Expand Up @@ -1062,10 +1056,9 @@ async fn transactional_store_offset_handling() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(kafka)]
async fn transactional_commit_offset_handling() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;

let port = container.get_host_port_ipv4(9092);
let port = container.get_host_port_ipv4(9092).await;
let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test_commit_offset";
let group_id = "group_commit_offset";
Expand Down
6 changes: 2 additions & 4 deletions tremor-connectors/tests/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use rdkafka::{
};
use serial_test::serial;
use std::time::Duration;
use testcontainers::Cli as DockerCli;
use tokio::time::timeout;
use tremor_common::ports::IN;
use tremor_connectors::{harness::Harness, impls::kafka};
Expand All @@ -37,10 +36,9 @@ use tremor_value::literal;
#[tokio::test(flavor = "multi_thread")]
#[serial(kafka)]
async fn connector_kafka_producer() -> anyhow::Result<()> {
let docker = DockerCli::default();
let container = redpanda_container(&docker).await?;
let container = redpanda_container().await?;
let port = container.get_host_port_ipv4(9092).await;

let port = container.get_host_port_ipv4(9092);
let mut admin_config = ClientConfig::new();
let broker = format!("127.0.0.1:{port}");
let topic = "tremor_test";
Expand Down

0 comments on commit 4623e12

Please sign in to comment.