Skip to content

Commit

Permalink
test: add integration test using Kafka endpoint and secret
Browse files Browse the repository at this point in the history
(cherry picked from commit bb5711c)
  • Loading branch information
remibaptistegio authored and phiz71 committed Jan 15, 2025
1 parent 5a29f26 commit 9d5d67e
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,52 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.graviteesource.entrypoint.http.get.HttpGetEntrypointConnectorFactory;
import com.graviteesource.secretprovider.hcvault.HCVaultSecretProvider;
import com.graviteesource.secretprovider.hcvault.HCVaultSecretProviderFactory;
import com.graviteesource.secretprovider.hcvault.config.manager.VaultConfig;
import com.graviteesource.service.secrets.SecretsService;
import io.github.jopenlibs.vault.VaultException;
import io.gravitee.apim.gateway.tests.sdk.annotations.DeployApi;
import io.gravitee.apim.gateway.tests.sdk.annotations.GatewayTest;
import io.gravitee.apim.gateway.tests.sdk.configuration.GatewayConfigurationBuilder;
import io.gravitee.apim.gateway.tests.sdk.connector.EntrypointBuilder;
import io.gravitee.apim.gateway.tests.sdk.policy.PolicyBuilder;
import io.gravitee.apim.gateway.tests.sdk.secrets.SecretProviderBuilder;
import io.gravitee.apim.integration.tests.messages.AbstractKafkaEndpointIntegrationTest;
import io.gravitee.apim.integration.tests.secrets.SecuredVaultContainer;
import io.gravitee.common.http.MediaType;
import io.gravitee.common.service.AbstractService;
import io.gravitee.gateway.api.http.HttpHeaderNames;
import io.gravitee.gateway.reactive.api.qos.Qos;
import io.gravitee.node.secrets.plugins.SecretProviderPlugin;
import io.gravitee.plugin.entrypoint.EntrypointConnectorPlugin;
import io.gravitee.plugin.policy.PolicyPlugin;
import io.gravitee.policy.assignattributes.AssignAttributesPolicy;
import io.gravitee.policy.assignattributes.configuration.AssignAttributesPolicyConfiguration;
import io.gravitee.secrets.api.plugin.SecretManagerConfiguration;
import io.gravitee.secrets.api.plugin.SecretProviderFactory;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.http.HttpClient;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.vault.VaultContainer;

