Skip to content
This repository has been archived by the owner on Apr 16, 2024. It is now read-only.

Commit

Permalink
[DK-2795] Tests (#8)
Browse files Browse the repository at this point in the history
* refactor to kafkaclient interface

* test add new topic

* tests for change replication factor

* Tests for partitions

* generic config tests

* the rest of configs tests

* rename

* Move test data to separate file

* test address

* test adding/removing config options

* scaffold e2e tests

* increase timeout

* don't run controller tests twice in e2e

* fix kustomization

* offset

* wait logs

* ns

* order

* 1 rec

* setup kafka

* Remove tests that are too flaky in CI environments

* 5.5.0

* 0.5.0

* add create-topic test

* describe

* more logs

* lowercase

* fix kubebuilder annotations

* change svc

* kafka-client

* assert

* fix

* fix2

* remove echo

* wait for kafka pods to be up

* fix

* add partitions

* fix

* 5m

* requeue on conn error

* fix

* assert

* assert

* echo

* remove tail

* log

* fix log

* remove log

* test change cleanup policy

* fix

* test delete topic

* catch error code

* fix

* Dk 2795 testcontainers (#7)

* replace mocks for kafka server with testcontainers

* remove unneeded ns

* Reorder steps

* revert

* cleanup

* go 16

* fixes (#9)

* try again without waiting for pods to be up

* disable uneeded kafka components

* fix break line

* longer timeout

* two timeouts

* move e2e tests

* fix paths

* path

* bash

* remove tty

* remove

* various

* go version

* go 17

* remove branch

* 17
  • Loading branch information
Tyrion85 authored Mar 1, 2022
1 parent 6b34b8e commit f882ccb
Show file tree
Hide file tree
Showing 38 changed files with 2,406 additions and 171 deletions.
88 changes: 88 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# GitHub Actions workflow: end-to-end tests for the k8skafka-controller.
# Spins up a kind cluster, builds and deploys the controller image, installs
# Kafka via the Confluent Helm chart, then runs the e2e test suite.
name: e2e

on:
  pull_request:
  push:
    branches:
      - main

jobs:
  kind:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Restore Go cache
        uses: actions/cache@v1
        with:
          path: ~/go/pkg/mod
          key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
          restore-keys: |
            ${{ runner.os }}-go-
      - name: Setup Go
        uses: actions/setup-go@v2
        with:
          go-version: 1.17.x
      - name: Setup Kubernetes
        uses: engineerd/setup-kind@v0.5.0
        with:
          version: v0.11.1
          # Pin the node image by digest for reproducible cluster versions.
          image: kindest/node:v1.21.1@sha256:69860bda5563ac81e3c0057d654b5253219618a22ec3a346306239bba8cfa1a6
      - name: Setup Kustomize
        uses: fluxcd/pkg/actions/kustomize@main
      - name: Setup envtest
        uses: fluxcd/pkg/actions/envtest@main
        with:
          version: "1.19.2"
      - name: Setup Helm
        uses: fluxcd/pkg/actions/helm@main
      - name: Run controller tests
        run: make test
      # Fail if `make test` regenerated any committed files (manifests, deepcopy).
      - name: Check if working tree is dirty
        run: |
          go version
          if [[ $(git diff --stat) != '' ]]; then
            git --no-pager diff
            echo 'run make test and commit changes'
            exit 1
          fi
      - name: Build container image
        run: make docker-build-without-tests IMG=test/k8skafka-controller:latest BUILD_PLATFORMS=linux/amd64 BUILD_ARGS=--load
      - name: Load test image
        run: kind load docker-image test/k8skafka-controller:latest
      - name: Deploy controller
        run: make deploy IMG=test/k8skafka-controller:latest
      - name: Setup Kafka
        env:
          # cp-helm-charts chart version (quoted so YAML keeps it a string).
          KAFKA_VERSION: "0.5.0"
        run: |
          kubectl create ns kafka
          helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
          helm upgrade --wait -i kafka confluentinc/cp-helm-charts \
            --version $KAFKA_VERSION \
            --namespace kafka \
            --set cp-schema-registry.enabled=false \
            --set cp-kafka-rest.enabled=false \
            --set cp-kafka-connect.enabled=false \
            --set cp-ksql-server.enabled=false \
            --set cp-control-center.enabled=false
      - name: Setup Kafka client
        run: |
          kubectl -n kafka apply -f ./config/testdata/test-kafka-client.yaml
          kubectl -n kafka wait --for=condition=ready pod -l app=kafka-client
      - name: Run Kafka e2e tests
        run: ./scripts/tests/e2e/test_suite.sh
        shell: bash
      - name: Logs
        run: |
          kubectl -n k8skafka-system wait --for=condition=ready pod -l app=k8skafka-controller && kubectl -n k8skafka-system logs deploy/k8skafka-controller
      - name: Debug failure
        if: failure()
        run: |
          kubectl -n kube-system describe pods
          kubectl -n k8skafka-system describe pods
          kubectl -n k8skafka-system get kafkatopic -oyaml
          kubectl -n k8skafka-system describe kafkatopic
          kubectl -n k8skafka-system get all
          kubectl -n k8skafka-system logs deploy/k8skafka-controller
          kubectl -n kafka get all
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.15 as builder
FROM golang:1.17 as builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ podTemplate(label: 'k8skafka-controller',
containers: [
containerTemplate(
name: 'golang',
image: 'bitnami/golang:1.15',
image: 'bitnami/golang:1.16',
ttyEnabled: true
),
containerTemplate(
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ all: test manager

# Run tests
test: generate fmt vet manifests
go test ./... -coverprofile cover.out
go test ./... -test.v -coverprofile cover.out

# Build manager binary
manager: generate fmt vet
Expand Down Expand Up @@ -59,6 +59,9 @@ generate: controller-gen
docker-build: test
docker build . -t ${IMG}

# Build the docker image without running the test suite first; used by the
# e2e workflow, where controller tests run in a separate earlier step.
docker-build-without-tests: generate fmt vet manifests
docker build . -t ${IMG}

# Push the docker image
docker-push:
docker push ${IMG}
Expand Down
34 changes: 20 additions & 14 deletions api/v1beta1/kafkatopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1beta1

import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -58,13 +59,13 @@ type KafkaTopicConfig struct {
// The default policy ("delete") will discard old segments when their retention time or size limit has been reached.
// The "compact" setting will enable log compaction on the topic.
// +optional
CleanupPolicy *CleanupPolicy `json:"cleanupPolicy,omitempty"`
CleanupPolicy *string `json:"cleanupPolicy,omitempty"`

// Final compression type for a given topic.
// Supported are standard compression codecs: 'gzip', 'snappy', 'lz4', 'zstd').
// It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.
// +optional
CompressionType *CompressionType `json:"compressionType,omitempty"`
CompressionType *string `json:"compressionType,omitempty"`

// The amount of time to retain delete tombstone markers for log compacted topics. Specified in milliseconds.
// This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0
Expand Down Expand Up @@ -134,7 +135,7 @@ type KafkaTopicConfig struct {
// Define whether the timestamp in the message is message create time or log append time.
// The value should be either `CreateTime` or `LogAppendTime`
// +optional
MessageTimestampType *MessageTimestampType `json:"messageTimestampType,omitempty"`
MessageTimestampType *string `json:"messageTimestampType,omitempty"`

// This configuration controls how frequently the log compactor will attempt to clean the log (assuming LogCompaction is enabled).
// By default we will avoid cleaning a log where more than 50% of the log has been compacted.
Expand All @@ -144,12 +145,16 @@ type KafkaTopicConfig struct {
// (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the MinCompactionLagMs duration,
// or (ii) if the log has had dirty (uncompacted) records for at most the MaxCompactionLagMs period.
// +optional
MinCleanableDirtyRatio *int64 `json:"minCleanableDirtyRatio,omitempty"`
MinCleanableDirtyRatio *resource.Quantity `json:"minCleanableDirtyRatio,omitempty"`

// The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
// +optional
MinCompactionLagMs *int64 `json:"minCompactionLagMs,omitempty"`

// The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.
// +optional
MaxCompactionLagMs *int64 `json:"maxCompactionLagMs,omitempty"`

// When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
// If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
// When used together, MinInsyncReplicas and acks allow you to enforce greater durability guarantees.
Expand Down Expand Up @@ -194,16 +199,12 @@ type KafkaTopicConfig struct {
UncleanLeaderElectionEnable *bool `json:"uncleanLeaderElectionEnable,omitempty"`
}

type CleanupPolicy string

const (
CleanupPolicyDelete = "delete"
CleanupPolicyCompact = "compact"
CleanupPolicyDeleteCompact = "delete,compact"
)

type CompressionType string

const (
CompressionTypeGZIP = "gzip"
CompressionTypeSnappy = "snappy"
Expand All @@ -213,8 +214,6 @@ const (
CompressionTypeProducer = "producer"
)

type MessageTimestampType string

const (
MessageTimestampTypeCreateTime = "CreateTime"
MessageTimestampTypeLogAppendTime = "LogAppendTime"
Expand Down Expand Up @@ -319,14 +318,14 @@ func (in *KafkaTopic) GetReplicationFactor() int64 {
return *in.Spec.ReplicationFactor
}

func (in *KafkaTopic) GetCleanupPolicy() *CleanupPolicy {
// GetCleanupPolicy returns the topic's configured cleanup policy, or nil
// when no topic-level config is set.
func (in *KafkaTopic) GetCleanupPolicy() *string {
if in.Spec.KafkaTopicConfig == nil {
return nil
}
return in.Spec.KafkaTopicConfig.CleanupPolicy
}

func (in *KafkaTopic) GetCompressionType() *CompressionType {
func (in *KafkaTopic) GetCompressionType() *string {
if in.Spec.KafkaTopicConfig == nil {
return nil
}
Expand Down Expand Up @@ -410,14 +409,14 @@ func (in *KafkaTopic) GetMessageTimestampDifferenceMaxMs() *int64 {
return in.Spec.KafkaTopicConfig.MessageTimestampDifferenceMaxMs
}

func (in *KafkaTopic) GetMessageTimestampType() *MessageTimestampType {
// GetMessageTimestampType returns the topic's configured message timestamp
// type, or nil when no topic-level config is set.
func (in *KafkaTopic) GetMessageTimestampType() *string {
if in.Spec.KafkaTopicConfig == nil {
return nil
}
return in.Spec.KafkaTopicConfig.MessageTimestampType
}

func (in *KafkaTopic) GetMinCleanableDirtyRatio() *int64 {
func (in *KafkaTopic) GetMinCleanableDirtyRatio() *resource.Quantity {
if in.Spec.KafkaTopicConfig == nil {
return nil
}
Expand All @@ -431,6 +430,13 @@ func (in *KafkaTopic) GetMinCompactionLagMs() *int64 {
return in.Spec.KafkaTopicConfig.MinCompactionLagMs
}

// GetMaxCompactionLagMs returns the topic's configured max compaction lag
// in milliseconds, or nil when no topic-level config is set.
func (in *KafkaTopic) GetMaxCompactionLagMs() *int64 {
if in.Spec.KafkaTopicConfig == nil {
return nil
}
return in.Spec.KafkaTopicConfig.MaxCompactionLagMs
}

func (in *KafkaTopic) GetMinInsyncReplicas() *int64 {
if in.Spec.KafkaTopicConfig == nil {
return nil
Expand Down
16 changes: 11 additions & 5 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions config/crd/bases/kafka.infra.doodle.com_kafkatopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ spec:
or alternatively the wildcard '*' can be used to throttle all
replicas for this topic.
type: string
maxCompactionLagMs:
description: The maximum time a message will remain ineligible
for compaction in the log. Only applicable for logs that are
being compacted.
format: int64
type: integer
maxMessageBytes:
description: The largest record batch size allowed by Kafka. If
this is increased and there are consumers older than 0.10.2,
Expand Down Expand Up @@ -170,6 +176,9 @@ spec:
or `LogAppendTime`
type: string
minCleanableDirtyRatio:
anyOf:
- type: integer
- type: string
description: 'This configuration controls how frequently the log
compactor will attempt to clean the log (assuming LogCompaction
is enabled). By default we will avoid cleaning a log where more
Expand All @@ -184,8 +193,8 @@ spec:
(uncompacted) records for at least the MinCompactionLagMs duration,
or (ii) if the log has had dirty (uncompacted) records for at
most the MaxCompactionLagMs period.'
format: int64
type: integer
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
minCompactionLagMs:
description: The minimum time a message will remain uncompacted
in the log. Only applicable for logs that are being compacted.
Expand Down
5 changes: 5 additions & 0 deletions config/crd/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Kustomization exposing the controller-gen-generated CRD bases.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- bases/kafka.infra.doodle.com_kafkatopics.yaml
# +kubebuilder:scaffold:crdkustomizeresource
7 changes: 7 additions & 0 deletions config/default/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Top-level kustomization: composes CRDs, RBAC, the manager Deployment and
# the namespace, and forces everything into the k8skafka-system namespace.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: k8skafka-system
# `bases:` is deprecated and expects directories; `namespace.yaml` is a plain
# file, so list everything under `resources:` (which accepts both).
resources:
  - ../crd
  - ../rbac
  - ../manager
  - namespace.yaml
6 changes: 6 additions & 0 deletions config/default/namespace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Namespace that hosts the k8skafka controller deployment.
apiVersion: v1
kind: Namespace
metadata:
  name: k8skafka-system
  labels:
    control-plane: controller
8 changes: 8 additions & 0 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Kustomization for the controller Deployment; rewrites the generic
# `controller` image reference to the locally built test image.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - manager.yaml
images:
  - name: controller
    newName: test/k8skafka-controller
    newTag: latest
32 changes: 32 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Deployment running the k8skafka controller manager (single replica).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8skafka-controller
  labels:
    control-plane: controller
spec:
  replicas: 1
  selector:
    matchLabels:
      app: k8skafka-controller
  template:
    metadata:
      labels:
        app: k8skafka-controller
    spec:
      containers:
        - name: manager
          # `controller:latest` is rewritten by the kustomize `images:` stanza.
          image: controller:latest
          imagePullPolicy: IfNotPresent
          command:
            - /manager
          # args:
          # - --enable-leader-election
          resources:
            limits:
              cpu: 100m
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
      terminationGracePeriodSeconds: 10
5 changes: 5 additions & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Kustomization bundling the controller's RBAC manifests.
# Declare apiVersion/kind explicitly, matching config/crd/kustomization.yaml.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - role.yaml
  - role_binding.yaml
  - leader_election_role.yaml
  - leader_election_role_binding.yaml
Loading

0 comments on commit f882ccb

Please sign in to comment.