Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 164 additions & 25 deletions en/docs/develop/integration-artifacts/event/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,11 @@ Consume messages from Apache Kafka topics with consumer group management, offset

![Kafka consumer creation form](/img/develop/integration-artifacts/event/kafka/step-creation-form.png)

**Service Configurations**


| Field | Description |
|---|---|
| **Bootstrap Servers** | Comma-separated list of Kafka broker addresses (e.g., `localhost:9092`). Required. |
| **Topic(s)** | One or more Kafka topic names to subscribe to. Required. |

**Advanced Configurations**

Expand **Advanced Configurations** to set additional options including the consumer group ID, offset reset policy, and polling interval.

4. Click **Create**.
Expand Down Expand Up @@ -92,37 +87,181 @@ service on orderListener {
</TabItem>
</Tabs>

## Offset Management Strategies
## Service configuration

| Strategy | Configuration | Behavior |
|---|---|---|
| **Auto-commit** | `autoCommit: true` | Offsets committed automatically at the polling interval |
| **Manual commit** | `autoCommit: false` | Call `caller->commit()` after processing |
| **Seek to beginning** | `autoOffsetReset: "earliest"` | Reprocess from the beginning of the topic |
| **Seek to end** | `autoOffsetReset: "latest"` | Skip to the latest messages only |
Service configuration controls the service name and the Kafka listener it is attached to.

<Tabs>
<TabItem value="ui" label="Visual Designer" default>

In the **Kafka Consumer Designer**, click **Configure** to open the **Kafka Event Integration Configuration** panel.

The left panel shows the service name and its **Attached Listeners**. Click **Kafka Listener** under **Attached Listeners** to configure the listener connection settings in the right panel.

| Field | Description |
|---|---|
| **Bootstrap Servers** | Kafka broker addresses. Accepts a Ballerina expression or a plain text value. |
| **Group Id** | Consumer group identifier. |
| **Topics** | Topics to subscribe to. |
| **Offset Reset** | Offset reset strategy when no initial offset is present. Options: `earliest`, `latest`, `none`. |

Click **+ Attach Listener** at the bottom of the panel to attach a different or existing named listener.

</TabItem>
<TabItem value="code" label="Ballerina Code">

Service configuration maps to the `kafka:ListenerConfiguration` record passed when constructing the listener:

```ballerina
listener kafka:Listener orderListener = new ({
bootstrapServers: "localhost:9092",
groupId: "order-processor",
topics: ["orders"],
autoOffsetReset: "earliest"
});

service on orderListener {
// handlers
}
```

</TabItem>
</Tabs>

## Listener configuration

The listener connects to Kafka brokers and manages topic subscriptions and consumer group coordination. You can configure the listener directly from the **Listeners** panel in the sidebar.

<Tabs>
<TabItem value="ui" label="Visual Designer" default>

In the sidebar, expand **Listeners** and click the listener name (for example, `kafkaListener`) to open the **Kafka Listener Configuration** form.

![Kafka Listener Configuration panel](/img/develop/integration-artifacts/event/kafka/listener-config-1.png)

| Field | Description |
|---|---|
| **Name** | Identifier for the listener, used in the service declaration. Required. |
| **Bootstrap Servers** | Comma-separated list of Kafka broker `host:port` addresses. Required. |
| **Topics** | Topics for the consumer to subscribe to. |
| **Group Id** | Consumer group identifier for coordinated offset tracking. |
| **Offset Reset** | Strategy when no committed offset exists: `earliest`, `latest`, or `none`. |
| **Partition Assignment Strategy** | Class that implements the partition assignment strategy among consumer group members. |
| **Metrics Recording Level** | Level of metrics recorded by the Kafka client (for example, `INFO` or `DEBUG`). |

</TabItem>
<TabItem value="code" label="Ballerina Code">

**Named listener** — declare the listener at module level and attach services to it:

```ballerina
listener kafka:Listener kafkaListener = new ({
bootstrapServers: "localhost:9092",
groupId: "order-processor",
topics: ["orders"],
pollingInterval: 1,
autoCommit: false,
autoOffsetReset: "earliest"
});

service on kafkaListener {
remote function onConsumerRecord(kafka:Caller caller, OrderEvent[] orders) returns error? {
// process messages
}
}
```

<!-- ## Common Patterns
Key `kafka:ListenerConfiguration` fields:

### Dead Letter Queue (DLQ)
| Field | Type | Default | Description |
|---|---|---|---|
| `bootstrapServers` | `string` | — | Kafka broker address list. Required. |
| `groupId` | `string` | — | Consumer group identifier. |
| `topics` | `string[]` | — | Topics to subscribe to. |
| `pollingInterval` | `decimal` | `1` | Seconds between polling cycles. |
| `autoCommit` | `boolean` | `true` | Automatically commit offsets after each poll. |
| `autoCommitInterval` | `decimal` | `5` | Auto-commit interval in seconds when `autoCommit: true`. |
| `autoOffsetReset` | `string` | `"latest"` | Offset reset strategy: `"earliest"`, `"latest"`, or `"none"`. |
| `concurrentConsumers` | `int` | `1` | Number of concurrent consumer threads. |
| `isolationLevel` | `string` | `"read_uncommitted"` | Transactional read isolation: `"read_committed"` or `"read_uncommitted"`. |

</TabItem>
</Tabs>

Route failed messages to a dead letter queue for manual inspection or retry.
## Event handler configuration

A Kafka service defines remote functions that the runtime calls when records arrive or errors occur. Add handlers from the **Kafka Consumer Designer** using the **+ Handler** button.

<Tabs>
<TabItem value="ui" label="Visual Designer" default>

In the **Kafka Consumer Designer**, the **Event Handlers** section lists all handlers. Click **+ Handler** to add a new handler. Each row shows an **Event** badge and the handler name. Click the settings icon (⚙) on a handler row to configure its parameters.

| Handler | Trigger | Required |
|---|---|---|
| **onConsumerRecord** | Called for each batch of records received from subscribed topics. | Yes |
| **onError** | Called when the Kafka consumer encounters an error. | No |

</TabItem>
<TabItem value="code" label="Ballerina Code">

**`onConsumerRecord`** — receives a batch of messages. The message type can be `string`, `json`, `xml`, `byte[]`, or a custom record:

```ballerina
function processWithDLQ(kafka:Caller caller, OrderEvent order) returns error? {
do {
remote function onConsumerRecord(kafka:Caller caller, OrderEvent[] orders) returns error? {
foreach OrderEvent order in orders {
check processOrder(order);
check caller->commit();
} on fail error e {
log:printError("Processing failed, sending to DLQ", orderId = order.orderId);
check sendToDLQ(order, e.message());
check caller->commit(); // Acknowledge so it does not reprocess
}
check caller->commit();
}
```

The `kafka:Caller` parameter provides offset management methods:

| Method | Description |
|---|---|
| `caller->commit()` | Commit offsets for the current batch. Use when `autoCommit: false`. |
| `caller->seek(partition, offset)` | Seek to a specific offset on a given partition. |

**`onError`** — handles consumer-level errors such as deserialization failures or connection issues:

```ballerina
remote function onError(kafka:Error err) {
log:printError("Consumer error", 'error = err);
}
```

**Typed message payloads** — Ballerina deserializes JSON automatically when the parameter type is a record:

```ballerina
type OrderEvent record {|
string orderId;
string customerId;
decimal amount;
|};

remote function onConsumerRecord(kafka:Caller caller, OrderEvent[] orders) returns error? {
foreach OrderEvent order in orders {
log:printInfo("Order received", orderId = order.orderId);
}
}
``` -->
```

</TabItem>
</Tabs>

## Offset management strategies

| Strategy | Configuration | Behavior |
|---|---|---|
| **Auto-commit** | `autoCommit: true` | Offsets committed automatically at the polling interval |
| **Manual commit** | `autoCommit: false` | Call `caller->commit()` after processing |
| **Seek to beginning** | `autoOffsetReset: "earliest"` | Reprocess from the beginning of the topic |
| **Seek to end** | `autoOffsetReset: "latest"` | Skip to the latest messages only |

### Acknowledgment Strategies
### Acknowledgment strategies

| Strategy | Guarantee | Use Case |
| Strategy | Guarantee | Use case |
|---|---|---|
| Auto-acknowledge | At most once | Low-value events, metrics |
| Manual acknowledge | At least once | Business-critical events |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,40 @@ title: Configurations
description: Externalize environment-specific settings using configurable variables and Config.toml.
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

# Configurations

Configuration artifacts externalize values that change between environments using Ballerina's `configurable` keyword. This separates environment-specific settings (URLs, credentials, feature flags) from your integration logic.

## Adding a configuration

<Tabs>
<TabItem value="ui" label="Visual Designer" default>

1. Open the **WSO2 Integrator: BI** sidebar in VS Code.

![WSO2 Integrator sidebar showing the project structure with Configurations listed](/img/develop/integration-artifacts/supporting/configurations/step-1.png)

2. Click **+** next to **Configurations** in the sidebar.

3. In the **Add Configurable Variable** panel, fill in the following fields:

![Add Configurable Variable form showing Variable Name, Variable Type, Default Value, and Documentation fields](/img/develop/integration-artifacts/supporting/configurations/step-2.png)

| Field | Description |
|---|---|
| **Variable Name** | The identifier used to reference the variable in code (for example, `apiEndpoint`). Required. |
| **Variable Type** | The Ballerina type of the variable (for example, `string`, `int`, `boolean`). Required. |
| **Default Value** | An optional default value. Leave empty to make the variable required — the integration will not start unless it is provided in `Config.toml`. |
| **Documentation** | Optional description in Markdown format, used as inline documentation. |

4. Click **Save**. The variable is added to your project's configurable declarations.

</TabItem>
<TabItem value="code" label="Ballerina Code">

```ballerina
// config.bal

Expand Down Expand Up @@ -34,9 +64,29 @@ type NotificationConfig record {|
|};
```

## Config.toml
</TabItem>
</Tabs>

## Viewing configurations

<Tabs>
<TabItem value="ui" label="Visual Designer" default>

Click **View Configurations** (or the configurations icon) next to **Configurations** in the sidebar to open the **Configurable Variables** panel.

![Configurable Variables panel showing variables grouped by Integration and Imported libraries](/img/develop/integration-artifacts/supporting/configurations/step-3.png)

Provide values for configurable variables in a `Config.toml` file at the project root.
The panel organizes variables into two groups:

- **Integration** — variables declared in your integration project. Each entry shows the variable name, type, and default value.
- **Imported libraries** — configurable variables exposed by libraries your integration depends on (for example, `ballerina/http` or `ballerina/log`).

Use the **Search Configurables** box to filter by name. Click a variable to edit or delete it.

</TabItem>
<TabItem value="code" label="Ballerina Code">

Provide values for configurable variables in a `Config.toml` file at the project root:

```toml
apiEndpoint = "https://api.example.com/v2"
Expand All @@ -52,7 +102,10 @@ slackEnabled = true
slackWebhookUrl = "https://hooks.slack.com/services/..."
```

## Configuration Types
</TabItem>
</Tabs>

## Configuration types

| Type | Example | Notes |
|---|---|---|
Expand All @@ -62,7 +115,7 @@ slackWebhookUrl = "https://hooks.slack.com/services/..."
| **Record** | `configurable DbConfig db = {...};` | Grouped settings for a subsystem |
| **Array** | `configurable string[] allowedOrigins = [];` | Lists of values |

## Environment-Specific Overrides
## Environment-specific overrides

Use different `Config.toml` files for each environment.

Expand All @@ -74,7 +127,7 @@ my-integration/
└── ...
```

## Best Practices
## Best practices

| Practice | Description |
|---|---|
Expand Down
Loading