Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move all test fixtures to their own folder #687

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 20 additions & 89 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,21 @@
import logging.config
from collections import namedtuple
from typing import Generator

import pytest
from testcontainers.core.network import Network

from .containerhelper import ContainerHelper
from .logging import LOGGING_CONFIG, patch_logger_class

# Define list of files with fixtures for pytest autodiscovery
pytest_plugins = [
"tests.test_quixstreams.test_dataframe.fixtures",
"tests.test_quixstreams.fixtures",
"tests.test_quixstreams.test_models.test_serializers.fixtures",
"tests.test_quixstreams.test_platforms.test_quix.fixtures",
"tests.test_quixstreams.test_state.fixtures",
"tests.test_quixstreams.test_state.test_rocksdb.test_windowed.fixtures",
]

KafkaContainer = namedtuple(
"KafkaContainer",
["broker_address", "internal_broker_address"],
)
SchemaRegistryContainer = namedtuple(
"SchemaRegistryContainer",
["schema_registry_address"],
import pkgutil
from pathlib import Path


def list_modules_in_dir(dir_path, current_module="", modules=None) -> list[str]:
    """Recursively collect dotted module names for every non-package module
    found under ``dir_path``.

    :param dir_path: filesystem path of the package directory to scan.
    :param current_module: dotted prefix applied to modules found at this level.
    :param modules: accumulator used by recursive calls; a fresh list is
        created when ``None`` so each top-level call starts empty.
    :return: fully-qualified module names; packages are descended into but
        not themselves included in the result.
    """
    # Sentinel check must be `is None`: an explicitly passed (possibly empty)
    # accumulator list should never be silently replaced.
    if modules is None:
        modules = []
    for finder, name, is_pkg in pkgutil.iter_modules([dir_path]):
        full_module_name = f"{current_module}.{name}" if current_module else name
        if is_pkg:
            # Descend into the subpackage; join paths OS-agnostically instead
            # of concatenating with a hard-coded "/".
            modules = list_modules_in_dir(
                str(Path(finder.path) / name), full_module_name, modules
            )
        else:
            modules.append(full_module_name)
    return modules


# Auto-discover every fixture module under tests/fixtures/ and register each
# as a pytest plugin, replacing a manually maintained list of fixture files.
pytest_plugins = list_modules_in_dir(
    f"{Path(__file__).parent.absolute()}/fixtures", "tests.fixtures"
)

test_logger = logging.getLogger("quixstreams.tests")


@pytest.fixture(autouse=True, scope="session")
def configure_logging():
    """Session-wide autouse fixture: install LOGGING_CONFIG and patch the
    logger class (see tests.logging) before any test runs."""
    logging.config.dictConfig(LOGGING_CONFIG)
    patch_logger_class()


@pytest.fixture(autouse=True)
def log_test_progress(request: pytest.FixtureRequest):
    """Autouse fixture: log each test's node id at DEBUG level as it starts."""
    test_logger.debug("Starting test %s", request.node.nodeid)


@pytest.fixture(scope="session")
def network():
    """Session-scoped docker network shared by the Kafka and Schema Registry
    containers; torn down when the session ends via the context manager."""
    with Network() as network:
        yield network


@pytest.fixture(scope="session")
def schema_registry_container(
    network: Network, kafka_container: KafkaContainer
) -> Generator[SchemaRegistryContainer, None, None]:
    """Start a session-scoped Schema Registry container attached to the shared
    docker network and the Kafka container's internal listener.

    Yields a SchemaRegistryContainer with the registry address; the container
    is stopped after the session.
    """
    container, schema_registry_address = (
        ContainerHelper.create_schema_registry_container(
            network, kafka_container.internal_broker_address
        )
    )
    test_logger.debug(
        f"Starting Schema Registry container on {schema_registry_address}"
    )
    ContainerHelper.start_schema_registry_container(container)
    test_logger.debug(f"Started Schema Registry container on {schema_registry_address}")
    yield SchemaRegistryContainer(schema_registry_address=schema_registry_address)
    container.stop()


@pytest.fixture(scope="session")
def kafka_container_factory(network: Network) -> KafkaContainer:
    """Return a generator-factory that starts a Kafka container on the shared
    docker network.

    The inner factory is a generator: iterating it yields a KafkaContainer
    (external + internal broker addresses) and stops the container when the
    generator is exhausted/closed (see the `kafka_container` fixture).
    """

    def factory():
        (
            kafka_container,
            internal_broker_address,
            external_broker_address,
        ) = ContainerHelper.create_kafka_container(network)
        test_logger.debug(f"Starting Kafka container on {external_broker_address}")
        ContainerHelper.start_kafka_container(kafka_container)
        test_logger.debug(f"Started Kafka container on {external_broker_address}")
        yield KafkaContainer(
            broker_address=external_broker_address,
            internal_broker_address=internal_broker_address,
        )
        # Runs when the consuming fixture finishes iterating the generator.
        kafka_container.stop()

    return factory


@pytest.fixture(scope="session")
def kafka_container(kafka_container_factory) -> KafkaContainer:
    """Session-scoped Kafka container; delegates startup/teardown to the
    generator produced by `kafka_container_factory`."""
    yield from kafka_container_factory()
Empty file added tests/fixtures/__init__.py
Empty file.
62 changes: 62 additions & 0 deletions tests/fixtures/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import Optional
from unittest.mock import PropertyMock, patch

import pytest

from quixstreams.app import Application, MessageProcessedCallback, ProcessingGuarantee
from quixstreams.error_callbacks import (
ConsumerErrorCallback,
ProcessingErrorCallback,
ProducerErrorCallback,
)
from quixstreams.kafka import AutoOffsetReset
from quixstreams.models.topics import TopicManager


@pytest.fixture()
def app_factory(kafka_container, random_consumer_group, tmp_path, store_type):
    """Return a factory producing Application instances bound to the test
    Kafka broker, with the default state store type patched to ``store_type``.

    Every Application option can be overridden per call; unset options fall
    back to fixture-provided defaults.
    """

    def factory(
        consumer_group: Optional[str] = None,
        auto_offset_reset: AutoOffsetReset = "latest",
        commit_interval: float = 5.0,
        commit_every: int = 0,
        consumer_extra_config: Optional[dict] = None,
        producer_extra_config: Optional[dict] = None,
        on_consumer_error: Optional[ConsumerErrorCallback] = None,
        on_producer_error: Optional[ProducerErrorCallback] = None,
        on_processing_error: Optional[ProcessingErrorCallback] = None,
        on_message_processed: Optional[MessageProcessedCallback] = None,
        state_dir: Optional[str] = None,
        auto_create_topics: bool = True,
        use_changelog_topics: bool = True,
        topic_manager: Optional[TopicManager] = None,
        processing_guarantee: ProcessingGuarantee = "at-least-once",
        request_timeout: float = 30,
    ) -> Application:
        # Default the state dir to a per-test temporary path (a Path, despite
        # the Optional[str] annotation — Application accepts both).
        state_dir = state_dir or (tmp_path / "state").absolute()
        return Application(
            broker_address=kafka_container.broker_address,
            consumer_group=consumer_group or random_consumer_group,
            auto_offset_reset=auto_offset_reset,
            commit_interval=commit_interval,
            commit_every=commit_every,
            consumer_extra_config=consumer_extra_config,
            producer_extra_config=producer_extra_config,
            on_consumer_error=on_consumer_error,
            on_producer_error=on_producer_error,
            on_processing_error=on_processing_error,
            on_message_processed=on_message_processed,
            state_dir=state_dir,
            auto_create_topics=auto_create_topics,
            use_changelog_topics=use_changelog_topics,
            topic_manager=topic_manager,
            processing_guarantee=processing_guarantee,
            request_timeout=request_timeout,
        )

    # Patch the default store type for the fixture's whole lifetime, so every
    # Application built by the factory (and used in the test) sees store_type.
    with patch(
        "quixstreams.state.manager.StateStoreManager.default_store_type",
        new_callable=PropertyMock,
    ) as m:
        m.return_value = store_type
        yield factory
41 changes: 41 additions & 0 deletions tests/fixtures/checkpointing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Optional

import pytest

from quixstreams.checkpointing import Checkpoint
from quixstreams.kafka import Consumer
from quixstreams.processing import PausingManager
from quixstreams.rowproducer import RowProducer
from quixstreams.sinks import SinkManager
from quixstreams.state import StateStoreManager


@pytest.fixture()
def checkpoint_factory(state_manager, consumer, row_producer_factory):
    """Return a factory producing Checkpoint instances wired to the test
    fixtures.

    Trailing-underscore kwargs override the corresponding fixture-provided
    collaborator; anything left as None falls back to the fixture default.
    """

    def factory(
        commit_interval: float = 1,
        commit_every: int = 0,
        consumer_: Optional[Consumer] = None,
        producer_: Optional[RowProducer] = None,
        state_manager_: Optional[StateStoreManager] = None,
        sink_manager_: Optional[SinkManager] = None,
        pausing_manager_: Optional[PausingManager] = None,
        exactly_once: bool = False,
    ):
        consumer_ = consumer_ or consumer
        sink_manager_ = sink_manager_ or SinkManager()
        # Fix: pause the same consumer the Checkpoint commits on. Previously
        # this always wrapped the fixture-level `consumer`, even when the
        # caller supplied a custom `consumer_`.
        pausing_manager_ = pausing_manager_ or PausingManager(consumer=consumer_)
        # Exactly-once semantics require a transactional producer.
        producer_ = producer_ or row_producer_factory(transactional=exactly_once)
        state_manager_ = state_manager_ or state_manager
        return Checkpoint(
            commit_interval=commit_interval,
            commit_every=commit_every,
            producer=producer_,
            consumer=consumer_,
            state_manager=state_manager_,
            sink_manager=sink_manager_,
            pausing_manager=pausing_manager_,
            exactly_once=exactly_once,
        )

    return factory
76 changes: 76 additions & 0 deletions tests/fixtures/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import uuid
from typing import Optional

import pytest

from quixstreams.error_callbacks import ConsumerErrorCallback
from quixstreams.kafka import AutoOffsetReset, Consumer
from quixstreams.rowconsumer import RowConsumer


@pytest.fixture()
def random_consumer_group() -> str:
    """A unique consumer-group name, so test runs never share group offsets."""
    group_id = uuid.uuid4()
    return str(group_id)


@pytest.fixture()
def consumer_factory(kafka_container, random_consumer_group):
    """Return a factory producing Consumers bound to the test Kafka broker.

    Defaults come from the `kafka_container` and `random_consumer_group`
    fixtures; every argument can be overridden per call.
    """

    def factory(
        broker_address: str = kafka_container.broker_address,
        consumer_group: Optional[str] = None,
        auto_offset_reset: AutoOffsetReset = "latest",
        auto_commit_enable: bool = True,
        extra_config: Optional[dict] = None,
    ) -> Consumer:
        consumer_group = consumer_group or random_consumer_group
        extras = {
            # Make consumers to refresh cluster metadata often
            # to react on re-assignment changes faster
            "topic.metadata.refresh.interval.ms": 3000,
            # Keep rebalances as simple as possible for testing
            "partition.assignment.strategy": "range",
        }
        # Caller-supplied settings override the testing defaults above.
        extras.update((extra_config or {}))

        return Consumer(
            broker_address=broker_address,
            consumer_group=consumer_group,
            auto_commit_enable=auto_commit_enable,
            auto_offset_reset=auto_offset_reset,
            extra_config=extras,
        )

    return factory


@pytest.fixture()
def consumer(consumer_factory) -> Consumer:
    """A ready-to-use Consumer built with the factory's default settings."""
    default_consumer = consumer_factory()
    return default_consumer


@pytest.fixture()
def row_consumer_factory(kafka_container, random_consumer_group):
    """Return a factory producing RowConsumers bound to the test Kafka broker.

    Defaults come from the `kafka_container` and `random_consumer_group`
    fixtures; every argument can be overridden per call.
    """

    def factory(
        broker_address: str = kafka_container.broker_address,
        consumer_group: Optional[str] = None,
        auto_offset_reset: AutoOffsetReset = "latest",
        auto_commit_enable: bool = True,
        extra_config: Optional[dict] = None,
        on_error: Optional[ConsumerErrorCallback] = None,
    ) -> RowConsumer:
        # Fix: copy the dict so the caller's extra_config is never mutated
        # by the metadata-refresh override below.
        extra_config = dict(extra_config or {})
        consumer_group = consumer_group or random_consumer_group

        # Make consumers to refresh cluster metadata often
        # to react on re-assignment changes faster
        extra_config["topic.metadata.refresh.interval.ms"] = 3000
        return RowConsumer(
            broker_address=broker_address,
            consumer_group=consumer_group,
            auto_commit_enable=auto_commit_enable,
            auto_offset_reset=auto_offset_reset,
            extra_config=extra_config,
            on_error=on_error,
        )

    return factory
File renamed without changes.
49 changes: 49 additions & 0 deletions tests/fixtures/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest

from quixstreams.models import MessageContext
from quixstreams.models.rows import Row


@pytest.fixture()
def message_context_factory():
    """Factory for MessageContext objects with zeroed partition/offset/size;
    only the topic name is configurable."""

    def factory(topic: str = "test") -> MessageContext:
        return MessageContext(topic=topic, partition=0, offset=0, size=0)

    return factory


@pytest.fixture()
def row_factory():
    """
    Build Row objects carrying only the fields typically handed to a producer
    when producing a message — i.e. the ones worth validating when
    producing/consuming.
    """

    def factory(
        value,
        topic="input-topic",
        key=b"key",
        timestamp: int = 0,
        headers=None,
        partition: int = 0,
        offset: int = 0,
    ) -> Row:
        message_context = MessageContext(
            topic=topic,
            partition=partition,
            offset=offset,
            size=0,
        )
        return Row(
            value=value,
            key=key,
            timestamp=timestamp,
            context=message_context,
            headers=headers,
        )

    return factory
Loading