
Feature suggestion: Kafka Streaming for High-Frequency Attribute Ingestion #1898

@cfreyfh

Description


Together with your new DevOps (Claude) I created a concept for Kafka Streaming input in Orion-LD. What do you think, would this be a good place to start?


Overview

This document proposes a Kafka-based streaming consumer for Orion-LD to efficiently handle high-frequency attribute updates (e.g., 100+ values per second) without the overhead of individual REST API calls.

Problem Statement

  • Current REST API approach: one HTTP request per attribute update
  • At 100+ updates/sec: massive network overhead, high latency, and significant CPU usage
  • No batching: each update triggers an individual MongoDB/PostgreSQL write
  • The NGSI-LD batch operations would be better suited, but they require the client to group updates explicitly

Solution: Kafka Consumer with Batch Flushing

Instead of individual REST calls, stream attribute updates via Kafka with automatic batching and optimized database writes.


Architecture

Data Flow

┌─────────────────────────────────────────────────────────────────┐
│                    Kafka Topic                                  │
│         (entity-attributes-stream, partitioned)                 │
└──────────────────────────┬──────────────────────────────────────┘
                           │
                    ┌──────v──────────┐
                    │  Consumer Group │
                    │ (orionld-stream)│
                    └──────┬──────────┘
                           │
   ┌───────────────────────┼───────────────────────┐
   │                       │                       │
   v                       v                       v
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Worker Thread 1  │ │ Worker Thread 2  │ │ Worker Thread N  │
│ (Partition 0)    │ │ (Partition 1)    │ │ (Partition N)    │
├──────────────────┤ ├──────────────────┤ ├──────────────────┤
│ - Poll Kafka     │ │ - Poll Kafka     │ │ - Poll Kafka     │
│ - Validate JSON  │ │ - Validate JSON  │ │ - Validate JSON  │
│ - Buffer Messages│ │ - Buffer Messages│ │ - Buffer Messages│
│ - Batch Flush    │ │ - Batch Flush    │ │ - Batch Flush    │
└──────┬───────────┘ └──────┬───────────┘ └──────┬───────────┘
       │                    │                    │
       └────────────────────┼────────────────────┘
                            │
              ┌─────────────┴─────────────┐
              │                           │
         ┌────v─────────────┐   ┌────────v──────────┐
         │    MongoDB       │   │  PostgreSQL+TRoE  │
         │  (entities/attrs)│   │ (temporal tables) │
         └──────────────────┘   └───────────────────┘

Message Format (Kafka Topic: entity-attributes-stream)

{
  "entityId": "urn:ngsi-ld:Sensor:temp-01",
  "entityType": "TemperatureSensor",
  "attributes": [
    {
      "name": "temperature",
      "type": "Property",
      "value": 25.3,
      "unitCode": "CEL",
      "observedAt": "2026-02-05T10:30:00.123Z"
    },
    {
      "name": "location",
      "type": "GeoProperty",
      "value": {
        "type": "Point",
        "coordinates": [-3.456, 40.123]
      }
    }
  ],
  "timestamp": 1707128400123,
  "@context": ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]
}

Implementation Details

1. New Module Structure

src/lib/orionld/kafka/
├── kafkaConsumerInit.h/cpp          # Consumer initialization
├── kafkaMessageHandler.h/cpp        # Message parsing & validation
├── kafkaAttributeBatch.h/cpp        # Batching logic
├── kafkaWorkerThread.h/cpp          # Worker thread main loop
├── kafkaMongoFlush.h/cpp            # MongoDB batch writes
├── kafkaPostgresFlush.h/cpp         # PostgreSQL/TRoE batch writes
└── CMakeLists.txt

2. Data Structures

StreamMessage

typedef struct {
  char*        entityId;              // Entity URI
  char*        entityType;            // Entity type
  KjNode*      attributesP;           // Parsed attributes (KjNode tree)
  char*        contextP;              // @context (optional)
  long long    timestamp;             // Message timestamp (ms)
  char*        datasetId;             // Optional dataset ID
} StreamMessage;

AttributeBatch

typedef struct {
  int                capacity;        // Max messages in batch
  int                count;           // Current message count
  StreamMessage*     messagesV;       // Array of messages
  long long          createdAt;       // When batch was created (ms)
  pthread_mutex_t    lock;            // Thread-safe access
  KAlloc             kalloc;          // Request-scoped allocator for batch
} AttributeBatch;
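As a minimal, self-contained sketch of how the batch buffer could behave (assumptions: the struct is reduced to the fields needed here, and plain `malloc`/`calloc` stands in for the request-scoped kalloc allocator):

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <sys/time.h>

typedef struct
{
  char*      entityId;
  long long  timestamp;
} StreamMessage;                       // Reduced to the fields needed here

typedef struct
{
  int              capacity;           // Max messages in batch
  int              count;              // Current message count
  StreamMessage*   messagesV;          // Array of messages
  long long        createdAt;          // When batch was created (ms)
  pthread_mutex_t  lock;               // Thread-safe access
} AttributeBatch;

static long long nowMs(void)
{
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return (long long) tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

bool attributeBatchInit(AttributeBatch* batchP, int capacity)
{
  batchP->capacity  = capacity;
  batchP->count     = 0;
  batchP->messagesV = (StreamMessage*) calloc(capacity, sizeof(StreamMessage));
  batchP->createdAt = nowMs();
  pthread_mutex_init(&batchP->lock, NULL);
  return batchP->messagesV != NULL;
}

// Returns false when the batch is full - the caller must flush first
bool attributeBatchAdd(AttributeBatch* batchP, StreamMessage* msgP)
{
  pthread_mutex_lock(&batchP->lock);
  if (batchP->count >= batchP->capacity)
  {
    pthread_mutex_unlock(&batchP->lock);
    return false;
  }
  batchP->messagesV[batchP->count++] = *msgP;
  pthread_mutex_unlock(&batchP->lock);
  return true;
}
```

The mutex matters because the worker thread adds messages while a timer-driven flush may reset the batch concurrently.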

3. Core Functions

kafkaConsumerInit()

/**
 * Initialize Kafka consumer and connect to broker(s)
 *
 * Parameters:
 *   brokerListP     - Comma-separated broker addresses (e.g., "localhost:9092,kafka2:9092")
 *   topicP          - Kafka topic name (e.g., "entity-attributes-stream")
 *   consumerGroupP  - Consumer group ID (e.g., "orionld-streaming-group")
 *
 * Returns:
 *   true  - Consumer initialized successfully
 *   false - Initialization failed (error details in orionldState.response)
 */
extern bool kafkaConsumerInit(const char* brokerListP,
                              const char* topicP,
                              const char* consumerGroupP);

kafkaMessageHandle()

/**
 * Parse and validate a single Kafka message
 *
 * Parameters:
 *   messageP        - Raw Kafka message bytes
 *   messageLen      - Length of message
 *   outP            - Output StreamMessage (allocated with kalloc)
 *
 * Returns:
 *   true  - Message valid and parsed
 *   false - Validation failed (entityId/type missing, invalid JSON, etc.)
 */
extern bool kafkaMessageHandle(const char* messageP,
                               int messageLen,
                               StreamMessage* outP);

kafkaAttributeBatchAdd()

/**
 * Add a message to the current batch
 *
 * Parameters:
 *   batchP          - Batch buffer
 *   msgP            - Parsed stream message
 *
 * Returns:
 *   true  - Message added to batch
 *   false - Batch full, flush required
 */
extern bool kafkaAttributeBatchAdd(AttributeBatch* batchP, StreamMessage* msgP);

kafkaAttributeBatchFlush()

/**
 * Write all buffered messages to MongoDB and PostgreSQL
 *
 * Batching strategy:
 *   1. Group messages by entity ID for efficient writes
 *   2. Perform bulk MongoDB updateMany() operations
 *   3. Append all attributes to PostgreSQL INSERT buffers
 *   4. Execute PostgreSQL transaction (all-or-nothing)
 *   5. Commit Kafka offsets after successful DB writes
 *
 * Returns:
 *   true  - All messages written successfully
 *   false - Write failed (rollback Kafka offset)
 */
extern bool kafkaAttributeBatchFlush(AttributeBatch* batchP);

kafkaWorkerThreadStart()

/**
 * Main worker thread loop
 *
 * Loop logic:
 *   - Poll Kafka consumer (timeout: 100ms)
 *   - Parse message via kafkaMessageHandle()
 *   - Add to batch via kafkaAttributeBatchAdd()
 *   - Flush batch if:
 *     a) batchSize >= KAFKA_BATCH_SIZE (e.g., 500)
 *     b) age >= KAFKA_BATCH_TIMEOUT_MS (e.g., 100ms)
 *     c) kafkaAttributeBatchAdd() returns false (batch full)
 *   - On error: Log, skip message, continue
 *   - Graceful shutdown on signal
 */
extern void* kafkaWorkerThreadStart(void* argP);
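The flush triggers (a) and (b) above can be captured in one small predicate. A sketch, assuming the proposed defaults (`KAFKA_BATCH_SIZE`, `KAFKA_BATCH_TIMEOUT_MS` are names suggested by this proposal, not existing Orion-LD symbols):

```c
#include <stdbool.h>

#define KAFKA_BATCH_SIZE        500    // Proposed default: max messages per batch
#define KAFKA_BATCH_TIMEOUT_MS  100    // Proposed default: max batch age before flush

// True when the worker loop should flush the current batch:
// either the batch reached its size limit, or a non-empty batch
// has aged past the timeout.
bool batchShouldFlush(int count, long long createdAtMs, long long nowMs)
{
  if (count >= KAFKA_BATCH_SIZE)
    return true;
  if (count > 0 && nowMs - createdAtMs >= KAFKA_BATCH_TIMEOUT_MS)
    return true;
  return false;
}
```

Trigger (c), a full batch reported by kafkaAttributeBatchAdd(), is handled inline in the loop: flush, then retry the add.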

4. Batch Flush Implementation

bool kafkaAttributeBatchFlush(AttributeBatch* batchP)
{
  if (batchP->count == 0)
    return true;

  KT_I("Flushing Kafka batch: %d messages", batchP->count);

  // 1. Group by entity for efficient bulk writes
  OrionldEntityGroup* groupsP = groupByEntity(batchP);

  // 2. MongoDB: Batch UpdateMany operations
  for (OrionldEntityGroup* gP = groupsP; gP != NULL; gP = gP->next)
  {
    if (orionldState.kafkaStoreInMongo == true)
    {
      if (mongodbBatchUpdateAttributes(gP->entityId,
                                       gP->attributesV,
                                       gP->attrCount) == false)
      {
        KT_E("MongoDB write failed for entity: %s", gP->entityId);
        return false;  // Rollback Kafka offset
      }
    }
  }

  // 3. PostgreSQL/TRoE: Batch INSERT
  if (orionldState.kafkaStoreInTroe == true)
  {
    PgAppendBuffer entities, attributes, subAttributes;
    pgAppendInit(&entities, 16*1024);
    pgAppendInit(&attributes, 64*1024);
    pgAppendInit(&subAttributes, 16*1024);

    pgAppend(&entities,      PG_ENTITY_INSERT_START, 0);
    pgAppend(&attributes,    PG_ATTRIBUTE_INSERT_START, 0);
    pgAppend(&subAttributes, PG_SUB_ATTRIBUTE_INSERT_START, 0);

    // Append all attributes from all entities
    for (OrionldEntityGroup* gP = groupsP; gP != NULL; gP = gP->next)
    {
      for (int i = 0; i < gP->attrCount; i++)
      {
        pgEntityAppend(&entities, gP->entityId, gP->entityType, ...);
        pgAttributeAppend(&attributes, gP->entityId, gP->attributesV[i], ...);
        // pgSubAttributeAppend for nested attributes
      }
    }

    // 4. Commit transaction
    if (pgTransactionBegin() == false ||
        pgCommands(&entities, &attributes, &subAttributes) == false ||
        pgTransactionCommit() == false)
    {
      KT_E("PostgreSQL write failed");
      pgTransactionRollback();
      return false;  // Rollback Kafka offset
    }
  }

  // 5. Commit Kafka offsets (only after successful DB writes)
  if (kafkaOffsetCommit() == false)
  {
    KT_W("Failed to commit Kafka offset (messages may be reprocessed)");
    // Continue anyway - data is already in DB
  }

  batchP->count = 0;
  return true;
}
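The groupByEntity() call above is not yet defined anywhere. A minimal sketch of the grouping step (simplified: it groups plain entity-ID strings rather than full StreamMessages, uses a linear scan instead of a hash table, and malloc instead of kalloc):

```c
#include <stdlib.h>
#include <string.h>

typedef struct OrionldEntityGroup
{
  const char*                 entityId;
  int                         attrCount;   // Here: number of messages for this entity
  struct OrionldEntityGroup*  next;
} OrionldEntityGroup;

// Group a flat array of entity IDs (one per message) into a linked list
// of per-entity groups. A real implementation would also carry the
// attribute KjNode trees, and would use a hash table for large batches.
OrionldEntityGroup* groupByEntityId(const char** entityIdV, int count)
{
  OrionldEntityGroup* groupsP = NULL;

  for (int i = 0; i < count; i++)
  {
    OrionldEntityGroup* gP = groupsP;
    while (gP != NULL && strcmp(gP->entityId, entityIdV[i]) != 0)
      gP = gP->next;

    if (gP == NULL)                        // First message for this entity
    {
      gP = (OrionldEntityGroup*) calloc(1, sizeof(OrionldEntityGroup));
      gP->entityId = entityIdV[i];
      gP->next     = groupsP;
      groupsP      = gP;
    }
    gP->attrCount += 1;
  }

  return groupsP;
}
```

Grouping is what turns N messages into one updateMany() per entity, so the linear scan is worth replacing once batches grow past a few hundred messages.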

5. Integration with Main Broker

src/lib/orionld/config/

Add to orionldConfig.h:

typedef struct {
  // ... existing fields ...

  // Kafka Streaming Configuration
  bool       kafkaEnabled;            // Enable/disable streaming consumer
  char*      kafkaBrokers;            // "localhost:9092,kafka2:9092"
  char*      kafkaConsumerGroup;      // "orionld-streaming-group"
  char*      kafkaTopic;              // "entity-attributes-stream"
  int        kafkaBatchSize;          // Max messages per batch (default: 500)
  int        kafkaBatchTimeoutMs;     // Max time before flush (default: 100)
  bool       kafkaStoreInMongo;       // Write to MongoDB (default: true)
  bool       kafkaStoreInTroe;        // Write to PostgreSQL/TRoE (default: true)
  int        kafkaWorkerThreads;      // Thread pool size (default: 1 per partition)
} OrionldConfig;
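The defaults listed in the comments above could be set in one place. A sketch over a reduced struct (Kafka fields only; the real OrionldConfig carries many more):

```c
#include <stdbool.h>

typedef struct
{
  bool   kafkaEnabled;
  char*  kafkaBrokers;
  char*  kafkaConsumerGroup;
  char*  kafkaTopic;
  int    kafkaBatchSize;
  int    kafkaBatchTimeoutMs;
  bool   kafkaStoreInMongo;
  bool   kafkaStoreInTroe;
  int    kafkaWorkerThreads;
} OrionldKafkaConfig;                  // Kafka fields only, for illustration

void kafkaConfigDefaults(OrionldKafkaConfig* cP)
{
  cP->kafkaEnabled        = false;     // Off unless -kafka is given
  cP->kafkaBrokers        = (char*) "localhost:9092";
  cP->kafkaConsumerGroup  = (char*) "orionld-streaming-group";
  cP->kafkaTopic          = (char*) "entity-attributes-stream";
  cP->kafkaBatchSize      = 500;
  cP->kafkaBatchTimeoutMs = 100;
  cP->kafkaStoreInMongo   = true;
  cP->kafkaStoreInTroe    = true;
  cP->kafkaWorkerThreads  = 1;         // Scaled to one per partition at runtime
}
```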

src/app/orionld/orionld.cpp

int main(int argc, char* argv[])
{
  // ... existing initialization ...

  // Parse command-line arguments for Kafka (existing argparse logic)
  // Examples:
  //   -kafka                          (enable)
  //   -kafkaBrokers "kafka1:9092"
  //   -kafkaTopic "entity-attrs"
  //   -kafkaBatchSize 500
  //   -kafkaBatchTimeoutMs 100

  // Start Kafka consumer if enabled
  if (orionldState.kafkaEnabled == true)
  {
    if (kafkaConsumerInit(orionldState.kafkaBrokers,
                          orionldState.kafkaTopic,
                          orionldState.kafkaConsumerGroup) == false)
    {
      KT_E("Failed to initialize Kafka consumer");
      return 1;
    }

    pthread_t kafkaWorkerThread;
    if (pthread_create(&kafkaWorkerThread, NULL, kafkaWorkerThreadStart, NULL) != 0)
    {
      KT_E("Failed to create Kafka worker thread");
      return 1;
    }

    KT_I("Kafka streaming consumer started");
    KT_I("  Brokers: %s", orionldState.kafkaBrokers);
    KT_I("  Topic: %s", orionldState.kafkaTopic);
    KT_I("  Batch Size: %d, Timeout: %dms", orionldState.kafkaBatchSize, orionldState.kafkaBatchTimeoutMs);
  }

  // Start REST API server (existing logic, unaffected)
  mhdServerStart();

  // ... cleanup ...
}

Configuration Examples

Command Line

# Enable Kafka streaming with defaults
orionld -kafka

# Custom broker and topic
orionld -kafka \
  -kafkaBrokers "kafka1:9092,kafka2:9092,kafka3:9092" \
  -kafkaTopic "entity-attributes-stream" \
  -kafkaBatchSize 1000 \
  -kafkaBatchTimeoutMs 200

# Streaming to TRoE only (temporal data)
orionld -kafka \
  -kafkaBrokers "kafka:9092" \
  -kafkaStoreInMongo false \
  -kafkaStoreInTroe true

Docker Compose

version: '3.8'

services:
  kafka:
    image: docker.io/bitnami/kafka:3.5
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

  orion-ld:
    image: fiware/orion-ld:latest
    ports:
      - "1026:1026"
    depends_on:
      - mongo
      - postgres
      - kafka
    environment:
      KAFKA_ENABLED: "true"
      KAFKA_BROKERS: "kafka:9092"
      KAFKA_TOPIC: "entity-attributes-stream"
      KAFKA_BATCH_SIZE: "500"

Performance Comparison

| Metric           | REST API                             | Kafka Streaming                                          |
|------------------|--------------------------------------|----------------------------------------------------------|
| Throughput       | ~100 req/s                           | ~10,000-50,000 msgs/s                                    |
| Latency (p99)    | 20-50 ms                             | 100-200 ms (batch window)                                |
| Network overhead | 1 HTTP request per update            | batched Kafka fetches (per-update overhead up to ~500x smaller at full batches) |
| Database writes  | 1 write per update                   | 1 bulk write per batch                                   |
| CPU usage        | high (HTTP parsing, per-request response) | low (validation only, batched writes)               |
| Scalability      | vertical only (single broker process) | horizontal via topic partitions and worker threads      |

Example: 100 updates/sec

REST API approach:

  • 100 HTTP requests/sec
  • 100 MongoDB writes/sec
  • 100 PostgreSQL writes/sec
  • Network: 100 * 5KB = 500 KB/sec

Kafka Streaming approach:

  • ~10 batch flushes/sec (100ms timeout; at this rate each batch holds ~10 messages, so the 500-message size limit is never reached)
  • 1 MongoDB bulk write per flush (~10 ops in 1 request)
  • 1 PostgreSQL batch insert per flush (~10 records in 1 transaction)
  • Network: ~100 * 0.5KB = 50 KB/sec (no per-update HTTP overhead)

Result: ~10x fewer database operations and far less CPU and network overhead, at the cost of up to one batch window (~100ms) of added write latency


Error Handling & Durability

Message Validation

  • JSON-LD syntax validation
  • Entity ID format checking (must be URI)
  • Entity type presence and format
  • Attribute type validation (Property, Relationship, GeoProperty)
  • Context expansion (using existing orionldContextCache)
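The entity ID format check could be as simple as the following sketch (illustrative only; the broker would reuse its existing URI validation, not this helper):

```c
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

// Minimal URI shape check: a scheme of RFC 3986 scheme characters,
// starting with a letter, followed by ':' and at least one more character.
// Accepts "urn:ngsi-ld:...", "http://...", rejects plain names like "temp-01".
bool entityIdLooksLikeUri(const char* id)
{
  if ((id == NULL) || !isalpha((unsigned char) id[0]))
    return false;

  const char* colon = strchr(id, ':');
  if (colon == NULL)
    return false;

  for (const char* p = id; p < colon; p++)
  {
    char c = *p;
    if (!isalnum((unsigned char) c) && (c != '+') && (c != '-') && (c != '.'))
      return false;                    // RFC 3986 scheme characters only
  }

  return colon[1] != '\0';             // Something must follow the scheme
}
```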

Database Write Failure

  • Batch write fails → entire batch rolled back
  • Kafka offset NOT committed → message reprocessed
  • Worker thread logs error and continues
  • Optional: Dead letter queue for persistent failures

Offset Management

  • Offsets committed ONLY after successful DB writes
  • Automatic offset reset (earliest/latest) on startup
  • Manual offset reset available via admin API

Graceful Shutdown

  • Signal handler (SIGTERM) stops Kafka consumer
  • Worker thread flushes remaining batch
  • Offsets committed before shutdown
  • No message loss

Monitoring & Observability

Prometheus Metrics

# Kafka consumer metrics
kafka_messages_consumed_total{topic="entity-attributes-stream"}
kafka_messages_failed_total{topic="entity-attributes-stream"}
kafka_consumer_lag_seconds{topic="entity-attributes-stream"}

# Batch processing metrics
kafka_batch_flush_duration_seconds
kafka_batch_flush_size_count
kafka_batch_flush_errors_total

# Database write metrics
kafka_mongodb_write_duration_seconds
kafka_mongodb_write_errors_total
kafka_postgres_write_duration_seconds
kafka_postgres_write_errors_total

# Processing metrics
kafka_attribute_updates_total
kafka_entities_updated_total
kafka_worker_thread_alive{thread_id="0"}

Logging

  • KTrace levels for Kafka operations
  • Batch flush timing and size
  • Error details with entity ID for debugging

Implementation Phases

Phase 1: MVP (Core Streaming)

  • Kafka consumer initialization
  • Message parsing (JSON validation, context expansion)
  • Batch buffering (AttributeBatch structure)
  • Worker thread loop
  • MongoDB batch flush
  • Basic error handling

Phase 2: TRoE Integration

  • PostgreSQL batch flush
  • Temporal metadata handling (observedAt, creDate, modDate)
  • Transaction management
  • Offset commit after DB writes

Phase 3: Production Readiness

  • Comprehensive error handling & dead letter queue
  • Monitoring & metrics
  • Configuration validation
  • Graceful shutdown
  • Performance tuning
  • Integration tests
  • Functional tests (.test format)

Phase 4: Advanced Features

  • Multi-partition consumer (thread pool)
  • Schema validation (optional Avro/Protobuf)
  • Filtering by entity type
  • Compression support

Dependencies

  • librdkafka (C Kafka client) - already available in most Linux distros
  • Existing Orion-LD libraries: kjson, kalloc, ktrace
  • Existing database layer: mongoc, PostgreSQL client

CMakeLists.txt Addition

pkg_check_modules(LIBRDKAFKA REQUIRED rdkafka)

add_library(orionld_kafka
  src/lib/orionld/kafka/kafkaConsumerInit.cpp
  src/lib/orionld/kafka/kafkaMessageHandler.cpp
  src/lib/orionld/kafka/kafkaAttributeBatch.cpp
  src/lib/orionld/kafka/kafkaWorkerThread.cpp
  src/lib/orionld/kafka/kafkaMongoFlush.cpp
  src/lib/orionld/kafka/kafkaPostgresFlush.cpp
)

target_link_libraries(orionld_kafka
  ${LIBRDKAFKA_LIBRARIES}
  orionld_common
  orionld_db
  orionld_troe
)

Testing Strategy

Functional Tests

  • Single message ingestion (.test format)
  • Batch flushing with 500+ messages
  • MongoDB/PostgreSQL write verification
  • Kafka offset management
  • Error scenarios (broker down, write failure, etc.)

Future Enhancements

  1. Schema Registry Integration: Optional Avro schema validation
  2. Multi-partition Parallelism: Thread pool for consumer groups
  3. Attribute Filtering: Only stream specific attributes per entity
  4. Compression: Compress Kafka payloads for high-frequency streams
  5. Metrics Export: Prometheus /metrics endpoint
  6. Admin API: Runtime Kafka topic management, offset reset
  7. Replay Functionality: Re-ingest historical messages from Kafka
