[bitnami/kafka] Kafka streaming for the Inference Server: kafka.errors.NoBrokersAvailable: NoBrokersAvailable #78349

Open
danilyef opened this issue Feb 25, 2025 · 1 comment
Labels: kafka, tech-issues (The user has a technical issue about an application), triage (Triage is needed)

@danilyef
Name and Version

bitnami/kafka:latest

What architecture are you using?

amd64

What steps will reproduce the bug?

I want to build a Dockerized streaming pipeline that:

  1. Sends a request to the "Request" queue through a POST method in FastAPI.
  2. Has an inference worker classify the submitted text (sentiment analysis: negative or positive).
  3. Sends the result to a second queue, "Predictions", from which it is retrieved via a GET method.

My docker-compose file:

version: '2'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    hostname: zookeeper
    restart: unless-stopped
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

      
  
  kafka-1:
    image: bitnami/kafka:latest
    container_name: kafka-1
    ports:
      - "9094:9094"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  kafka-2:
    image: bitnami/kafka:latest
    container_name: kafka-2
    ports:
      - "9095:9095"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,OUTSIDE://0.0.0.0:9095
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,OUTSIDE://localhost:9095
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper



  fastapi-server:
    build: ./server
    container_name: fastapi-server
    ports:
      - "8000:8000"
    depends_on:
      - kafka-1
      - kafka-2

  worker:
    build: ./worker
    container_name: sentiment-worker
    depends_on:
      - kafka-1
      - kafka-2

When I run docker-compose up, my FastAPI server fails with the error "kafka.errors.NoBrokersAvailable: NoBrokersAvailable". I don't understand why, because I explicitly defined 2 brokers for the 2 queues.
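
One thing that may be worth checking in the compose file above: the INTERNAL advertised listeners point at the hostname `kafka`, while the brokers are named `kafka-1` and `kafka-2`. A client that bootstraps against `kafka-1:9092` could then be told to reconnect to `kafka:9092`, a name that may not resolve on the compose network. The sketch below parses the listener strings copied from the compose file and flags advertised hosts that match no service or container name. `parse_listeners` and `unresolvable_advertised_hosts` are hypothetical helpers written for this illustration, and whether this mismatch is the actual cause of the error is an assumption.

```python
# Service and container names copied from the compose file in this issue.
KNOWN_NAMES = {
    "zookeeper", "kafka-1", "kafka-2",
    "fastapi-server", "worker", "sentiment-worker",
}

# KAFKA_ADVERTISED_LISTENERS values copied from the compose file.
ADVERTISED = {
    "kafka-1": "INTERNAL://kafka:9092,OUTSIDE://localhost:9094",
    "kafka-2": "INTERNAL://kafka:9093,OUTSIDE://localhost:9095",
}


def parse_listeners(value):
    """Split 'NAME://host:port,...' into a {name: (host, port)} dict."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def unresolvable_advertised_hosts(advertised, known_names, listener="INTERNAL"):
    """Return {broker: host} for advertised hosts that match no known name."""
    problems = {}
    for broker, value in advertised.items():
        host, _port = parse_listeners(value)[listener]
        if host not in known_names:
            problems[broker] = host
    return problems


if __name__ == "__main__":
    print(unresolvable_advertised_hosts(ADVERTISED, KNOWN_NAMES))
```

If the check reports a broker, one option to try is advertising each broker under its own service name (for example `INTERNAL://kafka-1:9092`) and comparing the result against the maintainers' sample cluster file.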

Server file:

from fastapi import FastAPI
from kafka import KafkaProducer, KafkaConsumer
import json
import uuid
import os

app = FastAPI()


KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka-2:9093")
REQUEST_TOPIC = "requests"
PREDICTION_TOPIC = "predictions"


consumer = KafkaConsumer(
    PREDICTION_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    group_id="result_consumer",
    auto_offset_reset="earliest"
)


producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)


@app.post("/predict")
async def send_request(text: str):
    request_id = str(uuid.uuid4())  
    message = {"id": request_id, "text": text}
    producer.send(REQUEST_TOPIC, message)
    return {"message": "Request sent", "request_id": request_id}


@app.get("/result/{request_id}")
async def get_result(request_id: str):
    messages = consumer.poll(timeout_ms=2000)
    if messages:
        for _, records in messages.items():
            for record in records:
                if record.value["id"] == request_id:
                    return record.value["result"]
    
    return {"error": "Result not found"}
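
Both clients in the server file are created at import time, and the short form of `depends_on` in Compose only orders container start-up; it does not wait for the broker to accept connections. If the error comes from the broker not being ready yet, retrying the constructor may help. A minimal sketch under that assumption: `connect_with_retry` is a hypothetical helper, not part of kafka-python, and the attempt count and delay are arbitrary.

```python
import time


def connect_with_retry(factory, retry_on, attempts=10, delay_s=3.0, sleep=time.sleep):
    """Call factory() until it succeeds or the attempts run out.

    factory  -- zero-argument callable that builds the client
    retry_on -- exception class (or tuple of classes) that triggers a retry
    """
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on:
            if attempt == attempts:
                raise  # give up and surface the last error
            sleep(delay_s)


# Intended use in the server file (requires kafka-python, not run here):
#
#   from kafka.errors import NoBrokersAvailable
#   producer = connect_with_retry(
#       lambda: KafkaProducer(
#           bootstrap_servers=KAFKA_BROKER,
#           value_serializer=lambda v: json.dumps(v).encode("utf-8"),
#       ),
#       retry_on=NoBrokersAvailable,
#   )
```

Retrying only hides a start-up race. If the broker address or the advertised listeners are wrong, the helper will exhaust its attempts and re-raise the same error.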

Sentiment-analysis file:

from kafka import KafkaConsumer, KafkaProducer
from transformers import pipeline
import json
import os


KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka-1:9092")
REQUEST_TOPIC = "requests"
PREDICTION_TOPIC = "predictions"

sentiment_pipeline = pipeline("sentiment-analysis")

consumer = KafkaConsumer(
    REQUEST_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    group_id="sentiment_worker",
    auto_offset_reset="earliest"
)

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)


for message in consumer:
    request = message.value
    result = sentiment_pipeline(request["text"])[0]
    response = {"id": request["id"], "result": result}

    producer.send(PREDICTION_TOPIC, response)

What is the expected behavior?

No response

What do you see instead?

In fastapi-server:

kafka.errors.NoBrokersAvailable: NoBrokersAvailable
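
To narrow down whether this error is a reachability problem, a plain TCP probe run from inside the fastapi-server container can show whether the bootstrap address resolves and accepts connections. A minimal sketch using only the standard library; `can_connect` is a hypothetical helper, and a successful probe only shows that the port is open, not that the advertised listeners are correct.

```python
import socket


def can_connect(host, port, timeout_s=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers DNS failures (socket.gaierror), refusals, and timeouts.
        return False
```

Calling `can_connect("kafka-2", 9093)` and `can_connect("kafka-1", 9092)` probes the default broker addresses used by the server and worker files.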

In kafka-1:
Error processing create topic request CreatableTopic(name='requests', numPartitions=1, replicationFactor=1, assignments=[], configs=[]) (kafk

Additional information

No response

@danilyef danilyef added the tech-issues The user has a technical issue about an application label Feb 25, 2025
@github-actions github-actions bot added the triage Triage is needed label Feb 25, 2025
@javsalgar javsalgar changed the title Kafka streaming for the Inference Server: kafka.errors.NoBrokersAvailable: NoBrokersAvailable [bitnami/kafka] Kafka streaming for the Inference Server: kafka.errors.NoBrokersAvailable: NoBrokersAvailable Feb 26, 2025
@javsalgar
Contributor

Hi,

Did you try with this sample docker-compose file? https://github.com/bitnami/containers/blob/main/bitnami/kafka/docker-compose-cluster.yml
