Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/app/configuration/_meta.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export default {
'transformations': '',
'pipeline-json-reference': '',
'data-format': '',
'supported-kafka-connections': '',
Expand Down
149 changes: 149 additions & 0 deletions docs/app/configuration/transformations/deduplication/page.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
---
title: 'Deduplication'
description: 'Learn how deduplication works in GlassFlow and how to configure it'
---

# Deduplication

The **Deduplication** transformation removes duplicate events from your data stream, ensuring that each unique event is processed only once. This is essential for maintaining data quality and preventing duplicate records in your ClickHouse tables.

## How It Works

Deduplication in GlassFlow uses a key-value store ([BadgerDB](https://docs.hypermode.com/badger/overview)) to track unique event identifiers. The process works as follows:

### Internal Process

1. **Message Consumption**: Messages are read from the ingestor output stream in batches
2. **Duplicate Detection**: Each message's unique identifier (specified by `id_field`) is checked against the deduplication store
3. **Filtering**: Messages with identifiers that already exist in the store are filtered out as duplicates
4. **Storage**: Unique messages have their identifiers stored in the deduplication store
5. **Forwarding**: Only unique messages are forwarded to the downstream component (sink or join)

### Time Window

The deduplication store maintains entries for a configurable time window. After the time window expires, entries are automatically evicted from the store. This means:

- Events with the same ID that arrive within the time window are considered duplicates
- Events with the same ID that arrive after the time window expires are treated as new events
- The time window prevents the deduplication store from growing indefinitely

### Atomic Transactions

Deduplication uses atomic transactions to ensure data consistency:

- When processing a batch, all deduplication checks and store updates happen within a single transaction
- If processing fails, the transaction is not committed, ensuring no duplicate keys are saved
- This prevents duplicate events from being produced even during processing failures
- Messages are only acknowledged after successful deduplication and forwarding

For more details on how deduplication handles failures, see the [Data Flow](/architecture/data-flow#stage-3-deduplication-optional) documentation.

## Configuration

Deduplication is configured per Kafka topic in the pipeline configuration. Here's the configuration structure:

```json
{
"source": {
"topics": [
{
"name": "my-topic",
"deduplication": {
"enabled": true,
"id_field": "event_id",
"id_field_type": "string",
"time_window": "24h"
}
}
]
}
}
```

### Configuration Fields

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `enabled` | boolean | Yes | Whether deduplication is enabled for this topic |
| `id_field` | string | Yes | The field name in your events that contains the unique identifier |
| `id_field_type` | string | Yes | The data type of the ID field (e.g., `"string"`, `"int"`, `"uuid"`) |
| `time_window` | string | Yes | Time window for deduplication (e.g., `"1h"`, `"24h"`). See [Time Windows](#time-windows) |

### Time Windows

Time windows specify how long duplicate detection should be active. Supported formats:

- `"30s"` - 30 seconds
- `"1m"` - 1 minute
- `"1h"` - 1 hour
- `"24h"` - 24 hours
- `"7d"` - 7 days

The time window determines:
- How long an event ID is remembered in the deduplication store
- The maximum time between duplicate events that will be detected
- When entries are automatically evicted from the store

## Best Practices

### Choosing an ID Field

- **Use a truly unique identifier**: The `id_field` should be unique across all events (e.g., UUID, transaction ID, event ID)
- **Avoid timestamps**: Timestamps are not unique and should not be used as the deduplication key
- **Consider composite keys**: If no single field is unique, consider creating a composite key in your data

### Setting Time Windows

- **Match your use case**: Set the time window based on how long duplicates might arrive (e.g., retry windows, network delays)
- **Balance memory and coverage**: Longer windows use more memory but catch duplicates over longer periods
- **Consider event frequency**: For high-frequency events, shorter windows may be sufficient

### Performance Considerations

- **Batch processing**: Deduplication processes messages in batches for efficiency
- **Memory usage**: The deduplication store size depends on the number of unique IDs within the time window
- **Storage location**: The BadgerDB store is persisted to disk, ensuring deduplication state survives restarts

## Example Configuration

Here's a complete example of a pipeline with deduplication enabled:

```json
{
"version": "v2",
"pipeline_id": "deduplicated-pipeline",
"name": "Deduplicated Events Pipeline",
"source": {
"type": "kafka",
"connection_params": {
"brokers": ["kafka:9092"]
},
"topics": [
{
"name": "user-events",
"deduplication": {
"enabled": true,
"id_field": "event_id",
"id_field_type": "string",
"time_window": "24h"
}
}
]
},
"sink": {
"type": "clickhouse",
"connection_params": {
"host": "clickhouse:9000",
"database": "analytics",
"table": "user_events"
}
}
}
```

## Related Documentation

- [Data Flow - Deduplication Stage](/architecture/data-flow#stage-3-deduplication-optional)
- [Pipeline JSON Reference - Deduplication Configuration](/configuration/pipeline-json-reference#deduplication-configuration)
- [Transformations Overview](/configuration/transformations)

216 changes: 216 additions & 0 deletions docs/app/configuration/transformations/filter/page.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
---
title: 'Filter'
description: 'Learn how filtering works in GlassFlow and how to configure filter expressions'
---

# Filter

The **Filter** transformation allows you to selectively process events based on configurable expressions. Events that match the filter expression are processed further in the pipeline, while non-matching events are filtered out and not processed.

## How It Works

Filtering in GlassFlow is applied at the ingestor stage, before any other processing. The filter uses expression-based evaluation to determine whether an event should be processed.

### Internal Process

1. **Event Reception**: Events are received from Kafka topics
2. **Expression Evaluation**: Each event is evaluated against the configured filter expression
3. **Filtering Decision**:
- If the expression evaluates to `true`, the event is processed further
- If the expression evaluates to `false`, the event is filtered out
4. **Processing**: Only matching events continue through the pipeline (deduplication, join, sink)

### Expression Language

GlassFlow uses the [expr](https://github.com/expr-lang/expr) expression language for filter expressions. This provides a simple, safe way to evaluate conditions on your event data.

**Key Features:**
- Field-based evaluation using event field names
- Support for common comparison operators (`==`, `!=`, `>`, `<`, `>=`, `<=`)
- Logical operators (`and`, `or`, `not`)
- Type-safe evaluation based on field types

### Filtered Events

Events that don't match the filter expression are:
- Not processed further in the pipeline
- Not sent to deduplication, join, or sink components
- Not written to ClickHouse
- Not counted in pipeline metrics (except filtered message metrics)

This early filtering reduces processing overhead and storage costs by eliminating unwanted events early in the pipeline.

## Configuration

Filter is configured at the pipeline level. Here's the configuration structure:

```json
{
"filter": {
"enabled": true,
"expression": "age > 18 and status == 'active'"
}
}
```

### Configuration Fields

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `enabled` | boolean | Yes | Whether filtering is enabled |
| `expression` | string | Yes | Filter expression that evaluates to a boolean. See [Expression Syntax](#expression-syntax) |

### Expression Syntax

Filter expressions use field names from your event schema and support the following operations:

#### Comparison Operators

- `==` - Equality
- `!=` - Inequality
- `>` - Greater than
- `<` - Less than
- `>=` - Greater than or equal
- `<=` - Less than or equal

#### Logical Operators

- `and` - Logical AND
- `or` - Logical OR
- `not` - Logical NOT

#### Examples

**String comparison:**
```json
{
"filter": {
"enabled": true,
"expression": "status == 'active'"
}
}
```

**Numeric comparison:**
```json
{
"filter": {
"enabled": true,
"expression": "age > 18"
}
}
```

**Boolean field:**
```json
{
"filter": {
"enabled": true,
"expression": "is_premium == true"
}
}
```

**Multiple conditions with AND:**
```json
{
"filter": {
"enabled": true,
"expression": "age > 18 and status == 'active'"
}
}
```

**Multiple conditions with OR:**
```json
{
"filter": {
"enabled": true,
"expression": "status == 'active' or status == 'pending'"
}
}
```

**Complex expression with parentheses:**
```json
{
"filter": {
"enabled": true,
"expression": "(age > 18 and age < 65) or is_student == true"
}
}
```

**Nested field access:**
```json
{
"filter": {
"enabled": true,
"expression": "user.age > 18"
}
}
```

## Best Practices

### Expression Design

- **Keep expressions simple**: Complex expressions can be harder to maintain and debug
- **Use parentheses**: Explicitly group conditions with parentheses for clarity
- **Test expressions**: Validate filter expressions before deploying to production

### Field Names

- **Use exact field names**: Field names in expressions must match exactly with your event schema
- **Case sensitivity**: Field names are case-sensitive
- **Nested fields**: Use dot notation for nested fields (e.g., `user.age`)

### Type Safety

- **Match field types**: Ensure comparison values match the field types in your schema
- **String literals**: Use single quotes for string literals in expressions
- **Numeric values**: Use numeric literals without quotes for numeric comparisons
- **Boolean values**: Use `true` or `false` (lowercase) for boolean comparisons

## Example Configuration

Here's a complete example of a pipeline with filtering enabled:

```json
{
"version": "v2",
"pipeline_id": "filtered-pipeline",
"name": "Filtered Events Pipeline",
"source": {
"type": "kafka",
"connection_params": {
"brokers": ["kafka:9092"]
},
"topics": [
{
"name": "user-events"
}
]
},
"filter": {
"enabled": true,
"expression": "age > 18 and status == 'active'"
},
"sink": {
"type": "clickhouse",
"connection_params": {
"host": "clickhouse:9000",
"database": "analytics",
"table": "active_users"
}
}
}
```

## Related Documentation

- [Data Flow - Ingestor Stage](/architecture/data-flow#stage-1-kafka-to-nats-jetstream-ingestor)
- [Pipeline JSON Reference - Filter Configuration](/configuration/pipeline-json-reference#filter-configuration)
- [Transformations Overview](/configuration/transformations)
- [expr Language Documentation](https://github.com/expr-lang/expr)

Loading