From 51278b9e124bf795a74ed69f02ab2b2fc3746dac Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 5 Dec 2025 17:52:11 +0100 Subject: [PATCH 1/2] add first version of transformation docs --- docs/app/configuration/_meta.tsx | 1 + .../transformations/deduplication/page.mdx | 196 +++++++++++ .../transformations/filter/page.mdx | 328 ++++++++++++++++++ .../transformations/join/page.mdx | 294 ++++++++++++++++ .../configuration/transformations/page.mdx | 78 +++++ 5 files changed, 897 insertions(+) create mode 100644 docs/app/configuration/transformations/deduplication/page.mdx create mode 100644 docs/app/configuration/transformations/filter/page.mdx create mode 100644 docs/app/configuration/transformations/join/page.mdx create mode 100644 docs/app/configuration/transformations/page.mdx diff --git a/docs/app/configuration/_meta.tsx b/docs/app/configuration/_meta.tsx index c2286aba..e39e5148 100644 --- a/docs/app/configuration/_meta.tsx +++ b/docs/app/configuration/_meta.tsx @@ -1,4 +1,5 @@ export default { + 'transformations': '', 'pipeline-json-reference': '', 'data-format': '', 'supported-kafka-connections': '', diff --git a/docs/app/configuration/transformations/deduplication/page.mdx b/docs/app/configuration/transformations/deduplication/page.mdx new file mode 100644 index 00000000..752e6b57 --- /dev/null +++ b/docs/app/configuration/transformations/deduplication/page.mdx @@ -0,0 +1,196 @@ +--- +title: 'Deduplication' +description: 'Learn how deduplication works in GlassFlow and how to configure it' +--- + +# Deduplication + +The **Deduplication** transformation removes duplicate events from your data stream, ensuring that each unique event is processed only once. This is essential for maintaining data quality and preventing duplicate records in your ClickHouse tables. + +## How It Works + +Deduplication in GlassFlow uses a key-value store (BadgerDB) to track unique event identifiers. The process works as follows: + +### Internal Process + +1. **Message Consumption**: Messages are read from the ingestor output stream in batches +2. **Duplicate Detection**: Each message's unique identifier (specified by `id_field`) is checked against the deduplication store +3. **Filtering**: Messages with identifiers that already exist in the store are filtered out as duplicates +4. **Storage**: Unique messages have their identifiers stored in the deduplication store +5. **Forwarding**: Only unique messages are forwarded to the downstream component (sink or join) + +### Time Window + +The deduplication store maintains entries for a configurable time window. After the time window expires, entries are automatically evicted from the store. 
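To make the store-and-expire behaviour concrete, the sketch below shows one way a TTL-backed BadgerDB store can act as a deduplication index. It is an illustration only — function names such as `isDuplicate` are hypothetical and this is not GlassFlow's actual implementation:

```go
package main

import (
	"log"
	"time"

	badger "github.com/dgraph-io/badger/v4"
)

// isDuplicate reports whether id was already seen within the time window.
// Unseen ids are recorded with a TTL so they expire automatically.
func isDuplicate(db *badger.DB, id string, window time.Duration) (bool, error) {
	seen := false
	err := db.Update(func(txn *badger.Txn) error {
		if _, err := txn.Get([]byte(id)); err == nil {
			seen = true // key exists and has not expired yet: duplicate
			return nil
		} else if err != badger.ErrKeyNotFound {
			return err
		}
		// First occurrence: remember the id for the length of the window.
		return txn.SetEntry(badger.NewEntry([]byte(id), []byte{1}).WithTTL(window))
	})
	return seen, err
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/dedup-store"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, id := range []string{"evt-1", "evt-2", "evt-1"} {
		dup, _ := isDuplicate(db, id, 24*time.Hour)
		log.Printf("event %s duplicate=%v", id, dup)
	}
}
```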
This means: + +- Events with the same ID that arrive within the time window are considered duplicates +- Events with the same ID that arrive after the time window expires are treated as new events +- The time window prevents the deduplication store from growing indefinitely + +### Atomic Transactions + +Deduplication uses atomic transactions to ensure data consistency: + +- When processing a batch, all deduplication checks and store updates happen within a single transaction +- If processing fails, the transaction is not committed, ensuring no duplicate keys are saved +- This prevents duplicate events from being produced even during processing failures +- Messages are only acknowledged after successful deduplication and forwarding + +For more details on how deduplication handles failures, see the [Data Flow](/architecture/data-flow#stage-3-deduplication-optional) documentation. + +## Configuration + +Deduplication is configured per Kafka topic in the pipeline configuration. Here's the configuration structure: + +```json +{ + "source": { + "topics": [ + { + "name": "my-topic", + "deduplication": { + "enabled": true, + "id_field": "event_id", + "id_field_type": "string", + "time_window": "24h" + } + } + ] + } +} +``` + +### Configuration Fields + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | Whether deduplication is enabled for this topic | +| `id_field` | string | Yes | The field name in your events that contains the unique identifier | +| `id_field_type` | string | Yes | The data type of the ID field (e.g., `"string"`, `"int"`, `"uuid"`) | +| `time_window` | string | Yes | Time window for deduplication (e.g., `"1h"`, `"24h"`). See [Time Windows](#time-windows) | + +### Time Windows + +Time windows specify how long duplicate detection should be active. 
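How a window string becomes an actual duration is illustrated by the hypothetical helper below (Go's `time.ParseDuration` understands `s`, `m`, and `h` but has no day unit, so a `d` suffix has to be expanded first; GlassFlow's real parsing may differ):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseWindow converts a window string such as "30s", "1h" or "7d" into a
// time.Duration that could be used as the store TTL. Illustrative only.
func parseWindow(s string) (time.Duration, error) {
	if strings.HasSuffix(s, "d") {
		days, err := strconv.Atoi(strings.TrimSuffix(s, "d"))
		if err != nil {
			return 0, fmt.Errorf("invalid day window %q: %w", s, err)
		}
		return time.Duration(days) * 24 * time.Hour, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, w := range []string{"30s", "1m", "1h", "24h", "7d"} {
		d, err := parseWindow(w)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %v\n", w, d)
	}
}
```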
Supported formats: + +- `"30s"` - 30 seconds +- `"1m"` - 1 minute +- `"1h"` - 1 hour +- `"24h"` - 24 hours +- `"7d"` - 7 days + +The time window determines: +- How long an event ID is remembered in the deduplication store +- The maximum time between duplicate events that will be detected +- When entries are automatically evicted from the store + +## Use Cases + +### Preventing Duplicate Events + +If your Kafka producer might send duplicate events (e.g., due to retries or network issues), deduplication ensures only one copy reaches ClickHouse: + +```json +{ + "deduplication": { + "enabled": true, + "id_field": "event_id", + "id_field_type": "string", + "time_window": "1h" + } +} +``` + +### Idempotent Processing + +When processing events that should only be handled once (e.g., payment transactions, user registrations), deduplication provides idempotency: + +```json +{ + "deduplication": { + "enabled": true, + "id_field": "transaction_id", + "id_field_type": "uuid", + "time_window": "24h" + } +} +``` + +### Data Quality Assurance + +For critical data pipelines where duplicate records would cause issues, deduplication acts as a safety net: + +```json +{ + "deduplication": { + "enabled": true, + "id_field": "record_id", + "id_field_type": "int64", + "time_window": "7d" + } +} +``` + +## Best Practices + +### Choosing an ID Field + +- **Use a truly unique identifier**: The `id_field` should be unique across all events (e.g., UUID, transaction ID, event ID) +- **Avoid timestamps**: Timestamps are not unique and should not be used as the deduplication key +- **Consider composite keys**: If no single field is unique, consider creating a composite key in your data + +### Setting Time Windows + +- **Match your use case**: Set the time window based on how long duplicates might arrive (e.g., retry windows, network delays) +- **Balance memory and coverage**: Longer windows use more memory but catch duplicates over longer periods +- **Consider event frequency**: For high-frequency events, shorter windows may be sufficient + +### Performance Considerations + +- **Batch processing**: Deduplication processes messages in batches for efficiency +- **Memory usage**: The deduplication store size depends on the number of unique IDs within the time window +- **Storage location**: The BadgerDB store is persisted to disk, ensuring deduplication state survives restarts + +## Example Configuration + +Here's a complete example of a pipeline with deduplication enabled: + +```json +{ + "version": "v2", + "pipeline_id": "deduplicated-pipeline", + "name": "Deduplicated Events Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "user-events", + "deduplication": { + "enabled": true, + "id_field": "event_id", + "id_field_type": "string", + "time_window": "24h" + } + } + ] + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "user_events" + } + } +} +``` + +## Related Documentation + +- [Data Flow - Deduplication Stage](/architecture/data-flow#stage-3-deduplication-optional) +- [Pipeline JSON Reference - Deduplication Configuration](/configuration/pipeline-json-reference#deduplication-configuration) +- [Transformations Overview](/usage-guide/transformations) + diff --git a/docs/app/configuration/transformations/filter/page.mdx b/docs/app/configuration/transformations/filter/page.mdx new file mode 100644 index 00000000..babf1153 --- /dev/null +++ 
b/docs/app/configuration/transformations/filter/page.mdx @@ -0,0 +1,328 @@ +--- +title: 'Filter' +description: 'Learn how filtering works in GlassFlow and how to configure filter expressions' +--- + +# Filter + +The **Filter** transformation allows you to selectively process events based on configurable expressions. Events that match the filter expression are processed further in the pipeline, while non-matching events are filtered out and not processed. + +## How It Works + +Filtering in GlassFlow is applied at the ingestor stage, before any other processing. The filter uses expression-based evaluation to determine whether an event should be processed. + +### Internal Process + +1. **Event Reception**: Events are received from Kafka topics +2. **Expression Evaluation**: Each event is evaluated against the configured filter expression +3. **Filtering Decision**: + - If the expression evaluates to `true`, the event is processed further + - If the expression evaluates to `false`, the event is filtered out +4. **Processing**: Only matching events continue through the pipeline (deduplication, join, sink) + +### Expression Language + +GlassFlow uses the [expr](https://github.com/expr-lang/expr) expression language for filter expressions. This provides a simple, safe way to evaluate conditions on your event data. + +**Key Features:** +- Field-based evaluation using event field names +- Support for common comparison operators (`==`, `!=`, `>`, `<`, `>=`, `<=`) +- Logical operators (`and`, `or`, `not`) +- Type-safe evaluation based on field types + +### Filtered Events + +Events that don't match the filter expression are: +- Not processed further in the pipeline +- Not sent to deduplication, join, or sink components +- Not written to ClickHouse +- Not counted in pipeline metrics (except filtered message metrics) + +This early filtering reduces processing overhead and storage costs by eliminating unwanted events early in the pipeline. + +## Configuration + +Filter is configured at the pipeline level. Here's the configuration structure: + +```json +{ + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + } +} +``` + +### Configuration Fields + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | Whether filtering is enabled | +| `expression` | string | Yes | Filter expression that evaluates to a boolean. 
See [Expression Syntax](#expression-syntax) | + +### Expression Syntax + +Filter expressions use field names from your event schema and support the following operations: + +#### Comparison Operators + +- `==` - Equality +- `!=` - Inequality +- `>` - Greater than +- `<` - Less than +- `>=` - Greater than or equal +- `<=` - Less than or equal + +#### Logical Operators + +- `and` - Logical AND +- `or` - Logical OR +- `not` - Logical NOT + +#### Examples + +**String comparison:** +```json +{ + "filter": { + "enabled": true, + "expression": "status == 'active'" + } +} +``` + +**Numeric comparison:** +```json +{ + "filter": { + "enabled": true, + "expression": "age > 18" + } +} +``` + +**Boolean field:** +```json +{ + "filter": { + "enabled": true, + "expression": "is_premium == true" + } +} +``` + +**Multiple conditions with AND:** +```json +{ + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + } +} +``` + +**Multiple conditions with OR:** +```json +{ + "filter": { + "enabled": true, + "expression": "status == 'active' or status == 'pending'" + } +} +``` + +**Complex expression with parentheses:** +```json +{ + "filter": { + "enabled": true, + "expression": "(age > 18 and age < 65) or is_student == true" + } +} +``` + +**Nested field access:** +```json +{ + "filter": { + "enabled": true, + "expression": "user.age > 18" + } +} +``` + +## Use Cases + +### Filtering by Status + +Only process events with a specific status: + +```json +{ + "filter": { + "enabled": true, + "expression": "status == 'completed'" + } +} +``` + +### Age-Based Filtering + +Process events for users within a specific age range: + +```json +{ + "filter": { + "enabled": true, + "expression": "age >= 18 and age <= 65" + } +} +``` + +### Premium User Filtering + +Only process events for premium users: + +```json +{ + "filter": { + "enabled": true, + "expression": "is_premium == true" + } +} +``` + +### Multi-Condition Filtering + +Filter events based on multiple criteria: + +```json +{ + "filter": { + "enabled": true, + "expression": "status == 'active' and region == 'US' and score > 100" + } +} +``` + +### Exclusion Filtering + +Exclude specific values: + +```json +{ + "filter": { + "enabled": true, + "expression": "status != 'deleted' and status != 'archived'" + } +} +``` + +## Best Practices + +### Expression Design + +- **Keep expressions simple**: Complex expressions can be harder to maintain and debug +- **Use parentheses**: Explicitly group conditions with parentheses for clarity +- **Test expressions**: Validate filter expressions before deploying to production +- **Document intent**: Use clear field names and conditions that express business logic + +### Performance Considerations + +- **Filter early**: Filtering at the ingestor stage reduces processing overhead +- **Index frequently filtered fields**: If possible, structure your data to support efficient filtering +- **Avoid complex nested conditions**: Very complex expressions may impact performance + +### Field Names + +- **Use exact field names**: Field names in expressions must match exactly with your event schema +- **Case sensitivity**: Field names are case-sensitive +- **Nested fields**: Use dot notation for nested fields (e.g., `user.age`) + +### Type Safety + +- **Match field types**: Ensure comparison values match the field types in your schema +- **String literals**: Use single quotes for string literals in expressions +- **Numeric values**: Use numeric literals without quotes for numeric comparisons +- **Boolean values**: 
Use `true` or `false` (lowercase) for boolean comparisons + +## Example Configuration + +Here's a complete example of a pipeline with filtering enabled: + +```json +{ + "version": "v2", + "pipeline_id": "filtered-pipeline", + "name": "Filtered Events Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "user-events" + } + ] + }, + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "active_users" + } + } +} +``` + +## Combining with Other Transformations + +### Filter + Deduplication + +Filter events first, then deduplicate the filtered results: + +1. Configure filter to select relevant events +2. Enable deduplication on the filtered stream +3. This reduces the number of events that need deduplication + +### Filter + Join + +Filter events from multiple topics before joining: + +1. Configure filters on source topics (if supported per-topic) +2. Use pipeline-level filter for post-join filtering +3. Only filtered events participate in the join + +### Filter + Deduplication + Join + +Combine all three transformations: + +1. Filter events to select relevant data +2. Deduplicate filtered events +3. Join deduplicated streams +4. This provides a complete data processing pipeline + +## Expression Validation + +Filter expressions are validated when the pipeline is created or updated. The validation: + +- Checks expression syntax +- Verifies that all field names exist in the event schema +- Ensures expression evaluates to a boolean value +- Validates type compatibility for comparisons + +If validation fails, the pipeline creation or update will fail with an error message indicating the issue. + +## Related Documentation + +- [Data Flow - Ingestor Stage](/architecture/data-flow#stage-1-kafka-to-nats-jetstream-ingestor) +- [Pipeline JSON Reference - Filter Configuration](/configuration/pipeline-json-reference#filter-configuration) +- [Transformations Overview](/usage-guide/transformations) +- [expr Language Documentation](https://github.com/expr-lang/expr) + diff --git a/docs/app/configuration/transformations/join/page.mdx b/docs/app/configuration/transformations/join/page.mdx new file mode 100644 index 00000000..f53c92d4 --- /dev/null +++ b/docs/app/configuration/transformations/join/page.mdx @@ -0,0 +1,294 @@ +--- +title: 'Join' +description: 'Learn how temporal joins work in GlassFlow and how to configure them' +--- + +# Join + +The **Join** transformation combines data from multiple Kafka topics based on join keys and time windows. This enables you to enrich your data by merging related events from different sources using a temporal join algorithm. + +## How It Works + +Join in GlassFlow uses a temporal join algorithm that matches events from different streams based on join keys within a configurable time window. The process maintains separate key-value stores for each stream to enable efficient matching. + +### Internal Process + +The join service maintains separate key-value (KV) stores for the left and right streams: + +#### Left Stream Processing + +1. When a message arrives from the left stream, the system looks up its join key in the right stream's KV store +2. If a matching key is found in the right KV store, the messages are joined and sent to the output stream +3. If no match is found, the message is stored in the left KV store using its join key +4. 
After a joined message is successfully sent to the output stream, the related left-stream message is removed from the left KV store + +#### Right Stream Processing + +1. When a message arrives from the right stream, it is automatically stored in the right KV store +2. The system then checks if a matching key exists in the left KV store +3. If a match is found, the messages are joined and sent to the output stream +4. The matched left-stream message is removed from the left KV store after successful join + +### Time Window and TTL + +- Both left and right KV stores have a configured time-to-live (TTL) for each key, based on the join time window +- Entries are automatically evicted from the KV stores when their TTL expires +- This ensures that only messages within the configured time window are considered for joining +- Messages can be evicted only after they have been successfully joined and sent to the output stream, or when their TTL expires + +### Join Orientations + +GlassFlow supports two join orientations: + +- **Left Join**: All messages from the left stream are included, matched with right stream messages when available +- **Right Join**: All messages from the right stream are included, matched with left stream messages when available + +For more details on the join algorithm, see the [Data Flow](/architecture/data-flow#stage-4-join-optional) documentation. + +## Configuration + +Join is configured at the pipeline level. Here's the configuration structure: + +```json +{ + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "orders-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "left" + }, + { + "source_id": "payments-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "right" + } + ] + } +} +``` + +### Configuration Fields + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | Whether joining is enabled | +| `type` | string | Yes | Join type. Currently only `"temporal"` is supported | +| `sources` | array | Yes | List of sources to join. Must contain exactly 2 sources. See [Join Source Configuration](#join-source-configuration) | + +### Join Source Configuration + +Each source in the `sources` array has the following configuration: + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `source_id` | string | Yes | Name of the Kafka topic to join. Must match a topic name in the `source.topics` array | +| `join_key` | string | Yes | Field name in the events used for joining records. Must exist in both source topics | +| `time_window` | string | Yes | Time window for joining records (e.g., `"1h"` for one hour). See [Time Windows](#time-windows) | +| `orientation` | string | Yes | Join orientation. Must be either `"left"` or `"right"` | + +### Time Windows + +Time windows specify the maximum time difference between events that can be joined. 
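The interplay between the two KV stores and the window can be sketched in a few lines of Go. This is a toy, in-memory illustration with hypothetical names (`temporalJoin`, `onLeft`, `onRight`) — GlassFlow's join runs against persistent key-value stores, not Go maps:

```go
package main

import (
	"fmt"
	"time"
)

type entry struct {
	payload map[string]any
	expires time.Time
}

// temporalJoin buffers unmatched events per join key and evicts them
// once their time window has passed.
type temporalJoin struct {
	window      time.Duration
	left, right map[string]entry
}

// onLeft: join immediately if the key already exists in the right store,
// otherwise buffer the event in the left store until the window expires.
func (j *temporalJoin) onLeft(key string, ev map[string]any) (map[string]any, bool) {
	j.evict()
	if r, ok := j.right[key]; ok {
		return merge(ev, r.payload), true
	}
	j.left[key] = entry{ev, time.Now().Add(j.window)}
	return nil, false
}

// onRight: always store the event, then try to match a buffered left event.
func (j *temporalJoin) onRight(key string, ev map[string]any) (map[string]any, bool) {
	j.evict()
	j.right[key] = entry{ev, time.Now().Add(j.window)}
	if l, ok := j.left[key]; ok {
		delete(j.left, key) // matched left events are removed after the join
		return merge(l.payload, ev), true
	}
	return nil, false
}

// evict drops entries whose time window has passed.
func (j *temporalJoin) evict() {
	now := time.Now()
	for _, store := range []map[string]entry{j.left, j.right} {
		for k, e := range store {
			if now.After(e.expires) {
				delete(store, k)
			}
		}
	}
}

func merge(a, b map[string]any) map[string]any {
	out := map[string]any{}
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		out[k] = v
	}
	return out
}

func main() {
	j := &temporalJoin{window: time.Hour, left: map[string]entry{}, right: map[string]entry{}}
	j.onLeft("order-1", map[string]any{"order_id": "order-1", "amount": 42})
	if joined, ok := j.onRight("order-1", map[string]any{"order_id": "order-1", "status": "paid"}); ok {
		fmt.Println(joined)
	}
}
```

A real deployment would also persist these buffers and honour the configured left/right orientation when deciding which unmatched events to emit.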
Supported formats: + +- `"30s"` - 30 seconds +- `"1m"` - 1 minute +- `"1h"` - 1 hour +- `"24h"` - 24 hours +- `"7d"` - 7 days + +The time window determines: +- How long events are kept in the KV stores waiting for a match +- The maximum time difference between events that will be joined +- When unmatched events are evicted from the stores + +## Use Cases + +### Enriching Events with Related Data + +Join orders with payment information to create enriched order records: + +```json +{ + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "orders", + "join_key": "order_id", + "time_window": "5m", + "orientation": "left" + }, + { + "source_id": "payments", + "join_key": "order_id", + "time_window": "5m", + "orientation": "right" + } + ] + } +} +``` + +### Combining User Activity Streams + +Join user profile updates with user activity events: + +```json +{ + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "user-profiles", + "join_key": "user_id", + "time_window": "1h", + "orientation": "left" + }, + { + "source_id": "user-activities", + "join_key": "user_id", + "time_window": "1h", + "orientation": "right" + } + ] + } +} +``` + +### Merging Product Information + +Join product catalog updates with product sales events: + +```json +{ + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "product-catalog", + "join_key": "product_id", + "time_window": "24h", + "orientation": "left" + }, + { + "source_id": "product-sales", + "join_key": "product_id", + "time_window": "24h", + "orientation": "right" + } + ] + } +} +``` + +## Best Practices + +### Choosing Join Keys + +- **Use stable identifiers**: Join keys should be stable and consistent across both streams (e.g., order IDs, user IDs, product IDs) +- **Avoid high-cardinality fields**: Very high-cardinality join keys can increase memory usage +- **Ensure key presence**: The join key field must exist in both source topics + +### Setting Time Windows + +- **Match event arrival patterns**: Set the time window based on the expected time difference between related events +- **Consider processing delays**: Account for potential delays in event processing or network latency +- **Balance memory and coverage**: Longer windows use more memory but catch matches over longer periods +- **Avoid excessive windows**: Very long windows may hold unmatched events unnecessarily + +### Join Orientation + +- **Left join**: Use when you want all events from the first source, with optional enrichment from the second +- **Right join**: Use when you want all events from the second source, with optional enrichment from the first +- **Choose based on data volume**: Consider which stream has higher volume and should be the primary stream + +### Performance Considerations + +- **Memory usage**: The KV stores size depends on the number of unique join keys within the time window +- **Event ordering**: Join works best when events arrive in roughly chronological order +- **Unmatched events**: Events that don't find a match within the time window are evicted and won't be joined + +## Example Configuration + +Here's a complete example of a pipeline with join enabled: + +```json +{ + "version": "v2", + "pipeline_id": "joined-pipeline", + "name": "Orders and Payments Join Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "orders-topic" + }, + { + "name": "payments-topic" + } + ] + }, + "join": { + "enabled": 
true, + "type": "temporal", + "sources": [ + { + "source_id": "orders-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "left" + }, + { + "source_id": "payments-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "right" + } + ] + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "joined_orders" + } + } +} +``` + +## Combining with Other Transformations + +### Join + Deduplication + +You can combine join with deduplication to ensure unique joined results: + +1. Enable deduplication on the source topics +2. Enable join to combine the deduplicated streams +3. This ensures that duplicate events don't create duplicate joins + +### Filter + Join + +You can filter events before joining: + +1. Configure filters on the source topics +2. Only filtered events participate in the join +3. This reduces the number of events in the join stores + +## Related Documentation + +- [Data Flow - Join Stage](/architecture/data-flow#stage-4-join-optional) +- [Pipeline JSON Reference - Join Configuration](/configuration/pipeline-json-reference#join-configuration) +- [Transformations Overview](/usage-guide/transformations) + diff --git a/docs/app/configuration/transformations/page.mdx b/docs/app/configuration/transformations/page.mdx new file mode 100644 index 00000000..492c6869 --- /dev/null +++ b/docs/app/configuration/transformations/page.mdx @@ -0,0 +1,78 @@ +--- +title: 'Transformations' +asIndexPage: true +description: 'Learn about the transformations available in GlassFlow pipelines' +--- + +# Transformations + +GlassFlow supports several transformations that can be applied to your data as it flows through the pipeline. Transformations allow you to process, filter, and combine data before it reaches the final destination in ClickHouse. + +## Available Transformations + +### [Deduplication](/usage-guide/transformations/deduplication) + +The **Deduplication** transformation removes duplicate events from your data stream based on a unique identifier field. This is essential for ensuring data quality and preventing duplicate records in your ClickHouse tables. + +**Key Features:** +- Removes duplicate events based on a configurable ID field +- Uses a time window to determine duplicate detection scope +- Ensures exactly-once semantics through atomic transactions +- Prevents duplicate output even during processing failures + +[Learn more about Deduplication →](/usage-guide/transformations/deduplication) + +### [Join](/usage-guide/transformations/join) + +The **Join** transformation combines data from multiple Kafka topics based on join keys and time windows. This enables you to enrich your data by merging related events from different sources. + +**Key Features:** +- Temporal join algorithm for time-based matching +- Supports left and right join orientations +- Configurable time windows for matching events +- Automatic cleanup of matched entries + +[Learn more about Join →](/usage-guide/transformations/join) + +### [Filter](/usage-guide/transformations/filter) + +The **Filter** transformation allows you to selectively process events based on configurable expressions. Events that match the filter expression are processed, while non-matching events are filtered out. 
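For a sense of how such an expression behaves, here is a small Go sketch using the [expr](https://github.com/expr-lang/expr) library that the filter is based on; the event map and values are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/expr-lang/expr"
)

func main() {
	// A hypothetical event as it might arrive from Kafka.
	event := map[string]any{"age": 25, "status": "active"}

	// Compile the filter expression once; AsBool enforces a boolean result.
	program, err := expr.Compile(`age > 18 and status == 'active'`,
		expr.Env(event), expr.AsBool())
	if err != nil {
		panic(err)
	}

	// Evaluate the expression against the event.
	keep, err := expr.Run(program, event)
	if err != nil {
		panic(err)
	}
	fmt.Println(keep) // true — the event would be processed further
}
```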
+ +**Key Features:** +- Expression-based filtering using field values +- Supports complex logical operations (AND, OR, NOT) +- Evaluates filters at the ingestor stage +- Filtered events are not processed further in the pipeline + +[Learn more about Filter →](/usage-guide/transformations/filter) + +## Transformation Order + +Transformations are applied in the following order within a pipeline: + +1. **Filter** - Applied at the ingestor stage, before any other processing +2. **Deduplication** - Applied after ingestion, before joining +3. **Join** - Applied after deduplication, before sinking to ClickHouse + +This order ensures that: +- Filtering happens early, reducing processing overhead +- Deduplication works on clean, filtered data +- Joins operate on deduplicated streams + +## Combining Transformations + +You can combine multiple transformations in a single pipeline: + +- **Filter + Deduplication**: Filter events first, then deduplicate the filtered results +- **Filter + Join**: Filter events from multiple topics, then join them +- **Deduplication + Join**: Deduplicate events, then join deduplicated streams +- **All Three**: Apply filter, then deduplication, then join + +## Configuration + +Each transformation is configured in the pipeline JSON configuration. For detailed configuration options, see the [Pipeline JSON Reference](/configuration/pipeline-json-reference) documentation. + +## Data Flow + +Transformations are applied as data flows through the pipeline stages. For a detailed explanation of how data flows through transformations, see the [Data Flow](/architecture/data-flow) documentation. + From cabcf9eaff50bf5bede6d0ee530fcd7435e0e914 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 5 Dec 2025 18:12:13 +0100 Subject: [PATCH 2/2] refine transformations docs --- .../transformations/deduplication/page.mdx | 51 +------- .../transformations/filter/page.mdx | 114 +----------------- .../transformations/join/page.mdx | 103 +--------------- .../configuration/transformations/page.mdx | 56 +-------- 4 files changed, 7 insertions(+), 317 deletions(-) diff --git a/docs/app/configuration/transformations/deduplication/page.mdx b/docs/app/configuration/transformations/deduplication/page.mdx index 752e6b57..84e0d7c0 100644 --- a/docs/app/configuration/transformations/deduplication/page.mdx +++ b/docs/app/configuration/transformations/deduplication/page.mdx @@ -9,7 +9,7 @@ The **Deduplication** transformation removes duplicate events from your data str ## How It Works -Deduplication in GlassFlow uses a key-value store (BadgerDB) to track unique event identifiers. The process works as follows: +Deduplication in GlassFlow uses a key-value store ([BadgerDB](https://docs.hypermode.com/badger/overview)) to track unique event identifiers. 
The process works as follows: ### Internal Process @@ -84,53 +84,6 @@ The time window determines: - The maximum time between duplicate events that will be detected - When entries are automatically evicted from the store -## Use Cases - -### Preventing Duplicate Events - -If your Kafka producer might send duplicate events (e.g., due to retries or network issues), deduplication ensures only one copy reaches ClickHouse: - -```json -{ - "deduplication": { - "enabled": true, - "id_field": "event_id", - "id_field_type": "string", - "time_window": "1h" - } -} -``` - -### Idempotent Processing - -When processing events that should only be handled once (e.g., payment transactions, user registrations), deduplication provides idempotency: - -```json -{ - "deduplication": { - "enabled": true, - "id_field": "transaction_id", - "id_field_type": "uuid", - "time_window": "24h" - } -} -``` - -### Data Quality Assurance - -For critical data pipelines where duplicate records would cause issues, deduplication acts as a safety net: - -```json -{ - "deduplication": { - "enabled": true, - "id_field": "record_id", - "id_field_type": "int64", - "time_window": "7d" - } -} -``` - ## Best Practices ### Choosing an ID Field @@ -192,5 +145,5 @@ Here's a complete example of a pipeline with deduplication enabled: - [Data Flow - Deduplication Stage](/architecture/data-flow#stage-3-deduplication-optional) - [Pipeline JSON Reference - Deduplication Configuration](/configuration/pipeline-json-reference#deduplication-configuration) -- [Transformations Overview](/usage-guide/transformations) +- [Transformations Overview](/configuration/transformations) diff --git a/docs/app/configuration/transformations/filter/page.mdx b/docs/app/configuration/transformations/filter/page.mdx index babf1153..230ea518 100644 --- a/docs/app/configuration/transformations/filter/page.mdx +++ b/docs/app/configuration/transformations/filter/page.mdx @@ -151,73 +151,6 @@ Filter expressions use field names from your event schema and support the follow } ``` -## Use Cases - -### Filtering by Status - -Only process events with a specific status: - -```json -{ - "filter": { - "enabled": true, - "expression": "status == 'completed'" - } -} -``` - -### Age-Based Filtering - -Process events for users within a specific age range: - -```json -{ - "filter": { - "enabled": true, - "expression": "age >= 18 and age <= 65" - } -} -``` - -### Premium User Filtering - -Only process events for premium users: - -```json -{ - "filter": { - "enabled": true, - "expression": "is_premium == true" - } -} -``` - -### Multi-Condition Filtering - -Filter events based on multiple criteria: - -```json -{ - "filter": { - "enabled": true, - "expression": "status == 'active' and region == 'US' and score > 100" - } -} -``` - -### Exclusion Filtering - -Exclude specific values: - -```json -{ - "filter": { - "enabled": true, - "expression": "status != 'deleted' and status != 'archived'" - } -} -``` - ## Best Practices ### Expression Design @@ -225,13 +158,6 @@ Exclude specific values: - **Keep expressions simple**: Complex expressions can be harder to maintain and debug - **Use parentheses**: Explicitly group conditions with parentheses for clarity - **Test expressions**: Validate filter expressions before deploying to production -- **Document intent**: Use clear field names and conditions that express business logic - -### Performance Considerations - -- **Filter early**: Filtering at the ingestor stage reduces processing overhead -- **Index frequently filtered fields**: If possible, 
structure your data to support efficient filtering -- **Avoid complex nested conditions**: Very complex expressions may impact performance ### Field Names @@ -281,48 +207,10 @@ Here's a complete example of a pipeline with filtering enabled: } ``` -## Combining with Other Transformations - -### Filter + Deduplication - -Filter events first, then deduplicate the filtered results: - -1. Configure filter to select relevant events -2. Enable deduplication on the filtered stream -3. This reduces the number of events that need deduplication - -### Filter + Join - -Filter events from multiple topics before joining: - -1. Configure filters on source topics (if supported per-topic) -2. Use pipeline-level filter for post-join filtering -3. Only filtered events participate in the join - -### Filter + Deduplication + Join - -Combine all three transformations: - -1. Filter events to select relevant data -2. Deduplicate filtered events -3. Join deduplicated streams -4. This provides a complete data processing pipeline - -## Expression Validation - -Filter expressions are validated when the pipeline is created or updated. The validation: - -- Checks expression syntax -- Verifies that all field names exist in the event schema -- Ensures expression evaluates to a boolean value -- Validates type compatibility for comparisons - -If validation fails, the pipeline creation or update will fail with an error message indicating the issue. - ## Related Documentation - [Data Flow - Ingestor Stage](/architecture/data-flow#stage-1-kafka-to-nats-jetstream-ingestor) - [Pipeline JSON Reference - Filter Configuration](/configuration/pipeline-json-reference#filter-configuration) -- [Transformations Overview](/usage-guide/transformations) +- [Transformations Overview](/configuration/transformations) - [expr Language Documentation](https://github.com/expr-lang/expr) diff --git a/docs/app/configuration/transformations/join/page.mdx b/docs/app/configuration/transformations/join/page.mdx index f53c92d4..de3b82bc 100644 --- a/docs/app/configuration/transformations/join/page.mdx +++ b/docs/app/configuration/transformations/join/page.mdx @@ -106,89 +106,6 @@ The time window determines: - The maximum time difference between events that will be joined - When unmatched events are evicted from the stores -## Use Cases - -### Enriching Events with Related Data - -Join orders with payment information to create enriched order records: - -```json -{ - "join": { - "enabled": true, - "type": "temporal", - "sources": [ - { - "source_id": "orders", - "join_key": "order_id", - "time_window": "5m", - "orientation": "left" - }, - { - "source_id": "payments", - "join_key": "order_id", - "time_window": "5m", - "orientation": "right" - } - ] - } -} -``` - -### Combining User Activity Streams - -Join user profile updates with user activity events: - -```json -{ - "join": { - "enabled": true, - "type": "temporal", - "sources": [ - { - "source_id": "user-profiles", - "join_key": "user_id", - "time_window": "1h", - "orientation": "left" - }, - { - "source_id": "user-activities", - "join_key": "user_id", - "time_window": "1h", - "orientation": "right" - } - ] - } -} -``` - -### Merging Product Information - -Join product catalog updates with product sales events: - -```json -{ - "join": { - "enabled": true, - "type": "temporal", - "sources": [ - { - "source_id": "product-catalog", - "join_key": "product_id", - "time_window": "24h", - "orientation": "left" - }, - { - "source_id": "product-sales", - "join_key": "product_id", - "time_window": "24h", - 
"orientation": "right" - } - ] - } -} -``` - ## Best Practices ### Choosing Join Keys @@ -268,27 +185,9 @@ Here's a complete example of a pipeline with join enabled: } ``` -## Combining with Other Transformations - -### Join + Deduplication - -You can combine join with deduplication to ensure unique joined results: - -1. Enable deduplication on the source topics -2. Enable join to combine the deduplicated streams -3. This ensures that duplicate events don't create duplicate joins - -### Filter + Join - -You can filter events before joining: - -1. Configure filters on the source topics -2. Only filtered events participate in the join -3. This reduces the number of events in the join stores - ## Related Documentation - [Data Flow - Join Stage](/architecture/data-flow#stage-4-join-optional) - [Pipeline JSON Reference - Join Configuration](/configuration/pipeline-json-reference#join-configuration) -- [Transformations Overview](/usage-guide/transformations) +- [Transformations Overview](/configuration/transformations) diff --git a/docs/app/configuration/transformations/page.mdx b/docs/app/configuration/transformations/page.mdx index 492c6869..2683f06c 100644 --- a/docs/app/configuration/transformations/page.mdx +++ b/docs/app/configuration/transformations/page.mdx @@ -10,41 +10,9 @@ GlassFlow supports several transformations that can be applied to your data as i ## Available Transformations -### [Deduplication](/usage-guide/transformations/deduplication) - -The **Deduplication** transformation removes duplicate events from your data stream based on a unique identifier field. This is essential for ensuring data quality and preventing duplicate records in your ClickHouse tables. - -**Key Features:** -- Removes duplicate events based on a configurable ID field -- Uses a time window to determine duplicate detection scope -- Ensures exactly-once semantics through atomic transactions -- Prevents duplicate output even during processing failures - -[Learn more about Deduplication →](/usage-guide/transformations/deduplication) - -### [Join](/usage-guide/transformations/join) - -The **Join** transformation combines data from multiple Kafka topics based on join keys and time windows. This enables you to enrich your data by merging related events from different sources. - -**Key Features:** -- Temporal join algorithm for time-based matching -- Supports left and right join orientations -- Configurable time windows for matching events -- Automatic cleanup of matched entries - -[Learn more about Join →](/usage-guide/transformations/join) - -### [Filter](/usage-guide/transformations/filter) - -The **Filter** transformation allows you to selectively process events based on configurable expressions. Events that match the filter expression are processed, while non-matching events are filtered out. 
- -**Key Features:** -- Expression-based filtering using field values -- Supports complex logical operations (AND, OR, NOT) -- Evaluates filters at the ingestor stage -- Filtered events are not processed further in the pipeline - -[Learn more about Filter →](/usage-guide/transformations/filter) +- [**Deduplication**](/configuration/transformations/deduplication) - Remove duplicate events from your data stream based on a unique identifier field +- [**Join**](/configuration/transformations/join) - Combine data from multiple Kafka topics based on join keys and time windows +- [**Filter**](/configuration/transformations/filter) - Selectively process events based on configurable expressions ## Transformation Order @@ -58,21 +26,3 @@ This order ensures that: - Filtering happens early, reducing processing overhead - Deduplication works on clean, filtered data - Joins operate on deduplicated streams - -## Combining Transformations - -You can combine multiple transformations in a single pipeline: - -- **Filter + Deduplication**: Filter events first, then deduplicate the filtered results -- **Filter + Join**: Filter events from multiple topics, then join them -- **Deduplication + Join**: Deduplicate events, then join deduplicated streams -- **All Three**: Apply filter, then deduplication, then join - -## Configuration - -Each transformation is configured in the pipeline JSON configuration. For detailed configuration options, see the [Pipeline JSON Reference](/configuration/pipeline-json-reference) documentation. - -## Data Flow - -Transformations are applied as data flows through the pipeline stages. For a detailed explanation of how data flows through transformations, see the [Data Flow](/architecture/data-flow) documentation. -