It is necessary to have a Message Broker running in order to use the Stream Interface. The available Brokers are:
kafka
bullmq
mqtt
rabbitmq
npm i @flowbuild/streamers
The required configuration object has a structure similar to:
{
"topics":{
"event-topic":{
"producesTo":["bullmq", "kafka", "mqtt", "rabbitmq"],
"consumesFrom":["bullmq", "kafka", "mqtt", "rabbitmq"],
},
},
"kafka": {
"CLIENT_ID": "my-kafka-id",
"BROKER_HOST": "localhost",
"BROKER_PORT": "9092",
"GROUP_CONSUMER_ID": "my-consumer-group",
},
"bullmq": {
"REDIS_HOST": "localhost",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "",
"REDIS_DB": 4,
},
"mqtt": {
"MQTT_HOST": "localhost",
"MQTT_PORT": "1883",
"MQTT_PROTOCOL": "http",
"MQTT_USERNAME": "username",
"MQTT_PASSWORD": "password",
},
"rabbitmq": {
"RABBITMQ_HOST": "localhost:5672",
"RABBITMQ_USERNAME": "user",
"RABBITMQ_PASSWORD": "password",
"RABBITMQ_QUEUE": "flowbuild"
}
}
In topics you must put the name of the events and a relation of Consumption and Production listing the brokers that will be used.
For each broker you want to use, you must put the necessary configuration in the respective configuration key
const stream = new StreamInterface({
"topics":{
"event-topic":{
"producesTo":["bullmq", "kafka", "mqtt", "rabbitmq"],
"consumesFrom":["bullmq", "kafka", "mqtt", "rabbitmq"],
},
},
"kafka": {
"CLIENT_ID": "my-kafka-id",
"BROKER_HOST": "localhost",
"BROKER_PORT": "9092",
"GROUP_CONSUMER_ID": "my-consumer-group",
},
"bullmq": {
"REDIS_HOST": "localhost",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "",
"REDIS_DB": 4,
},
"mqtt": {
"MQTT_HOST": "localhost",
"MQTT_PORT": "1883",
"MQTT_PROTOCOL": "http",
"MQTT_USERNAME": "username",
"MQTT_PASSWORD": "password",
},
"rabbitmq": {
"RABBITMQ_HOST": "localhost:5672",
"RABBITMQ_USERNAME": "user",
"RABBITMQ_PASSWORD": "password",
"RABBITMQ_QUEUE": "flowbuild"
}
});
const consumerCallback = (topic: string, receivedMessage: string) => {
console.log({topic, receivedMessage});
};
await stream.connect(consumerCallback);
await stream.produce(
"event-topic",
{"mensagem": "This is an test"},
);
await stream.produce(
"event-topic",
{"mensagem": "This is another test"},
);