Skip to content

Commit

Permalink
more doc cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Feb 4, 2025
1 parent 81b6ea0 commit d237aa4
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions docs/connectors/sinks/mongodb-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ If no `document_matcher` or `_id` specification is specified (and `upsert=True`)
create a new document where `_id` will be assigned an `ObjectID` (default MongoDB behavior).


#### Example
```python
from quixstreams.sinks.community.mongodb import MongoDBSink
from quixstreams.sinks.base.item import SinkItem

def match_on_last_name(batch_item: SinkItem):
return {"_id": SinkItem.value["name"]["last"]}

sink = MongoDBSink(
..., # other required stuff
document_matcher=match_on_last_name,
)
```


### Alternate behavior: pattern-based updates

Expand All @@ -85,6 +99,8 @@ If no match is made, it will instead create a new document with a random `_id`
(assuming `upsert=True`) with the provided updates.


You can see an example with [UpdateMany pattern matching](#an-updatemany-example) below.


### Include Message Metadata

Expand Down Expand Up @@ -115,6 +131,20 @@ current document as an argument, and returns the desired finalized outgoing docu

> **Note**: any of the `add_*_metadata` flags will have already added their data.

#### Example
```python
from quixstreams.sinks.community.mongodb import MongoDBSink

def edit_doc(my_doc: dict):
return {k: v for k,v in my_doc.items() if k not in ["age", "zip_code"]}

sink = MongoDBSink(
..., # other required stuff
value_selector=edit_doc,
)
```

## How To Use

Create an instance of `MongoDBSink` and pass it to the `StreamingDataFrame.sink()` method:
Expand Down

0 comments on commit d237aa4

Please sign in to comment.