/**
* @author Yann TAVERNIER (yann.tavernier at graviteesource.com)
Expand All @@ -52,6 +73,18 @@
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
class HttpGetEntrypointKafkaEndpointIntegrationTest extends AbstractKafkaEndpointIntegrationTest {

private static final String VAULT_TOKEN = UUID.randomUUID().toString();

@org.testcontainers.junit.jupiter.Container
protected static final VaultContainer vaultContainer = new VaultContainer<>("hashicorp/vault:1.13.3")
.withVaultToken(VAULT_TOKEN)
.dependsOn(kafka);

@AfterAll
static void cleanup() {
vaultContainer.close();
}

@Override
public void configureEntrypoints(Map<String, EntrypointConnectorPlugin<?, ?>> entrypoints) {
entrypoints.putIfAbsent("http-get", EntrypointBuilder.build("http-get", HttpGetEntrypointConnectorFactory.class));
Expand All @@ -67,6 +100,49 @@ public void configurePolicies(Map<String, PolicyPlugin> policies) {
);
}

@Override
public void configureGateway(GatewayConfigurationBuilder configurationBuilder) {
super.configureGateway(configurationBuilder);

// create a renewable token so the plugin does not start panicking
Container.ExecResult execResult;
try {
execResult = vaultContainer.execInContainer("vault", "token", "create", "-period=10m", "-field", "token");
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
String token = execResult.getStdout();

configurationBuilder.setYamlProperty("api.secrets.providers[0].plugin", "vault");
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.enabled", true);
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.host", vaultContainer.getHost());
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.port", vaultContainer.getMappedPort(8200));
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.ssl.enabled", "false");
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.auth.method", "token");
configurationBuilder.setYamlProperty("api.secrets.providers[0].configuration.auth.config.token", token);

try {
vaultContainer.execInContainer("vault", "kv", "put", "secret/kafka", "bootstrap=" + kafka.getBootstrapServers());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void configureSecretProviders(
Set<SecretProviderPlugin<? extends SecretProviderFactory<?>, ? extends SecretManagerConfiguration>> secretProviderPlugins
) {
secretProviderPlugins.add(
SecretProviderBuilder.build(HCVaultSecretProvider.PLUGIN_ID, HCVaultSecretProviderFactory.class, VaultConfig.class)
);
}

@Override
public void configureServices(Set<Class<? extends AbstractService<?>>> services) {
super.configureServices(services);
services.add(SecretsService.class);
}

@Test
@DeployApi({ "/apis/v4/messages/http-get/http-get-entrypoint-kafka-endpoint.json" })
void should_receive_all_messages(HttpClient client, Vertx vertx) {
Expand Down Expand Up @@ -287,6 +363,44 @@ void should_interrupt_when_constraint_violated_with_dynamic_configuration(HttpCl
});
}

@Test
@DeployApi({ "/apis/v4/messages/http-get/http-get-entrypoint-kafka-endpoint-secret.json" })
void should_receive_all_messages_with_bootstrap_servers_from_vault_secret(HttpClient client, Vertx vertx) {
Single
.fromCallable(() -> getKafkaProducer(vertx))
.flatMapCompletable(producer ->
publishToKafka(producer, "message1").andThen(publishToKafka(producer, "message2")).doFinally(producer::close)
)
.blockingAwait();

client
.rxRequest(HttpMethod.GET, "/test/test-secret")
.flatMap(request -> {
request.putHeader(HttpHeaderNames.ACCEPT.toString(), MediaType.APPLICATION_JSON);
return request.send();
})
.flatMap(response -> {
assertThat(response.statusCode()).isEqualTo(200);
return response.body();
})
.test()
.awaitDone(30, TimeUnit.SECONDS)
.assertValue(body -> {
final JsonObject jsonResponse = new JsonObject(body.toString());
final JsonArray items = jsonResponse.getJsonArray("items");
assertThat(items).hasSize(2);
final JsonObject message1 = items.getJsonObject(0);
assertThat(message1.getString("id")).isNull();
assertThat(message1.getJsonObject("metadata").getString("topic")).isEqualTo(TEST_TOPIC);
assertThat(message1.getString("content")).isEqualTo("message1");
final JsonObject message2 = items.getJsonObject(1);
assertThat(message2.getString("id")).isNull();
assertThat(message2.getJsonObject("metadata").getString("topic")).isEqualTo(TEST_TOPIC);
assertThat(message2.getString("content")).isEqualTo("message2");
return true;
});
}

@EnumSource(value = Qos.class, names = { "AT_MOST_ONCE", "AT_LEAST_ONCE" })
@ParameterizedTest(name = "should receive all messages with {0} qos")
@DeployApi(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"id": "http-get-entrypoint-kafka-endpoint-secret",
"name": "my-api-secret-conf",
"apiVersion": "1.0",
"definitionVersion": "4.0.0",
"type": "message",
"description": "api v4 using HTTP-GET entrypoint",
"listeners": [
{
"type": "http",
"paths": [
{
"path": "/test"
}
],
"entrypoints": [
{
"type": "http-get",
"configuration": {
"messagesLimitCount": 3,
"messagesLimitDurationMs": 10000,
"headersInPayload": true,
"metadataInPayload": true
}
}
]
}
],
"endpointGroups": [
{
"name": "default-group",
"type": "kafka",
"endpoints": [
{
"name": "default",
"type": "kafka",
"weight": 1,
"inheritConfiguration": false,
"configuration": {
"bootstrapServers": "{#secrets.get('/vault/secret/kafka:bootstrap')}"
},
"sharedConfigurationOverride": {
"consumer": {
"enabled": true,
"topics": ["test-topic"],
"autoOffsetReset": "earliest"
}
}
}
]
}
],
"flows": [],
"analytics": {
"enabled": false
}
}

0 comments on commit 9d5d67e

Please sign in to comment.