diff --git a/docs/app/configuration/_meta.tsx b/docs/app/configuration/_meta.tsx index c2286aba..e39e5148 100644 --- a/docs/app/configuration/_meta.tsx +++ b/docs/app/configuration/_meta.tsx @@ -1,4 +1,5 @@ export default { + 'transformations': '', 'pipeline-json-reference': '', 'data-format': '', 'supported-kafka-connections': '', diff --git a/docs/app/configuration/supported-kafka-connections/page.mdx b/docs/app/configuration/supported-kafka-connections/page.mdx index 6a2d0022..5fb8d03c 100644 --- a/docs/app/configuration/supported-kafka-connections/page.mdx +++ b/docs/app/configuration/supported-kafka-connections/page.mdx @@ -52,7 +52,7 @@ Client certificate authentication (mTLS) is not supported. ### PLAINTEXT (no auth) - + PLAINTEXT (no auth) example @@ -69,7 +69,7 @@ Client certificate authentication (mTLS) is not supported. ### SASL/PLAIN over TLS - + SASL/PLAIN over TLS example @@ -89,7 +89,7 @@ Client certificate authentication (mTLS) is not supported. ### SASL/SCRAM-SHA-256 - + SASL/SCRAM-SHA-256 example @@ -121,7 +121,7 @@ GlassFlow supports Kerberos authentication (GSSAPI) for connecting to Kafka clus **Optional fields:** - `password`: Required only if using password-based authentication instead of keytab - + SASL/GSSAPI (Kerberos) example @@ -148,7 +148,7 @@ The `kerberos_keytab` field should contain the base64-encoded content of your ke ### SSL (skip TLS verification) - + SSL (skip TLS verification) example diff --git a/docs/app/configuration/transformations/deduplication/page.mdx b/docs/app/configuration/transformations/deduplication/page.mdx new file mode 100644 index 00000000..f43bbf40 --- /dev/null +++ b/docs/app/configuration/transformations/deduplication/page.mdx @@ -0,0 +1,135 @@ +--- +title: 'Deduplication' +description: 'Learn how deduplication works in GlassFlow and how to configure it' +--- +import { Callout, Tabs } from 'nextra/components' + +# Deduplication + +The **Deduplication** transformation removes duplicate events from your data stream, ensuring that each unique event is processed only once. This is essential for maintaining data quality and preventing duplicate records in your ClickHouse tables. + +## How It Works + +Deduplication in GlassFlow uses a key-value store ([BadgerDB](https://docs.hypermode.com/badger/overview)) to track unique event identifiers. The process works as follows: + +### Internal Process + +1. **Message Consumption**: Messages are read from the ingestor output stream in batches +2. **Duplicate Detection**: Each message's unique identifier (specified by `id_field`) is checked against the deduplication store +3. **Filtering**: Messages with identifiers that already exist in the store are filtered out as duplicates +4. **Storage**: Unique messages have their identifiers stored in the deduplication store +5. **Forwarding**: Only unique messages are forwarded to the downstream component (sink or join) + +### Time Window + +The deduplication store maintains entries for a configurable time window. After the time window expires, entries are automatically evicted from the store. 
This means: + +- Events with the same ID that arrive within the time window are considered duplicates +- Events with the same ID that arrive after the time window expires are treated as new events +- The time window prevents the deduplication store from growing indefinitely + +### Atomic Transactions + +Deduplication uses atomic transactions to ensure data consistency: + +- When processing a batch, all deduplication checks and store updates happen within a single transaction +- If processing fails, the transaction is not committed, ensuring no duplicate keys are saved +- This prevents duplicate events from being produced even during processing failures +- Messages are only acknowledged after successful deduplication and forwarding + +For more details on how deduplication handles failures, see the [Data Flow](/architecture/data-flow#stage-3-deduplication-optional) documentation. + +## Configuration + +Deduplication is configured per Kafka topic in the pipeline configuration. Here's the configuration structure: + + + + Deduplication Configuration + + + + ```json + { + "source": { + "topics": [ + { + "name": "orders", + "deduplication": { + "enabled": true, + "id_field": "order_id", + "id_field_type": "string", + "time_window": "1h" + } + } + ] + } + } + ``` + Refer to the [Pipeline JSON Reference](/configuration/pipeline-json-reference#deduplication-configuration) for more details. + + + +## Best Practices + +### Choosing an ID Field + +- **Use a truly unique identifier**: The `id_field` should be unique across all events (e.g., UUID, transaction ID, event ID) +- **Avoid timestamps**: Timestamps are not unique and should not be used as the deduplication key +- **Consider composite keys**: If no single field is unique, consider creating a composite key in your data + +### Setting Time Windows + +- **Match your use case**: Set the time window based on how long duplicates might arrive (e.g., retry windows, network delays) +- **Balance memory and coverage**: Longer windows use more memory but catch duplicates over longer periods +- **Consider event frequency**: For high-frequency events, shorter windows may be sufficient + +### Performance Considerations + +- **Batch processing**: Deduplication processes messages in batches for efficiency +- **Memory usage**: The deduplication store size depends on the number of unique IDs within the time window +- **Storage location**: The BadgerDB store is persisted to disk, ensuring deduplication state survives restarts + +## Example Configuration + +Here's a complete example of a pipeline with deduplication enabled: + +```json +{ + "version": "v2", + "pipeline_id": "deduplicated-pipeline", + "name": "Deduplicated Events Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "user-events", + "deduplication": { + "enabled": true, + "id_field": "event_id", + "id_field_type": "string", + "time_window": "24h" + } + } + ] + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "user_events" + } + } +} +``` + +## Related Documentation + +- [Data Flow - Deduplication Stage](/architecture/data-flow#stage-3-deduplication-optional) +- [Pipeline JSON Reference - Deduplication Configuration](/configuration/pipeline-json-reference#deduplication-configuration) +- [Transformations Overview](/configuration/transformations) + diff --git a/docs/app/configuration/transformations/filter/page.mdx 
b/docs/app/configuration/transformations/filter/page.mdx new file mode 100644 index 00000000..e9ac96a1 --- /dev/null +++ b/docs/app/configuration/transformations/filter/page.mdx @@ -0,0 +1,257 @@ +--- +title: 'Filter' +description: 'Learn how filtering works in GlassFlow and how to configure filter expressions' +--- +import { Callout, Tabs } from 'nextra/components' + +# Filter + +The **Filter** transformation allows you to selectively process events based on configurable expressions. Events that match the filter expression are processed further in the pipeline, while non-matching events are filtered out and not processed. + +## How It Works + +Filtering in GlassFlow is applied at the ingestor stage, before any other processing. The filter uses expression-based evaluation to determine whether an event should be processed. + +### Internal Process + +1. **Event Reception**: Events are received from Kafka topics +2. **Expression Evaluation**: Each event is evaluated against the configured filter expression +3. **Filtering Decision**: + - If the expression evaluates to `true`, the event is processed further + - If the expression evaluates to `false`, the event is filtered out +4. **Processing**: Only matching events continue through the pipeline (deduplication, join, sink) + +### Expression Language + +GlassFlow uses the [expr](https://github.com/expr-lang/expr) expression language for filter expressions. This provides a simple, safe way to evaluate conditions on your event data. + +**Key Features:** +- Field-based evaluation using event field names +- Support for common comparison operators (`==`, `!=`, `>`, `<`, `>=`, `<=`) +- Logical operators (`and`, `or`, `not`) +- Type-safe evaluation based on field types + +## Configuration + +Filter is configured at the pipeline level. Here's the configuration structure: + + + + Filter Configuration + + + ```json + { + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + } + } + ``` + Refer to the [Pipeline JSON Reference](/configuration/pipeline-json-reference#filter-configuration) for more details. 
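For illustration, here is how the expression above (`age > 18 and status == 'active'`) would treat two hypothetical events; the field values are assumptions for this example, not part of any GlassFlow schema:

```json
[
  { "age": 25, "status": "active" },
  { "age": 16, "status": "active" }
]
```

The first event evaluates to `true` and continues through the pipeline; the second fails the `age > 18` condition, evaluates to `false`, and is filtered out.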
+ + + +### Expression Syntax + +Filter expressions use field names from your event schema and support the following operations: + +#### Comparison Operators + +- `==` - Equality +- `!=` - Inequality +- `>` - Greater than +- `<` - Less than +- `>=` - Greater than or equal +- `<=` - Less than or equal + +#### Logical Operators + +- `and` - Logical AND +- `or` - Logical OR +- `not` - Logical NOT + +#### Examples + +**String comparison:** + + + String Comparison + + + ```json + { + "filter": { + "enabled": true, + "expression": "status == 'active'" + } + } + ``` + + + +**Numeric comparison:** + + + Numeric Comparison + + + ```json + { + "filter": { + "enabled": true, + "expression": "age > 18" + } + } + ``` + + + +**Boolean field:** + + + Boolean Field + + + ```json + { + "filter": { + "enabled": true, + "expression": "is_premium == true" + } + } + ``` + + + +**Multiple conditions with AND:** + + + Multiple Conditions with AND + + + ```json + { + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + } + } + ``` + + + +**Multiple conditions with OR:** + + + Multiple Conditions with OR + + + ```json + { + "filter": { + "enabled": true, + "expression": "status == 'active' or status == 'pending'" + } + } + ``` + + + +**Complex expression with parentheses:** + + + Complex Expression with Parentheses + + + ```json + { + "filter": { + "enabled": true, + "expression": "(age > 18 and age < 65) or is_student == true" + } + } + ``` + + + +**Nested field access:** + + + Nested Field Access + + + ```json + { + "filter": { + "enabled": true, + "expression": "user.age > 18" + } + } + ``` + + + +## Best Practices + +### Expression Design + +- **Keep expressions simple**: Complex expressions can be harder to maintain and debug +- **Use parentheses**: Explicitly group conditions with parentheses for clarity +- **Test expressions**: Validate filter expressions before deploying to production + +### Field Names + +- **Use exact field names**: Field names in expressions must match exactly with your event schema +- **Case sensitivity**: Field names are case-sensitive +- **Nested fields**: Use dot notation for nested fields (e.g., `user.age`) + +### Type Safety + +- **Match field types**: Ensure comparison values match the field types in your schema +- **String literals**: Use single quotes for string literals in expressions +- **Numeric values**: Use numeric literals without quotes for numeric comparisons +- **Boolean values**: Use `true` or `false` (lowercase) for boolean comparisons + +## Example Configuration + +Here's a complete example of a pipeline with filtering enabled: + +```json +{ + "version": "v2", + "pipeline_id": "filtered-pipeline", + "name": "Filtered Events Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "user-events" + } + ] + }, + "filter": { + "enabled": true, + "expression": "age > 18 and status == 'active'" + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "active_users" + } + } +} +``` + +## Related Documentation + +- [Data Flow - Ingestor Stage](/architecture/data-flow#stage-1-kafka-to-nats-jetstream-ingestor) +- [Pipeline JSON Reference - Filter Configuration](/configuration/pipeline-json-reference#filter-configuration) +- [Transformations Overview](/configuration/transformations) +- [expr Language Documentation](https://github.com/expr-lang/expr) + diff --git 
a/docs/app/configuration/transformations/join/page.mdx b/docs/app/configuration/transformations/join/page.mdx new file mode 100644 index 00000000..621799f0 --- /dev/null +++ b/docs/app/configuration/transformations/join/page.mdx @@ -0,0 +1,168 @@ +--- +title: 'Join' +description: 'Learn how temporal joins work in GlassFlow and how to configure them' +--- +import { Callout, Tabs } from 'nextra/components' + +# Join + +The **Join** transformation combines data from multiple Kafka topics based on join keys and time windows. This enables you to enrich your data by merging related events from different sources using a temporal join algorithm. + +## How It Works + +Join in GlassFlow uses a temporal join algorithm that matches events from different streams based on join keys within a configurable time window. The process maintains separate key-value stores for each stream to enable efficient matching. + +### Internal Process + +The join service maintains separate key-value (KV) stores for the left and right streams: + +#### Left Stream Processing + +1. When a message arrives from the left stream, the system looks up its join key in the right stream's KV store +2. If a matching key is found in the right KV store, the messages are joined and sent to the output stream +3. If no match is found, the message is stored in the left KV store using its join key +4. After a joined message is successfully sent to the output stream, the related left-stream message is removed from the left KV store + +#### Right Stream Processing + +1. When a message arrives from the right stream, it is automatically stored in the right KV store +2. The system then checks if a matching key exists in the left KV store +3. If a match is found, the messages are joined and sent to the output stream +4. The matched left-stream message is removed from the left KV store after successful join + +### Time Window and TTL + +- Both left and right KV stores have a configured time-to-live (TTL) for each key, based on the join time window +- Entries are automatically evicted from the KV stores when their TTL expires +- This ensures that only messages within the configured time window are considered for joining +- Messages can be evicted only after they have been successfully joined and sent to the output stream, or when their TTL expires + +### Join Orientations + +GlassFlow supports two join orientations: + +- **Left Join**: All messages from the left stream are included, matched with right stream messages when available +- **Right Join**: All messages from the right stream are included, matched with left stream messages when available + +For more details on the join algorithm, see the [Data Flow](/architecture/data-flow#stage-4-join-optional) documentation. + +## Configuration + +Join is configured at the pipeline level. Here's the configuration structure: + + + + Join Configuration + + + ```json + { + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "orders-topic", + "join_key": "user_id", + "time_window": "1h", + "orientation": "left" + }, + { + "source_id": "users-topic", + "join_key": "user_id", + "time_window": "1h", + "orientation": "right" + } + ] + } + } + ``` + Refer to the [Pipeline JSON Reference](/configuration/pipeline-json-reference#join-configuration) for more details. 
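As a hypothetical illustration of the configuration above, the two events below share the join key `user_id` and would be joined if they arrive within the 1-hour window; all fields other than `user_id` are assumed for this example:

```json
{
  "orders-topic": { "user_id": "u-42", "order_id": "o-1001", "amount": 59.9 },
  "users-topic": { "user_id": "u-42", "email": "jane@example.com" }
}
```

If the `users-topic` event has not arrived yet, the `orders-topic` event waits in the left KV store and is joined when the match arrives, or is evicted once its TTL expires.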
+ + + +## Best Practices + +### Choosing Join Keys + +- **Use stable identifiers**: Join keys should be stable and consistent across both streams (e.g., order IDs, user IDs, product IDs) +- **Avoid high-cardinality fields**: Very high-cardinality join keys can increase memory usage +- **Ensure key presence**: The join key field must exist in both source topics + +### Setting Time Windows + +- **Match event arrival patterns**: Set the time window based on the expected time difference between related events +- **Consider processing delays**: Account for potential delays in event processing or network latency +- **Balance memory and coverage**: Longer windows use more memory but catch matches over longer periods +- **Avoid excessive windows**: Very long windows may hold unmatched events unnecessarily + +### Join Orientation + +- **Left join**: Use when you want all events from the first source, with optional enrichment from the second +- **Right join**: Use when you want all events from the second source, with optional enrichment from the first +- **Choose based on data volume**: Consider which stream has higher volume and should be the primary stream + +### Performance Considerations + +- **Memory usage**: The KV stores size depends on the number of unique join keys within the time window +- **Event ordering**: Join works best when events arrive in roughly chronological order +- **Unmatched events**: Events that don't find a match within the time window are evicted and won't be joined + +## Example Configuration + +Here's a complete example of a pipeline with join enabled: + +```json +{ + "version": "v2", + "pipeline_id": "joined-pipeline", + "name": "Orders and Payments Join Pipeline", + "source": { + "type": "kafka", + "connection_params": { + "brokers": ["kafka:9092"] + }, + "topics": [ + { + "name": "orders-topic" + }, + { + "name": "payments-topic" + } + ] + }, + "join": { + "enabled": true, + "type": "temporal", + "sources": [ + { + "source_id": "orders-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "left" + }, + { + "source_id": "payments-topic", + "join_key": "order_id", + "time_window": "1h", + "orientation": "right" + } + ] + }, + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "clickhouse:9000", + "database": "analytics", + "table": "joined_orders" + } + } +} +``` + +## Related Documentation + +- [Data Flow - Join Stage](/architecture/data-flow#stage-4-join-optional) +- [Pipeline JSON Reference - Join Configuration](/configuration/pipeline-json-reference#join-configuration) +- [Transformations Overview](/configuration/transformations) + diff --git a/docs/app/configuration/transformations/page.mdx b/docs/app/configuration/transformations/page.mdx new file mode 100644 index 00000000..2683f06c --- /dev/null +++ b/docs/app/configuration/transformations/page.mdx @@ -0,0 +1,28 @@ +--- +title: 'Transformations' +asIndexPage: true +description: 'Learn about the transformations available in GlassFlow pipelines' +--- + +# Transformations + +GlassFlow supports several transformations that can be applied to your data as it flows through the pipeline. Transformations allow you to process, filter, and combine data before it reaches the final destination in ClickHouse. 
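Each transformation is described in detail on its own page. As a rough sketch of how the three can appear together in a single pipeline configuration (topic names, join keys, and the filter expression are illustrative, and unrelated fields such as `sink` are omitted), the relevant blocks look like this:

```json
{
  "source": {
    "topics": [
      {
        "name": "orders",
        "deduplication": {
          "enabled": true,
          "id_field": "order_id",
          "id_field_type": "string",
          "time_window": "1h"
        }
      },
      { "name": "users" }
    ]
  },
  "filter": {
    "enabled": true,
    "expression": "amount > 0"
  },
  "join": {
    "enabled": true,
    "type": "temporal",
    "sources": [
      { "source_id": "orders", "join_key": "user_id", "time_window": "1h", "orientation": "left" },
      { "source_id": "users", "join_key": "user_id", "time_window": "1h", "orientation": "right" }
    ]
  }
}
```

Refer to the [Pipeline JSON Reference](/configuration/pipeline-json-reference) for the authoritative structure of each block.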
+ +## Available Transformations + +- [**Deduplication**](/configuration/transformations/deduplication) - Remove duplicate events from your data stream based on a unique identifier field +- [**Join**](/configuration/transformations/join) - Combine data from multiple Kafka topics based on join keys and time windows +- [**Filter**](/configuration/transformations/filter) - Selectively process events based on configurable expressions + +## Transformation Order + +Transformations are applied in the following order within a pipeline: + +1. **Filter** - Applied at the ingestor stage, before any other processing +2. **Deduplication** - Applied after ingestion, before joining +3. **Join** - Applied after deduplication, before sinking to ClickHouse + +This order ensures that: +- Filtering happens early, reducing processing overhead +- Deduplication works on clean, filtered data +- Joins operate on deduplicated streams diff --git a/docs/app/faq/page.mdx b/docs/app/faq/page.mdx index ac619335..c2ffcccb 100644 --- a/docs/app/faq/page.mdx +++ b/docs/app/faq/page.mdx @@ -11,7 +11,7 @@ description: 'Frequently asked questions about GlassFlow' ## Q: How does GlassFlow's deduplication work? -GlassFlow's deduplication is powered by [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) and is based on a **user-defined key** (e.g. `user_id`) and a **time window** (e.g. 1 hour) to identify duplicates. When multiple events with the same key arrive within the configured time window, only the **first event** is written to ClickHouse. Any subsequent events with the same key during that window are discarded. This mechanism ensures that only unique events are persisted, avoiding duplicates caused by retries or upstream noise. +GlassFlow uses BadgerDB for deduplication, storing deduplication state on disk and in memory. For more details, see [Deduplication](/configuration/transformations/deduplication). ## Q: Why do duplicates happen in Kafka pipelines at all? diff --git a/docs/app/getting-started/live-preview/page.mdx b/docs/app/getting-started/live-preview/page.mdx index fa46c4e5..40f8bc88 100644 --- a/docs/app/getting-started/live-preview/page.mdx +++ b/docs/app/getting-started/live-preview/page.mdx @@ -3,7 +3,7 @@ Log in and see a working demo of GlassFlow running on a GCP cluster [demo.glassflow.dev](https://demo.glassflow.dev). You will see a demo version of the GlassFlow UI with a few pipelines running. -![GlassFlow Pipeline Data Flow](/assets/live_preview_login_page.png) +![GlassFlow Pipeline Data Flow](/assets/live_preview_pipeline_page.png) From the pipeline details page, you can access a Grafana dashboard that shows real-time metrics of the pipeline.
diff --git a/docs/public/assets/live_preview_login_page.png b/docs/public/assets/live_preview_login_page.png deleted file mode 100644 index 08609d8a..00000000 Binary files a/docs/public/assets/live_preview_login_page.png and /dev/null differ diff --git a/docs/public/assets/live_preview_pipeline_page.png b/docs/public/assets/live_preview_pipeline_page.png new file mode 100644 index 00000000..248d640c Binary files /dev/null and b/docs/public/assets/live_preview_pipeline_page.png differ diff --git a/docs/public/assets/ui_dedup_configuration.png b/docs/public/assets/ui_dedup_configuration.png new file mode 100644 index 00000000..651a4625 Binary files /dev/null and b/docs/public/assets/ui_dedup_configuration.png differ diff --git a/docs/public/assets/ui_filter_boolean_field.png b/docs/public/assets/ui_filter_boolean_field.png new file mode 100644 index 00000000..ae4aa20a Binary files /dev/null and b/docs/public/assets/ui_filter_boolean_field.png differ diff --git a/docs/public/assets/ui_filter_complex_expression_with_parentheses.png b/docs/public/assets/ui_filter_complex_expression_with_parentheses.png new file mode 100644 index 00000000..d2888522 Binary files /dev/null and b/docs/public/assets/ui_filter_complex_expression_with_parentheses.png differ diff --git a/docs/public/assets/ui_filter_configuration.png b/docs/public/assets/ui_filter_configuration.png new file mode 100644 index 00000000..2426977f Binary files /dev/null and b/docs/public/assets/ui_filter_configuration.png differ diff --git a/docs/public/assets/ui_filter_multiple_conditions_with_and.png b/docs/public/assets/ui_filter_multiple_conditions_with_and.png new file mode 100644 index 00000000..33a3f705 Binary files /dev/null and b/docs/public/assets/ui_filter_multiple_conditions_with_and.png differ diff --git a/docs/public/assets/ui_filter_multiple_conditions_with_or.png b/docs/public/assets/ui_filter_multiple_conditions_with_or.png new file mode 100644 index 00000000..c325a154 Binary files /dev/null and b/docs/public/assets/ui_filter_multiple_conditions_with_or.png differ diff --git a/docs/public/assets/ui_filter_nested_field_access.png b/docs/public/assets/ui_filter_nested_field_access.png new file mode 100644 index 00000000..c001ff23 Binary files /dev/null and b/docs/public/assets/ui_filter_nested_field_access.png differ diff --git a/docs/public/assets/ui_filter_numeric_comparison.png b/docs/public/assets/ui_filter_numeric_comparison.png new file mode 100644 index 00000000..f1d438a1 Binary files /dev/null and b/docs/public/assets/ui_filter_numeric_comparison.png differ diff --git a/docs/public/assets/ui_filter_string_comparison.png b/docs/public/assets/ui_filter_string_comparison.png new file mode 100644 index 00000000..564ed98d Binary files /dev/null and b/docs/public/assets/ui_filter_string_comparison.png differ diff --git a/docs/public/assets/ui_join_configuration.png b/docs/public/assets/ui_join_configuration.png new file mode 100644 index 00000000..683636ed Binary files /dev/null and b/docs/public/assets/ui_join_configuration.png differ