Skip to content

Commit

Permalink
added new info
Browse files Browse the repository at this point in the history
  • Loading branch information
yauheniyefimenka committed Sep 26, 2023
1 parent 5f20af7 commit e9fddd9
Show file tree
Hide file tree
Showing 20 changed files with 216 additions and 3 deletions.
1 change: 1 addition & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ services:
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.2.1
hostname: ksqldb-server
container_name: ksqldb
depends_on:
- kafka
environment:
Expand Down
155 changes: 155 additions & 0 deletions docs/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<img src="./images/Kafka-Streams.png" width="800" alt="Kafka-Streams">

# Введение в Kafka Streams

## Применение Kafka Streams для обработки потоковых данных: Основные концепции, примеры и сравнение с ksqlDB

### Какую скрытыю проблему мы хотим решить чаще всего?

<img src="./images/spaghetti-architecture.png" width="400" alt="spaghetti-architecture">

### С помощью чего мы можем это решить?

<img src="./images/apache-kafka.png" width="400" alt="apache-kafka">

### Что такое Apache Kafka?

Начинающие пользователи Kafka, как правило, имеют несколько убедительных причин для использования
Kafka в своей инфраструктуре. Первоначальным вариантом использования может быть применение
Kafka для интеграции баз данных. Это особенно полезно при наличии тесно связанных,
но разрозненных баз данных - часто RDBMS и NoSQL, - которые могут стать единой точкой отказа
в критически важных приложениях и привести к неудачной архитектуре "спагетти".

Прежде всего, Kafka - это НЕ просто система обмена сообщениями pub/sub для передачи данных из
пункта А в пункт Б. Именно так обычно отвечают на подобный вопрос почти все люди, считающие,
что Kafka - это очередной IBM MQ или RabbitMQ. Нет.

<img src="./images/kafka-env.png" width="800" alt="kafka-env">

### Основные концепции Apache Kafka?

Одной из основных причин, по которой Apache Kafka стал стандартом де-факто для столь большого числа
различных вариантов использования, является сочетание четырех мощных концепций:

- **Publish and subscribe** на потоки событий, подобно очереди сообщений или корпоративной системе обмена
сообщениями.
- **Store** потоки событий в отказоустойчивом хранилище сколь угодно долго (часы, дни, месяцы, вечность).
- **Process** потоки событий в реальном времени, по мере их возникновения.
- **Integration** различных источников и стоков (неважно, в реальном времени, пакетно или запрос-ответ)

Именно поэтому вокруг кафки выстраивается такая мощная экасистема.

### Сравнение Kafka Streams и ksqlDB - Как выбрать

Ответ сводится к:
- сочетанию ресурсов
- способностей команды
- специфики использования

### KsqlDB

<img src="./images/ksqldb-processing.png" width="500" alt="ksqldb-processing">

KsqlDB предоставляет нам API.

Варианты использования:
- CLI
- client API:

### Минусы KsqlDB:

- Более сложный барьер для входа
- Не покрывает весь **Kafka Streams API**
- Отдельный жизненый цикл(отдельный сервер)
- Поддержка(обслуживание ложиться на плечи **DevOps**)
- Собственные логи
- Миграция скриптов(создание, хранение, деплой)
- Нет способов управлять consumer-groups(_Покрайней мере не нашел как это сделать_)
- Нет возможности написать свою кастомную логику
- псевдо язык
- тесты

### Kafka Streams

Kafka Streams имеет низкий барьер для входа: Вы можете быстро написать и запустить небольшую пробную
версию на одной машине, а для масштабирования до больших производственных нагрузок достаточно
запустить дополнительные экземпляры приложения на нескольких машинах. Kafka Streams прозрачно
справляется с распределением нагрузки между несколькими экземплярами одного и того же приложения,
используя модель параллелизма Kafka.

Вся работа с кафкой строиться на базе **Producers** and **Consumers**. Но все зависит
от того, в каком виде мы представляем эти Producers and Consumers:

- Producer and Consumer
- Spring (boot, cloud)
- Connectors
- Kafka Stream

#### Processing Data: Vanilla Kafka vs. Kafka Streams

Kafka Streams является абстракцией над продюсерами и консьюмерами, позволяющей
игнорировать низкоуровневые детали и сосредоточиться на обработке данных в Kafka.
Поскольку Kafka Streams работает на основе декларативного подхода, код обработки,
написанный с использованием Kafka Streams, намного более лаконичен, чем код,
написанный с использованием низкоуровневых клиентов Kafka.

