Document include subject in NATS source #211

Merged 3 commits on Jan 22, 2025

Changes from 2 commits
42 changes: 35 additions & 7 deletions ingestion/ingest-additional-fields-with-include-clause.mdx
@@ -9,7 +9,7 @@ sidebarTitle: "Ingest additional source fields"
To add additional columns, use the `INCLUDE` clause.

```sql
INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]
INCLUDE { header | key | offset | partition | timestamp | payload | subject } [AS <column_name>]
```

If `<column_name>` is not specified, a default one will be generated in the format `_rw_{connector}_{col}`, where `connector` is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.), and `col` is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be `_rw_kafka_offset`.
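For example, a minimal sketch of a Kafka source that relies on the default column name (the topic, broker address, and schema here are hypothetical, not from this PR):

```sql
CREATE SOURCE kafka_events (
    id INT,
    name VARCHAR
)
-- No AS alias, so the column is exposed as _rw_kafka_offset.
INCLUDE offset
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;
```

With an explicit alias, such as `INCLUDE offset AS kafka_offset`, the column is named `kafka_offset` instead.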
@@ -73,6 +73,40 @@

When ingesting data from Kinesis, here are some things to note when including the following fields.

For more components, see [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html).

### MongoDB CDC

When ingesting data from MongoDB CDC, the following additional fields can be included.

| Allowed components | Default type | Note |
| :------------------| :------------| :---------------------------------|
| timestamp | `timestamp with time zone` | The upstream commit timestamp. |
| partition | `varchar` | The partition the record is from. |
| offset | `varchar` | The offset in the partition. |
| database_name | `varchar` | Name of the database. |
| collection_name | `varchar` | Name of the MongoDB collection. |
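A hypothetical example combining these fields (the connection URL, collection name, and aliases are illustrative only):

```sql
CREATE TABLE mongo_users (
    _id JSONB PRIMARY KEY,
    payload JSONB
)
-- Expose CDC metadata alongside the document payload.
INCLUDE timestamp AS commit_ts
INCLUDE database_name AS db_name
INCLUDE collection_name AS coll_name
WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
    collection.name = 'mydb.users'
);
```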


### MQTT

When ingesting data from MQTT, the following additional fields can be included.

| Allowed components | Default type | Note |
| :------------------| :------------| :---------------------------------|
| partition | `varchar` | The topic the record is from. |
| offset | `varchar` | The offset in the partition. |
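A sketch of an MQTT source using these fields (the broker URL, topic filter, and schema are assumed for illustration):

```sql
CREATE TABLE mqtt_readings (
    device_id VARCHAR,
    temperature DOUBLE PRECISION
)
-- For MQTT, the partition field carries the topic name.
INCLUDE partition AS mqtt_topic
WITH (
    connector = 'mqtt',
    url = 'tcp://localhost:1883',
    topic = 'sensors/#'
) FORMAT PLAIN ENCODE JSON;
```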


### NATS

When ingesting data from NATS, the following additional fields can be included.

| Allowed components | Default type | Note |
| :------------------| :------------| :---------------------------------|
| partition | `varchar` | The partition the record is from. |
| offset | `varchar` | The offset in the partition. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |
| subject | `varchar` | The subject the message is from. |
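For instance, a hedged sketch of a NATS source that records which subject each message arrived on (server URL, subject filter, and other parameters are placeholders, not taken from this PR):

```sql
CREATE TABLE nats_events (
    event_id VARCHAR,
    message VARCHAR
)
-- Capture the originating subject for each message.
INCLUDE subject AS nats_subject
WITH (
    connector = 'nats',
    server_url = 'localhost:4222',
    subject = 'events.>',
    connect_mode = 'plain'
) FORMAT PLAIN ENCODE JSON;
```

Because the subject is a column, downstream queries can filter or group on it, e.g. `SELECT nats_subject, count(*) FROM nats_events GROUP BY nats_subject;`.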

### Pulsar

When ingesting data from Pulsar, here are some things to note when including the following fields.
@@ -96,13 +130,7 @@

When ingesting data from AWS S3, GCS or Azure Blob, the following additional fields can be included.
| offset | `varchar` | The offset in the file. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |
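A hypothetical S3 example using these fields (bucket, region, and match pattern are illustrative):

```sql
CREATE SOURCE s3_logs (
    line VARCHAR
)
-- Track how far into each file a record was read.
INCLUDE offset AS byte_offset
WITH (
    connector = 's3',
    s3.bucket_name = 'my-bucket',
    s3.region_name = 'us-east-1',
    match_pattern = '*.json'
) FORMAT PLAIN ENCODE JSON;
```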


## Examples

1 change: 1 addition & 0 deletions integrations/sources/nats-jetstream.mdx
@@ -32,6 +32,7 @@ When creating a source, you can choose to persist the data from the source in RisingWave
```sql
CREATE { TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
[INCLUDE { partition | offset | payload | subject } [AS <column_name>]]
WITH (
connector='nats',
server_url='<your nats server>:<port>', [ <another_server_url_if_available>, ...]
```
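Putting the updated syntax together, a minimal sketch of a NATS JetStream table with the new `subject` field (all parameter values here are assumptions for illustration):

```sql
CREATE TABLE nats_orders (
    order_id VARCHAR,
    amount DOUBLE PRECISION
)
-- The subject column makes multi-subject streams queryable per subject.
INCLUDE subject AS source_subject
INCLUDE offset AS stream_offset
WITH (
    connector = 'nats',
    server_url = 'localhost:4222',
    subject = 'orders.*',
    connect_mode = 'plain'
) FORMAT PLAIN ENCODE JSON;
```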