Skip to content

Commit

Permalink
added smoke tests against the new rest-proxy standalone module
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Nov 19, 2024
1 parent 1ea3ad5 commit bb39a79
Show file tree
Hide file tree
Showing 19 changed files with 169 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/container-smoke-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ jobs:

- name: Smoke test REST proxy
run: bin/smoke-test-rest.sh

- name: Smoke test REST proxy standalone
run: bin/smoke-test-rest-proxy.sh
20 changes: 20 additions & 0 deletions bin/smoke-test-rest-proxy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

retries=5

for ((i = 0; i <= retries; i++)); do
response=$(curl --silent --verbose --fail http://localhost:8086/topics)

if [[ $response == '["_schemas","__consumer_offsets"]' ]]; then
echo "Ok!"
break
fi

if ((i == retries)); then
echo "Still failing after $i retries, giving up."
exit 1
fi

echo "Smoke test failed, retrying in 5 seconds ..."
sleep 5
done
18 changes: 18 additions & 0 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ services:
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

karapace-rest-new:
image: ghcr.io/aiven-open/karapace:develop
build:
context: ..
dockerfile: container/Dockerfile
entrypoint:
- python3
- -m
- src.rest_proxy
- /opt/karapace/rest.config.json
depends_on:
- kafka
- karapace-registry
volumes:
- ./rest.config.json:/opt/karapace/rest.config.json
ports:
- "8086:8086"

prometheus:
image: prom/prometheus
volumes:
Expand Down
15 changes: 15 additions & 0 deletions container/rest.config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"rest": 1,
"advertised_hostname": "karapace-rest-proxy",
"bootstrap_uri": "kafka:29092",
"registry_host": "karapace-registry",
"registry_port": 8081,
"host": "0.0.0.0",
"port": 8086,
"admin_metadata_max_age": 0,
"log_level": "DEBUG",
"statsd_host": "statsd-exporter",
"statsd_port": 8125,
"kafka_schema_reader_strict_mode": false,
"kafka_retriable_errors_silenced": true
}
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ ignore_errors = True
[mypy-karapace.serialization]
ignore_errors = True

[mypy-karapace.kafka_rest_apis.consumer_manager]
[mypy-rest_proxy.consumer_manager]
ignore_errors = True

[mypy-karapace.kafka_rest_apis]
[mypy-rest_proxy]
ignore_errors = True

# Third-party libraries with no stubs available. Before adding libraries here,
Expand Down
2 changes: 1 addition & 1 deletion src/karapace/karapace_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from karapace import version as karapace_version
from karapace.config import Config, read_config
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.kafka_rest_apis import KafkaRest
from karapace.rapu import RestApp
from karapace.schema_registry_apis import KarapaceSchemaRegistryController
from karapace.utils import DebugAccessLogger
from rest_proxy import KafkaRest

import argparse
import logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import AsyncKafkaProducer
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
)
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
from karapace.karapace import KarapaceBase
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
Expand All @@ -44,6 +36,14 @@
)
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
from karapace.utils import convert_to_int, json_encode
from rest_proxy.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
)
from rest_proxy.consumer_manager import ConsumerManager
from rest_proxy.error_codes import RESTErrorCodes
from rest_proxy.schema_cache import TopicSchemaCache
from typing import Callable, TypedDict

import asyncio
Expand Down
93 changes: 93 additions & 0 deletions src/rest_proxy/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiohttp.web_log import AccessLogger
from contextlib import closing
from karapace import version as karapace_version
from karapace.config import Config, read_config
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.utils import DebugAccessLogger
from rest_proxy import KafkaRest
from typing import Final

import argparse
import logging
import sys

PROGRAM_NAME: Final[str] = "karapace_rest_proxy"


def _configure_logging(*, config: Config) -> None:
log_level = config.get("log_level", "DEBUG")
log_format = config.get("log_format", "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s")

root_handler: logging.Handler | None = None
log_handler = config.get("log_handler", None)
if "systemd" == log_handler:
from systemd import journal

root_handler = journal.JournalHandler(SYSLOG_IDENTIFIER="karapace")
elif "stdout" == log_handler or log_handler is None:
root_handler = logging.StreamHandler(stream=sys.stdout)
else:
logging.basicConfig(level=logging.INFO, format=log_format)
logging.getLogger().setLevel(log_level)
logging.warning("Log handler %s not recognized, root handler not set.", log_handler)

if root_handler is not None:
root_handler.setFormatter(logging.Formatter(log_format))
root_handler.setLevel(log_level)
root_handler.set_name(name="karapace")
logging.root.addHandler(root_handler)

logging.root.setLevel(log_level)

if config.get("access_logs_debug") is True:
config["access_log_class"] = DebugAccessLogger
logging.getLogger("aiohttp.access").setLevel(logging.DEBUG)
else:
config["access_log_class"] = AccessLogger


def main() -> int:
parser = argparse.ArgumentParser(
prog=PROGRAM_NAME,
description="Karapace rest proxy: exposes an API over common Kafka operations, your Kafka essentials in one tool",
)
parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__)
parser.add_argument("config_file", help="configuration file path", type=argparse.FileType())
arg = parser.parse_args()

with closing(arg.config_file):
config = read_config(arg.config_file)

logging.log(logging.INFO, "\n%s\\Co %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50))

_configure_logging(config=config)

app = KafkaRest(config=config)

logging.log(logging.INFO, "\n%s\nStarting %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50))

config_without_secrets = {}
for key, value in config.items():
if "password" in key:
value = "****"
config_without_secrets[key] = value
logging.log(logging.DEBUG, "Config %r", config_without_secrets)

try:
PrometheusInstrumentation.setup_metrics(app=app)
app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase`
except Exception as ex: # pylint: disable-broad-except
app.stats.unexpected_exception(ex=ex, where=f"{PROGRAM_NAME}_main")
raise

return 0


if __name__ == "__main__":
sys.exit(main())
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import AsyncKafkaConsumer
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, Timestamp
from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.karapace import empty_response, KarapaceBase
from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer
from karapace.utils import convert_to_int, json_decode, JSONDecodeError
from rest_proxy.authentication import get_kafka_client_auth_parameters_from_config
from rest_proxy.error_codes import RESTErrorCodes
from struct import error as UnpackError
from urllib.parse import urljoin

Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer
from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer
from karapace.kafka_rest_apis import KafkaRest
from pathlib import Path
from rest_proxy import KafkaRest
from tests.conftest import KAFKA_VERSION
from tests.integration.utils.cluster import RegistryDescription, RegistryEndpoint, start_schema_registry_cluster
from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX
from karapace.schema_type import SchemaType
from karapace.version import __version__
from rest_proxy import KafkaRest, SUBJECT_VALID_POSTFIX
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
new_random_name,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS
from rest_proxy.consumer_manager import KNOWN_FORMATS
from tests.utils import (
consumer_valid_payload,
new_consumer,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
See LICENSE for details
"""
from karapace.config import DEFAULTS
from karapace.kafka_rest_apis import UserRestProxy
from karapace.serialization import SchemaRegistrySerializer
from rest_proxy import UserRestProxy
from unittest.mock import patch

import copy
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

from http import HTTPStatus
from karapace.config import ConfigDefaults, set_config_defaults
from karapace.kafka_rest_apis.authentication import (
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from rest_proxy.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
SimpleOauthTokenProvider,
)
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE

import base64
import datetime
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_rest_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

from karapace.config import set_config_defaults
from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy
from rest_proxy import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy
from unittest.mock import call, Mock

import asyncio
Expand Down

0 comments on commit bb39a79

Please sign in to comment.