Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions examples/http.kafka.avro.json/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# http.kafka.avro.json

This example illustrates how to configure the Karapace Schema Registry in Zilla to validate messages during produce and fetch to a Kafka topic.
It also includes a demonstration of how to use basic authentication for the Schema Registry.

## Requirements

Expand All @@ -18,6 +19,8 @@ docker compose up -d

```bash
curl 'http://localhost:8081/subjects/items-snapshots-value/versions' \
--basic \
--user 'user1:password1' \
--header 'Content-Type: application/json' \
--data '{
"schema":
Expand All @@ -35,11 +38,15 @@ output:
## Validate created Schema

```bash
curl 'http://localhost:8081/schemas/ids/1'
curl 'http://localhost:8081/schemas/ids/1' \
--basic \
--user 'user1:password1'
```

```bash
curl 'http://localhost:8081/subjects/items-snapshots-value/versions/latest'
curl 'http://localhost:8081/subjects/items-snapshots-value/versions/latest' \
--basic \
--user 'user1:password1'
```

## Verify behavior for a valid event
Expand Down
17 changes: 11 additions & 6 deletions examples/http.kafka.avro.json/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
restart: unless-stopped
hostname: zilla.examples.dev
ports:
- 7114:7114
- 7314:7114
healthcheck:
interval: 5s
timeout: 3s
Expand All @@ -14,6 +14,8 @@ services:
environment:
KAFKA_BOOTSTRAP_SERVER: kafka.examples.dev:29092
SCHEMA_REGISTRY_SERVER: http://karapace-registry:8081
SCHEMA_REGISTRY_USERNAME: user1
SCHEMA_REGISTRY_PASSWORD: password1
ZILLA_INCUBATOR_ENABLED: "true"
volumes:
- ./etc:/etc/zilla
Expand All @@ -24,7 +26,7 @@ services:
restart: unless-stopped
hostname: kafka.examples.dev
ports:
- 9092:9092
- 9392:9092
healthcheck:
test: /opt/bitnami/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server kafka.examples.dev:29092 || exit 1
interval: 1s
Expand Down Expand Up @@ -70,7 +72,7 @@ services:
image: ghcr.io/kafbat/kafka-ui:v1.0.0
restart: unless-stopped
ports:
- 8080:8080
- 8380:8080
depends_on:
kafka:
condition: service_healthy
Expand All @@ -90,19 +92,22 @@ services:
build:
context: ..
dockerfile: container/Dockerfile
entrypoint:
entrypoint:
- /bin/bash
- /opt/karapace/start.sh
- /opt/karapace/start.sh
- registry
volumes:
- ./users.json:/tmp/users.json:ro
depends_on:
- kafka
ports:
- 8081:8081
- 8381:8081
environment:
KARAPACE_ADVERTISED_HOSTNAME: karapace-registry
KARAPACE_BOOTSTRAP_URI: kafka.examples.dev:29092
KARAPACE_PORT: 8081
KARAPACE_HOST: 0.0.0.0
KARAPACE_REGISTRY_AUTHFILE: /tmp/users.json
KARAPACE_CLIENT_ID: karapace
KARAPACE_GROUP_ID: karapace-registry
KARAPACE_MASTER_ELIGIBILITY: "true"
Expand Down
4 changes: 4 additions & 0 deletions examples/http.kafka.avro.json/etc/zilla.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ catalogs:
type: schema-registry
options:
url: ${{env.SCHEMA_REGISTRY_SERVER}}
credentials:
basic:
username: ${{env.SCHEMA_REGISTRY_USERNAME}}
password: ${{env.SCHEMA_REGISTRY_PASSWORD}}
context: default
bindings:
north_tcp_server:
Expand Down
17 changes: 17 additions & 0 deletions examples/http.kafka.avro.json/users.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"users": [
{
"username": "user1",
"algorithm": "sha512",
"salt": "KHOXN_9AmhX17BaUT1CPww",
"password_hash": "N9AReOAyqHYuCXZ4w5hTr7BIj6NguPfl1EoMZaRqOWD\/\/jnBlRL6V3cDnTYF5ZEaVKKZu76PNnYnq8HJBDz9xQ=="
}
],
"permissions": [
{
"username": "user1",
"operation": "Write",
"resource": ".*"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public static <T> KarapaceOptionsConfigBuilder<T> builder(
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization)
String authorization,
String username,
String password)
{
super(url, context, maxAge, keys, trust, trustcacerts, authorization);
super(url, context, maxAge, keys, trust, trustcacerts, authorization, username, password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ protected KarapaceOptionsConfig newOptionsConfig(
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization)
String authorization,
String username,
String password)
{
return new KarapaceOptionsConfig(url, context, maxAge, keys, trust, trustcacerts, authorization);
return new KarapaceOptionsConfig(url,
context,
maxAge,
keys,
trust,
trustcacerts,
authorization,
username,
password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public abstract class AbstractSchemaRegistryOptionsConfigAdapter<T extends Abstr
private static final String AUTHORIZATION_NAME = "authorization";
private static final String AUTHORIZATION_CREDENTIALS_NAME = "credentials";
private static final String AUTHORIZATION_CREDENTIALS_HEADERS_NAME = "headers";
private static final String AUTHORIZATION_CREDENTIALS_BASIC_NAME = "basic";
private static final String AUTHORIZATION_CREDENTIALS_BASIC_USERNAME_NAME = "username";
private static final String AUTHORIZATION_CREDENTIALS_BASIC_PASSWORD_NAME = "password";
private static final String TLS_NAME = "tls";

private final String type;
Expand Down Expand Up @@ -187,8 +190,22 @@ public OptionsConfig adaptFromJson(
JsonObject credentials = object.getJsonObject(AUTHORIZATION_CREDENTIALS_NAME);

JsonObject headers = credentials.getJsonObject(AUTHORIZATION_CREDENTIALS_HEADERS_NAME);
JsonObject basic = credentials.getJsonObject(AUTHORIZATION_CREDENTIALS_BASIC_NAME);

options.authorization(headers.getString(AUTHORIZATION_NAME));

if (headers != null)
{
String authorization = headers.getString(AUTHORIZATION_NAME, null);
options.authorization(authorization);
}

if (basic != null)
{
String username = basic.getString(AUTHORIZATION_CREDENTIALS_BASIC_USERNAME_NAME, null);
options.username(username);
String password = basic.getString(AUTHORIZATION_CREDENTIALS_BASIC_PASSWORD_NAME, null);
options.password(password);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ public abstract class AbstractSchemaRegistryOptionsConfig extends OptionsConfig
public final List<String> trust;
public final boolean trustcacerts;
public final String authorization;
public final String username;
public final String password;

protected AbstractSchemaRegistryOptionsConfig(
String url,
String context,
Duration maxAge,
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization)
String url,
String context,
Duration maxAge,
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization,
String username,
String password)
{
this.url = url;
this.context = context;
Expand All @@ -45,5 +49,7 @@ protected AbstractSchemaRegistryOptionsConfig(
this.trust = trust;
this.trustcacerts = trustcacerts;
this.authorization = authorization;
this.username = username;
this.password = password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class AbstractSchemaRegistryOptionsConfigBuilder<T, B extends Ab
private List<String> trust;
private Boolean trustcacerts;
private String authorization;
private String username;
private String password;

public B url(
String url)
Expand Down Expand Up @@ -85,12 +87,24 @@ public B authorization(
return thisType().cast(this);
}

public B username(String username)
{
this.username = username;
return thisType().cast(this);
}

public B password(String password)
{
this.password = password;
return thisType().cast(this);
}

@Override
public T build()
{
Duration maxAge = (this.maxAge != null) ? this.maxAge : MAX_AGE_DEFAULT;
final boolean trustcacerts = this.trustcacerts == null ? this.trust == null : this.trustcacerts;
return mapper.apply(newOptionsConfig(url, context, maxAge, keys, trust, trustcacerts, authorization));
return mapper.apply(newOptionsConfig(url, context, maxAge, keys, trust, trustcacerts, authorization, username, password));
}

protected AbstractSchemaRegistryOptionsConfigBuilder(
Expand All @@ -106,5 +120,7 @@ protected abstract AbstractSchemaRegistryOptionsConfig newOptionsConfig(
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization);
String authorization,
String username,
String password);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public static <T> SchemaRegistryOptionsConfigBuilder<T> builder(
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization)
String authorization,
String username,
String password)
{
super(url, context, maxAge, keys, trust, trustcacerts, authorization);
super(url, context, maxAge, keys, trust, trustcacerts, authorization, username, password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,19 @@ protected SchemaRegistryOptionsConfig newOptionsConfig(
List<String> keys,
List<String> trust,
boolean trustcacerts,
String authorization)
String authorization,
String username,
String password)
{
return new SchemaRegistryOptionsConfig(url, context, maxAge, keys, trust, trustcacerts, authorization);
return new SchemaRegistryOptionsConfig(url,
context,
maxAge,
keys,
trust,
trustcacerts,
authorization,
username,
password);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.ByteOrder;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -143,7 +144,16 @@ public SchemaRegistryCatalogHandler(
this.catalogId = catalog.id;
this.cachedSchemas = catalog.cache.schemas;
this.cachedSchemaIds = catalog.cache.schemaIds;
this.authorization = options.authorization;
if (options.username != null && options.password != null)
{
String base64Creds =
Base64.getEncoder().encodeToString((options.username + ":" + options.password).getBytes());
this.authorization = "Basic " + base64Creds;
}
else
{
this.authorization = options.authorization;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,33 @@ public void shouldReadOptions()
assertThat(catalog.authorization, equalTo("Basic dXNlcjpzZWNyZXQ="));
}

@Test
public void shouldReadOptionsWithBasicCredentials()
{
String text = """
{
"url": "http://localhost:8081",
"context": "default",
"credentials":
{
"basic":
{
"username": "user",
"password": "secret"
}
}
}
""";

SchemaRegistryOptionsConfig catalog = jsonb.fromJson(text, SchemaRegistryOptionsConfig.class);

assertThat(catalog, not(nullValue()));
assertThat(catalog.url, equalTo("http://localhost:8081"));
assertThat(catalog.context, equalTo("default"));
assertThat(catalog.username, equalTo("user"));
assertThat(catalog.password, equalTo("secret"));
}

@Test
public void shouldWriteOptions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ catalogs:
url: http://localhost:8081
context: default
max-age: 30
credentials:
basic:
username: user1
password: password1
Loading
Loading