diff --git a/tremor-connectors/tests/clickhouse/more_complex_test.rs b/tremor-connectors/tests/clickhouse/more_complex_test.rs index 8350512b69..a026f66a38 100644 --- a/tremor-connectors/tests/clickhouse/more_complex_test.rs +++ b/tremor-connectors/tests/clickhouse/more_complex_test.rs @@ -55,11 +55,11 @@ use chrono::DateTime; use chrono_tz::Tz; use clickhouse_rs::Pool; use log::error; +use testcontainers::runners::AsyncRunner; use testcontainers::{GenericImage, RunnableImage}; use tremor_common::ports::IN; use tremor_connectors::{harness::Harness, impls::clickhouse}; use tremor_connectors_test_helpers::free_port; - use tremor_system::{ controlplane::CbAction, event::{Event, EventId}, @@ -83,8 +83,6 @@ macro_rules! assert_row_equals { #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")] async fn test() -> Result<()> { - let docker = clients::Cli::default(); - // The following lines spin up a regular ClickHouse container and wait for // the database to be up and running. @@ -92,13 +90,10 @@ async fn test() -> Result<()> { // We want to access the container from the host, so we need to make the // corresponding port available. let local = free_port::find_free_tcp_port().await?; - let port_to_expose = Port { - internal: utils::SERVER_PORT, - local, - }; + let port_to_expose = (local, utils::SERVER_PORT); let image = RunnableImage::from(image).with_mapped_port(port_to_expose); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9000); + let container = image.start().await; + let port = container.get_host_port_ipv4(9000).await; utils::wait_for_ok(port).await?; // Once the database is available, we use the regular client to create the @@ -360,7 +355,7 @@ async fn test() -> Result<()> { ); } - container.stop(); + container.stop().await; Ok(()) } diff --git a/tremor-connectors/tests/clickhouse/simple_test.rs b/tremor-connectors/tests/clickhouse/simple_test.rs index 6aefb6e8f5..fb56f6b553 100644 --- a/tremor-connectors/tests/clickhouse/simple_test.rs +++ b/tremor-connectors/tests/clickhouse/simple_test.rs @@ -19,6 +19,7 @@ use super::utils; use clickhouse_rs::Pool; use log::error; use std::time::{Duration, Instant}; +use testcontainers::runners::AsyncRunner; use testcontainers::{GenericImage, RunnableImage}; use tremor_common::ports::IN; use tremor_connectors::{harness::Harness, impls::clickhouse}; @@ -35,8 +36,6 @@ use tremor_value::literal; // ensure that all the data was actually written. #[tokio::test(flavor = "multi_thread")] async fn simple_insertion() -> anyhow::Result<()> { - let docker = clients::Cli::default(); - // The following lines spin up a regular ClickHouse container and wait for // the database to be up and running. @@ -44,13 +43,10 @@ async fn simple_insertion() -> anyhow::Result<()> { // We want to access the container from the host, so we need to make the // corresponding port available. let local = free_port::find_free_tcp_port().await?; - let port_to_expose = Port { - internal: utils::SERVER_PORT, - local, - }; + let port_to_expose = (local, utils::SERVER_PORT); let image = RunnableImage::from(image).with_mapped_port(port_to_expose); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9000); + let container = image.start().await; + let port = container.get_host_port_ipv4(9000).await; utils::wait_for_ok(port).await?; // Once the database is available, we use the regular client to create the @@ -174,7 +170,7 @@ async fn simple_insertion() -> anyhow::Result<()> { assert_eq!(ages, [42, 101]); - container.stop(); + container.stop().await; Ok(()) } diff --git a/tremor-connectors/tests/elastic.rs b/tremor-connectors/tests/elastic.rs index d7dab46057..fa5bb5f58b 100644 --- a/tremor-connectors/tests/elastic.rs +++ b/tremor-connectors/tests/elastic.rs @@ -29,6 +29,8 @@ use log::error; use serial_test::serial; use std::path::Path; use std::time::{Duration, Instant}; +use testcontainers::core::Mount; +use testcontainers::runners::AsyncRunner; use testcontainers::{core::WaitFor, GenericImage, RunnableImage}; use tokio::process; use tremor_common::ports::IN; @@ -62,7 +64,6 @@ fn default_image() -> GenericImage { #[tokio::test(flavor = "multi_thread")] #[serial(elastic)] async fn connector_elastic() -> anyhow::Result<()> { - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let image = RunnableImage::from( default_image() @@ -71,8 +72,8 @@ async fn connector_elastic() -> anyhow::Result<()> { ) .with_mapped_port((port, 9200_u16)); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; // wait for the image to be reachable let elastic = Elasticsearch::new(Transport::single_node( @@ -423,7 +424,7 @@ async fn connector_elastic() -> anyhow::Result<()> { ); // check what happens when ES isnt reachable - container.stop(); + container.stop().await; let event = Event { id: EventId::new(0, 0, 2, 2), @@ -477,7 +478,6 @@ async fn connector_elastic() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(elastic)] async fn elastic_routing() -> anyhow::Result<()> { - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let image = RunnableImage::from( default_image() @@ -486,8 +486,8 @@ async fn elastic_routing() -> anyhow::Result<()> { ) .with_mapped_port((port, 9200_u16)); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; // wait for the image to be reachable let elastic = Elasticsearch::new(Transport::single_node( @@ -793,7 +793,6 @@ async fn elastic_routing() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(elastic)] async fn auth_basic() -> anyhow::Result<()> { - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( @@ -803,8 +802,8 @@ async fn auth_basic() -> anyhow::Result<()> { ) .with_mapped_port((port, 9200_u16)); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; let conn_pool = SingleNodeConnectionPool::new(format!("http://127.0.0.1:{port}").parse()?); let transport = TransportBuilder::new(conn_pool).auth(Credentials::Basic( "elastic".to_string(), @@ -849,7 +848,6 @@ async fn auth_basic() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(elastic)] async fn auth_api_key() -> anyhow::Result<()> { - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( @@ -860,8 +858,8 @@ async fn auth_api_key() -> anyhow::Result<()> { ) .with_mapped_port((port, 9200_u16)); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; let conn_pool = SingleNodeConnectionPool::new(format!("http://127.0.0.1:{port}").parse()?); let transport = TransportBuilder::new(conn_pool).auth(Credentials::Basic( "elastic".to_string(), @@ -933,7 +931,6 @@ async fn auth_client_cert() -> anyhow::Result<()> { tmp.canonicalize()? }; - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( @@ -955,9 +952,9 @@ async fn auth_client_cert() -> anyhow::Result<()> { "/usr/share/elasticsearch/config/certificates/localhost.cert", ), ) - .with_volume(( + .with_mount(Mount::bind_mount( tests_dir.display().to_string(), - "/usr/share/elasticsearch/config/certificates", + "/usr/share/elasticsearch/config/certificates".to_string(), )) .with_mapped_port((port, 9200_u16)); let mut cafile = tests_dir.clone(); @@ -965,8 +962,8 @@ async fn auth_client_cert() -> anyhow::Result<()> { let mut keyfile = tests_dir.clone(); keyfile.push("localhost.key"); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; let conn_pool = SingleNodeConnectionPool::new(format!("https://localhost:{port}").parse()?); let ca = tokio::fs::read_to_string(&cafile).await?; let mut cert = tokio::fs::read(&cafile).await?; @@ -1065,7 +1062,6 @@ async fn elastic_https() -> anyhow::Result<()> { tmp.canonicalize()? }; - let docker = clients::Cli::default(); let port = find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( @@ -1089,7 +1085,7 @@ async fn elastic_https() -> anyhow::Result<()> { // .with_wait_for(WaitFor::message_on_stdout("[YELLOW] to [GREEN]")), .with_wait_for(WaitFor::message_on_stdout("license mode is")), ) - .with_volume(( + .with_mount(Mount::bind_mount( tests_dir.display().to_string(), "/usr/share/elasticsearch/config/certificates", )) @@ -1097,8 +1093,8 @@ async fn elastic_https() -> anyhow::Result<()> { let mut cafile = tests_dir.clone(); cafile.push("localhost.cert"); - let container = docker.run(image); - let port = container.get_host_port_ipv4(9200); + let container = image.start().await; + let port = container.get_host_port_ipv4(9200).await; let conn_pool = SingleNodeConnectionPool::new(format!("https://localhost:{port}").parse()?); let ca = tokio::fs::read_to_string(&cafile).await?; let transport = TransportBuilder::new(conn_pool).cert_validation(CertificateValidation::Full( diff --git a/tremor-connectors/tests/kafka.rs b/tremor-connectors/tests/kafka.rs index a2c8b3003a..16a2629acc 100644 --- a/tremor-connectors/tests/kafka.rs +++ b/tremor-connectors/tests/kafka.rs @@ -19,14 +19,15 @@ mod kafka { mod producer; } use std::time::Duration; -use testcontainers::{core::WaitFor, Cli as DockerCli, GenericImage, RunnableImage}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{core::WaitFor, ContainerAsync, GenericImage, RunnableImage}; use tremor_connectors_test_helpers::free_port::find_free_tcp_port; const IMAGE: &str = "vectorized/redpanda"; const VERSION: &str = "v22.1.7"; pub(crate) const PRODUCE_TIMEOUT: Duration = Duration::from_secs(5); -async fn redpanda_container(docker: &DockerCli) -> anyhow::Result> { +async fn redpanda_container() -> anyhow::Result> { let kafka_port = find_free_tcp_port().await?; let args = vec![ "redpanda", @@ -59,5 +60,5 @@ async fn redpanda_container(docker: &DockerCli) -> anyhow::Result anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; - let port = container.get_host_port_ipv4(9092); + let port = container.get_host_port_ipv4(9092).await; let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test"; let group_id = "transactional_retry"; @@ -266,10 +264,9 @@ async fn transactional_retry() -> anyhow::Result<()> { #[serial(kafka)] async fn custom_no_retry() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; - let port = container.get_host_port_ipv4(9092); + let port = container.get_host_port_ipv4(9092).await; let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test_no_retry"; let group_id = "test1"; @@ -476,10 +473,9 @@ async fn custom_no_retry() -> anyhow::Result<()> { #[serial(kafka)] async fn performance() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; + let port = container.get_host_port_ipv4(9092).await; - let port = container.get_host_port_ipv4(9092); let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test_no_retry"; let group_id = "group123"; @@ -767,10 +763,9 @@ async fn invalid_rdkafka_options() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(kafka)] async fn connector_kafka_consumer_pause_resume() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; - let port = container.get_host_port_ipv4(9092); + let port = container.get_host_port_ipv4(9092).await; let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test_pause_resume"; @@ -861,10 +856,9 @@ async fn connector_kafka_consumer_pause_resume() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(kafka)] async fn transactional_store_offset_handling() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; - let port = container.get_host_port_ipv4(9092); + let port = container.get_host_port_ipv4(9092).await; let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test_store_offsets"; @@ -1062,10 +1056,9 @@ async fn transactional_store_offset_handling() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(kafka)] async fn transactional_commit_offset_handling() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; - let port = container.get_host_port_ipv4(9092); + let port = container.get_host_port_ipv4(9092).await; let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test_commit_offset"; let group_id = "group_commit_offset"; diff --git a/tremor-connectors/tests/kafka/producer.rs b/tremor-connectors/tests/kafka/producer.rs index 7741dcb190..bfe5456d62 100644 --- a/tremor-connectors/tests/kafka/producer.rs +++ b/tremor-connectors/tests/kafka/producer.rs @@ -25,7 +25,6 @@ use rdkafka::{ }; use serial_test::serial; use std::time::Duration; -use testcontainers::Cli as DockerCli; use tokio::time::timeout; use tremor_common::ports::IN; use tremor_connectors::{harness::Harness, impls::kafka}; @@ -37,10 +36,9 @@ use tremor_value::literal; #[tokio::test(flavor = "multi_thread")] #[serial(kafka)] async fn connector_kafka_producer() -> anyhow::Result<()> { - let docker = DockerCli::default(); - let container = redpanda_container(&docker).await?; + let container = redpanda_container().await?; + let port = container.get_host_port_ipv4(9092).await; - let port = container.get_host_port_ipv4(9092); let mut admin_config = ClientConfig::new(); let broker = format!("127.0.0.1:{port}"); let topic = "tremor_test";