Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit 6d62abb

Browse files
authored
Add asyncapi.mqtt proxy and asyncapi.mqtt.kafka proxy (#109)
1 parent 8503df5 commit 6d62abb

File tree

28 files changed

+849
-57
lines changed

28 files changed

+849
-57
lines changed

http.proxy/http/asyncapi.yaml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
asyncapi: 3.0.0
2+
info:
3+
title: HTTP Zilla Proxy
4+
version: 1.0.0
5+
license:
6+
name: Aklivity Community License
7+
servers:
8+
secure:
9+
host: https://localhost:7143
10+
protocol: http
11+
protocolVersion: '1.1'
12+
defaultContentType: application/json
13+
14+
channels:
15+
all:
16+
address: /
17+
18+
operations:
19+
getEvents:
20+
action: receive
21+
bindings:
22+
http:
23+
type: request
24+
method: GET
25+
channel:
26+
$ref: '#/channels/all'
27+
28+
components:
29+
correlationIds:
30+
itemsCorrelationId:
31+
location: '$message.header#/idempotency-key'
32+
messages:
33+
item:
34+
name: event
35+
title: An event
36+
correlationId:
37+
$ref: "#/components/correlationIds/itemsCorrelationId"
38+
headers:
39+
type: object
40+
properties:
41+
idempotency-key:
42+
description: Unique identifier for a given event
43+
type: string
44+
contentType: application/json

mqtt.kafka.asyncapi.proxy/README.md

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# mqtt.kafka.asyncapi.proxy
2+
3+
In this guide, you create Kafka topics and use Zilla to mediate MQTT broker messages onto those topics.
4+
Zilla implements MQTT API defined in AsyncAPI specifications and uses Kafka API defined AsyncAPI proxy MQTT messages to Kafka.
5+
6+
## Running locally
7+
8+
This example can be run using Docker compose or Kubernetes. The setup scripts are in the [compose](./docker/compose) and [helm](./k8s/helm) folders respectively and work the same way.
9+
10+
You will need a running kafka broker. To start one locally you will find instructions in the [kafka.broker](../kafka.broker) folder.
11+
12+
### Setup
13+
14+
Wether you chose [compose](./docker/compose) or [helm](./k8s/helm), the `setup.sh` script will:
15+
16+
- create the necessary kafka topics
17+
- create an MQTT broker at `mqtt://localhost:7183`
18+
19+
```bash
20+
./setup.sh
21+
```
22+
23+
### Using this example
24+
25+
Using eclipse-mosquitto subscribe to the sensors/# topic.
26+
27+
```bash
28+
mosquitto_sub -V '5' -t 'sensors/#' -d -p 7183
29+
```
30+
31+
output:
32+
33+
```
34+
Client null sending CONNECT
35+
Client 7f37e11e-8f79-458a-a4b3-3ffb36a9e08b received CONNACK (0)
36+
Client 7f37e11e-8f79-458a-a4b3-3ffb36a9e08b sending SUBSCRIBE (Mid: 1, Topic: sensors/#, QoS: 0, Options: 0x00)
37+
Client 7f37e11e-8f79-458a-a4b3-3ffb36a9e08b received SUBACK
38+
Subscribed (mid: 1): 0
39+
Client 7f37e11e-8f79-458a-a4b3-3ffb36a9e08b received PUBLISH (d0, q0, r0, m0, 'sensors/1', ... (24 bytes))
40+
{"id":"1","status":"on"}
41+
```
42+
43+
In a separate session, publish a valid message on the sensors/1 topic.
44+
45+
```bash
46+
mosquitto_pub -V '5' -t 'sensors/1' -m '{"id":"1","status":"on"}' -d -p 7183
47+
```
48+
49+
output:
50+
51+
```
52+
Client null sending CONNECT
53+
Client 2fc05cdc-5e1d-4e00-be18-0026ae47e749 received CONNACK (0)
54+
Client 2fc05cdc-5e1d-4e00-be18-0026ae47e749 sending PUBLISH (d0, q0, r0, m1, 'sensors/1', ... (24 bytes))
55+
Client 2fc05cdc-5e1d-4e00-be18-0026ae47e749 sending DISCONNECT
56+
```
57+
58+
Now attempt to publish an invalid message, with property `stat` instead of `status`.
59+
60+
```bash
61+
mosquitto_pub -V '5' -t 'sensors/1' -m '{"id":"1","stat":"off"}' -d -p 7183 --repeat 2 --repeat-delay 3
62+
```
63+
64+
output:
65+
66+
```
67+
Client null sending CONNECT
68+
Client cd166c27-de75-4a2e-b3c7-f16631bda2a9 received CONNACK (0)
69+
Client cd166c27-de75-4a2e-b3c7-f16631bda2a9 sending PUBLISH (d0, q0, r0, m1, 'sensors/1', ... (23 bytes))
70+
Received DISCONNECT (153)
71+
Error: The client is not currently connected.
72+
```
73+
74+
### Teardown
75+
76+
The `teardown.sh` script will remove any resources created.
77+
78+
```bash
79+
./teardown.sh
80+
```
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
version: '3'
2+
services:
3+
zilla:
4+
image: ghcr.io/aklivity/zilla
5+
container_name: zilla
6+
pull_policy: always
7+
restart: unless-stopped
8+
ports:
9+
- 7183:7183
10+
environment:
11+
KEYSTORE_PASSWORD: generated
12+
KAFKA_HOST: ${KAFKA_HOST}
13+
KAFKA_PORT: ${KAFKA_PORT}
14+
JAVA_OPTIONS: -Dzilla.incubator.enabled=true
15+
volumes:
16+
- ../../zilla.yaml:/etc/zilla/zilla.yaml
17+
- ../../mqtt/asyncapi.yaml:/etc/zilla/mqtt/mqtt-asyncapi.yaml
18+
- ../../kafka/asyncapi.yaml:/etc/zilla/kafka/kafka-asyncapi.yaml
19+
command: start -v -e
20+
21+
networks:
22+
default:
23+
driver: bridge
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/bin/bash
2+
set -e
3+
4+
if [[ -z "$KAFKA_HOST" && -z "$KAFKA_PORT" ]]; then
5+
export KAFKA_HOST=host.docker.internal
6+
export KAFKA_PORT=9092
7+
echo "==== This example requires env vars KAFKA_HOST and KAFKA_PORT for a running kafka instance. Setting to the default ($KAFKA_HOST:$KAFKA_PORT) ===="
8+
fi
9+
10+
NAMESPACE=zilla-mqtt-kafka-asyncapi-proxy
11+
12+
# Start or restart Zilla
13+
if [[ -z $(docker-compose -p $NAMESPACE ps -q zilla) ]]; then
14+
docker-compose -p $NAMESPACE up -d
15+
16+
# Create the mqtt topics in Kafka
17+
docker run --rm bitnami/kafka:3.2 bash -c "
18+
echo 'Creating topics for $KAFKA_HOST:$KAFKA_PORT'
19+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-messages
20+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic sensors
21+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-retained --config cleanup.policy=compact
22+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-sessions --config cleanup.policy=compact
23+
"
24+
25+
else
26+
docker-compose -p $NAMESPACE restart --no-deps zilla
27+
fi
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
set -e
3+
4+
NAMESPACE=zilla-mqtt-kafka-asyncapi-proxy
5+
docker-compose -p $NAMESPACE down --remove-orphans
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/bin/bash
2+
set -e
3+
4+
if [[ -z "$KAFKA_HOST" && -z "$KAFKA_PORT" ]]; then
5+
export KAFKA_HOST=kafka.zilla-kafka-broker.svc.cluster.local
6+
export KAFKA_PORT=9092
7+
echo "==== This example requires env vars KAFKA_HOST and KAFKA_PORT for a running kafka instance. Setting to the default ($KAFKA_HOST:$KAFKA_PORT) ===="
8+
fi
9+
10+
# Install Zilla to the Kubernetes cluster with helm and wait for the pod to start up
11+
NAMESPACE=zilla-mqtt-kafka-asyncapi-proxy
12+
ZILLA_CHART=oci://ghcr.io/aklivity/charts/zilla
13+
echo "Installing $ZILLA_CHART to $NAMESPACE with Kafka at $KAFKA_HOST:$KAFKA_PORT"
14+
helm upgrade --install zilla $ZILLA_CHART --namespace $NAMESPACE --create-namespace --wait \
15+
--values values.yaml \
16+
--set extraEnv[1].value="\"$KAFKA_HOST\"",extraEnv[2].value="\"$KAFKA_PORT\"" \
17+
--set-file zilla\\.yaml=../../zilla.yaml \
18+
--set-file configMaps.mqtt.data.mqtt-asyncapi\\.yaml=../../mqtt/asyncapi.yaml \
19+
--set-file configMaps.kafka.data.kafka-asyncapi\\.yaml=../../kafka/asyncapi.yaml
20+
21+
# Create the mqtt topics in Kafka
22+
kubectl run kafka-init-pod --image=bitnami/kafka:3.2 --namespace $NAMESPACE --rm --restart=Never -i -t -- /bin/sh -c "
23+
echo 'Creating topics for $KAFKA_HOST:$KAFKA_PORT'
24+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-messages
25+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic sensors
26+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-retained --config cleanup.policy=compact
27+
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-sessions --config cleanup.policy=compact
28+
"
29+
kubectl wait --namespace $NAMESPACE --for=delete pod/kafka-init-pod
30+
31+
# Start port forwarding
32+
SERVICE_PORTS=$(kubectl get svc --namespace $NAMESPACE zilla --template "{{ range .spec.ports }}{{.port}} {{ end }}")
33+
eval "kubectl port-forward --namespace $NAMESPACE service/zilla $SERVICE_PORTS" > /tmp/kubectl-zilla.log 2>&1 &
34+
35+
if [[ -x "$(command -v nc)" ]]; then
36+
until nc -z localhost 7183; do sleep 1; done
37+
fi
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/bin/bash
2+
set -x
3+
4+
# Stop port forwarding
5+
pgrep kubectl && killall kubectl
6+
7+
# Uninstall Zilla and Kafka
8+
NAMESPACE=zilla-mqtt-kafka-asyncapi-proxy
9+
helm uninstall zilla --namespace $NAMESPACE
10+
kubectl delete namespace $NAMESPACE
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
extraEnv:
2+
- name: KEYSTORE_PASSWORD
3+
value: generated
4+
- name: KAFKA_HOST
5+
value: ""
6+
- name: KAFKA_PORT
7+
value: ""
8+
- name: JAVA_OPTIONS
9+
value: "-Dzilla.incubator.enabled=true"
10+
11+
livenessProbePort: 7183
12+
readinessProbePort: 7183
13+
14+
service:
15+
ports:
16+
- port: 7183
17+
name: mqtt
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
asyncapi: 3.0.0
18+
info:
19+
title: Zilla Kafka Proxy
20+
version: 1.0.0
21+
license:
22+
name: Aklivity Community License
23+
servers:
24+
plain:
25+
host: host.docker.internal:9092
26+
protocol: kafka
27+
28+
operations:
29+
onSensorData:
30+
action: receive
31+
channel:
32+
$ref: '#/channels/sensorData'
33+
toSensorData:
34+
action: send
35+
channel:
36+
$ref: '#/channels/sensorData'
37+
38+
channels:
39+
sensorData:
40+
description: This channel contains a message for sensors.
41+
address: sensors
42+
messages:
43+
sensorData:
44+
$ref: '#/components/messages/sensorData'
45+
mqttSessions:
46+
description: This channel contains MQTT sessions.
47+
address: mqtt-sessions
48+
mqttMessages:
49+
description: This channel contains MQTT messages.
50+
address: mqtt-messages
51+
mqttRetained:
52+
description: This channel contains MQTT retained messages.
53+
address: mqtt-retained
54+
55+
components:
56+
messages:
57+
sensorData:
58+
payload:
59+
type: object
60+
properties:
61+
sensorId:
62+
type: integer
63+
description: This property describes the id of the sensor
64+
message:
65+
type: string
66+
description: This property describes message of the sensor
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
asyncapi: 3.0.0
18+
info:
19+
title: Zilla MQTT Proxy
20+
version: 1.0.0
21+
license:
22+
name: Aklivity Community License
23+
servers:
24+
plain:
25+
host: localhost:7183
26+
protocol: mqtt
27+
defaultContentType: application/json
28+
29+
channels:
30+
sensors:
31+
address: "sensors/{sensorId}"
32+
title: MQTT Topic to produce & consume topic.
33+
parameters:
34+
streetlightId:
35+
$ref: '#/components/parameters/sensorId'
36+
messages:
37+
item:
38+
$ref: '#/components/messages/item'
39+
40+
operations:
41+
sendEvents:
42+
action: send
43+
channel:
44+
$ref: '#/channels/sensors'
45+
46+
receiveEvents:
47+
action: receive
48+
channel:
49+
$ref: '#/channels/sensors'
50+
51+
components:
52+
parameters:
53+
sensorId:
54+
description: Sensor ID
55+
location: $message.header#/id
56+
messages:
57+
item:
58+
name: event
59+
title: An event
60+
contentType: application/json
61+
payload:
62+
type: object
63+
properties:
64+
item:
65+
$ref: "#/components/schemas/item"
66+
schemas:
67+
item:
68+
type: object
69+
properties:
70+
id:
71+
type: string
72+
status:
73+
type: string
74+
required:
75+
- id
76+
- status

0 commit comments

Comments
 (0)