```java
try(Consumer<String, Widget> consumer = new KafkaConsumer<>(consumerProperties());
Producer<String, Widget> producer = new KafkaProducer<>(producerProperties())) {
consumer.subscribe(Collections.singletonList("widgets"));
while (true) {
ConsumerRecords<String, Widget> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, Widget> record : records) {
Widget widget = record.value();
if (widget.getColour().equals("red") {
ProducerRecord<String, Widget> producerRecord = new ProducerRecord<>("widgets-red", record.key(), widget);
producer.send(producerRecord, (metadata, exception)-> {…….} );
```

```java
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
.filter((key, widget) -> widget.getColour.equals("red"))
.to("widgets-red", Produced.with(stringSerde, widgetsSerde));
```

#### Stream Processing Topology

<img src="./images/streams-topology.jpg" width="400" alt="streams-topology">

#### Processor API vs DSL

#### DSL

Операции которые предоставляет нам библиотека:

- stateless transformations (map, filter)
- stateful transformations (count, reduce)
- joins (leftJoin)
- windowing (session windowing, hopping windowing)

Абстракции для работы:

- KStream
- KTable
- KGlobalTable

### KTable

<img src="./images/ktable.png" width="400" alt="ktable">

### Join

<img src="./images/join.png" width="400" alt="join">

### Windowing

- Tumbling time window
<img src="./images/tumbling-time-windows.png" width="400" alt="join">
- Hopping time window
<img src="./images/hopping-time-windows.png" width="400" alt="join">
- Session window
<img src="./images/session-windows.png" width="400" alt="join">
- Sliding time window
<img src="./images/sliding-windows.png" width="400" alt="join">
6 changes: 6 additions & 0 deletions docs/_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
title: Kafka Streams
description: "Применение Kafka Streams для обработки потоковых данных. Основные концепции, примеры и сравнение с ksqlDB."
show_downloads: true
remote_theme: pages-themes/architect@v0.2.0 # depending on the selected theme
plugins:
- jekyll-remote-theme
Binary file added docs/images/Kafka-Streams.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/apache-kafka.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/hopping-time-windows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/join.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/kafka-env.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/ksqldb-processing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/ktable.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/session-windows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/sliding-windows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spaghetti-architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/streams-topology.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/tumbling-time-windows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,23 +1,53 @@
## Commands for producer
## Kafka Streams

### Commands for producer

```bash
kafka-console-producer --bootstrap-server 127.0.0.1:9092 --property "parse.key=true" --property "key.separator=:" --topic lesson1_source
```

## Commands for consumer
### Commands for consumer

```bash
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --property "print.key=true" --topic lesson1_target
```

## ksql
## KsqlDB

### Commands for producer

```bash
kafka-console-producer --bootstrap-server 127.0.0.1:9092 --property "parse.key=true" --property "key.separator=:" --topic lesson1_source
```

### Commands for consumer

```bash
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --property "print.key=true" --topic lesson1_target_ksql
```

### CLI

```bash
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```

### Commands of KsqlDB

```bash
SHOW streams;
SHOW PROPERTIES;
DROP STREAM {name_stream}
```

### Java client

```java

String sql = "CREATE TABLE ORDERS_BY_USER AS "
+ "SELECT USER_ID, COUNT(*) as COUNT "
+ "FROM ORDERS GROUP BY USER_ID EMIT CHANGES;";
Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");
ExecuteStatementResult result = client.executeStatement(sql, properties).get();
System.out.println("Query ID: " + result.queryId().orElse("<null>"));
```
File renamed without changes.
10 changes: 10 additions & 0 deletions src/main/resources/ksql/2_add_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-1', 'wheel', 45);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-2', 'motor', 41);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-1', 'wheel', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-3', 'muffler', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-3', 'muffler', 40);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-4', 'motor', 43);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-6', 'muffler', 43);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-5', 'wheel', 41);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-5', 'wheel', 42);
INSERT INTO readings (sensor, location, reading) VALUES ('sensor-4', 'motor', 41);
6 changes: 6 additions & 0 deletions src/main/resources/ksql/3_add_clear_stream.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE STREAM clean AS
SELECT sensor,
reading,
UCASE(location) AS location
FROM readings
EMIT CHANGES;
5 changes: 5 additions & 0 deletions src/main/resources/ksql/4_add_high_readings.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE STREAM high_readings AS
SELECT sensor, reading, location
FROM clean
WHERE reading > 41
EMIT CHANGES;

0 comments on commit e9fddd9

Please sign in to comment.