-
Notifications
You must be signed in to change notification settings - Fork 493
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
844 additions
and
153 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
docs/asciidoc/modules/ROOT/pages/database-integration/kafka/cloud.adoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
|
||
= Confluent Cloud | ||
|
||
[[confluent_cloud]] | ||
Configuring a connection to a Confluent Cloud instance should follow | ||
link:{url-confluent-java-client}[Confluent's Java Client] configuration advice. | ||
At a minimum, to configure this, you will need: | ||
|
||
* `BOOTSTRAP_SERVER_URL` | ||
* `API_KEY` | ||
* `API_SECRET` | ||
|
||
More specifically the plugin has to be configured as follow: | ||
|
||
.neo4j.conf | ||
[source,ini] | ||
---- | ||
apoc.kafka.bootstrap.servers=${BOOTSTRAP_SERVER_URL} | ||
apoc.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${API_KEY}" password="${API_SECRET}"; | ||
apoc.kafka.ssl.endpoint.identification.algorithm=https | ||
apoc.kafka.security.protocol=SASL_SSL | ||
apoc.kafka.sasl.mechanism=PLAIN | ||
apoc.kafka.request.timeout.ms=20000 | ||
apoc.kafka.retry.backoff.ms=500 | ||
---- | ||
|
||
Make sure to replace `BOOTSTRAP_SERVER_URL`, `API_SECRET`, and `API_KEY` with the values that Confluent Cloud | ||
gives you when you generate an API access key. |
33 changes: 33 additions & 0 deletions
33
...iidoc/modules/ROOT/pages/database-integration/kafka/consumer-configuration.adoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
=== Configuration summary | ||
|
||
You can set the following Kafka configuration values in your `neo4j.conf`, here are the defaults. | ||
|
||
.neo4j.conf | ||
[source,subs="verbatim,attributes"] | ||
---- | ||
apoc.kafka.bootstrap.servers=localhost:9092 | ||
apoc.kafka.auto.offset.reset=earliest | ||
apoc.kafka.group.id=neo4j | ||
apoc.kafka.enable.auto.commit=true | ||
apoc.kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer | ||
apoc.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer | ||
{environment}.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY> | ||
{environment}.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON> | ||
{environment}.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON> | ||
{environment}.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON> | ||
{environment}.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN> | ||
{environment}.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN> | ||
{environment}.enabled=<true/false, default=false> | ||
---- | ||
|
||
See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings. | ||
|
||
[NOTE] | ||
|
||
if `apoc.kafka.cluster.only` is set to true, APOC Kafka will refuse to start in single instance mode, | ||
or when run in the context of the backup operation. This is an important safety guard to ensure that operations do not occur in unexpected situations for production deploys | ||
|
||
See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings. | ||
|
168 changes: 168 additions & 0 deletions
168
docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
= Kafka | ||
|
||
[[kafka]] | ||
|
||
ifdef::env-docs[] | ||
[abstract] | ||
-- | ||
Get started fast for common scenarios, using APOC Kafka plugin or Kafka Connect plugin | ||
-- | ||
endif::env-docs[] | ||
|
||
[[apoc_neo4j_plugin_quickstart]] | ||
== APOC Kafka Plugin | ||
|
||
Any configuration option that starts with `apoc.kafka.` controls how the plugin itself behaves. For a full | ||
list of options available, see the documentation subsections on the xref:database-integration/kafka/producer.adoc[source] and xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[sink]. | ||
|
||
=== Install the Plugin | ||
|
||
This dependency is included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^]. | ||
Once that file is downloaded, it should be placed in the `plugins` directory and the Neo4j Server restarted. | ||
|
||
[[kafka-settings]] | ||
=== Kafka settings | ||
|
||
Any configuration option that starts with `apoc.kafka.` will be passed to the underlying Kafka driver. Neo4j | ||
Kafka plugin uses the official Confluent Kafka producer and consumer java clients. | ||
Configuration settings which are valid for those connectors will also work for APOC Kafka. | ||
|
||
For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as | ||
`apoc.kafka.batch.size` in APOC Kafka. | ||
|
||
The following are common configuration settings you may wish to use. _This is not a complete | ||
list_. The full list of configuration options and reference material is available from Confluent's | ||
site for link:{url-confluent-install}/configuration/consumer-configs.html[sink configurations] and | ||
link:{url-confluent-install}/configuration/producer-configs.html[source configurations]. | ||
|
||
.Most Common Needed Configuration Settings | ||
|=== | ||
|Setting Name |Description |Default Value | ||
|
||
|apoc.kafka.max.poll.records | ||
|The maximum number of records to pull per batch from Kafka. Increasing this number will mean | ||
larger transactions in Neo4j memory and may improve throughput. | ||
|500 | ||
|
||
|apoc.kafka.buffer.memory | ||
|The total bytes of memory the producer can use to buffer records waiting. Use this to adjust | ||
how much memory the plugin may require to hold messages not yet delivered to Neo4j | ||
|33554432 | ||
|
||
|apoc.kafka.batch.size | ||
|(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. | ||
|16384 | ||
|
||
|apoc.kafka.max.partition.fetch.bytes | ||
|(Consumer only) The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. | ||
|1048576 | ||
|
||
|apoc.kafka.group.id | ||
|A unique string that identifies the consumer group this consumer belongs to. | ||
|N/A | ||
|=== | ||
|
||
=== Configure Kafka Connection | ||
|
||
If you are running locally or against a standalone machine, configure `apoc.conf` to point to that server: | ||
|
||
.neo4j.conf | ||
[source,ini] | ||
---- | ||
apoc.kafka.bootstrap.servers=localhost:9092 | ||
---- | ||
|
||
If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in | ||
the xref:database-integration/kafka/cloud.adoc#confluent_cloud[Confluent Cloud] section | ||
|
||
=== Decide: Sink, Source, or Both | ||
|
||
Configuring APOC Neo4j plugin comes in three different parts, depending on your need: | ||
|
||
. *Required*: Configuring a connection to Kafka | ||
|
||
.neo4j.conf | ||
[source,ini] | ||
---- | ||
apoc.kafka.bootstrap.servers=localhost:9092 | ||
---- | ||
|
||
. _Optional_: Configuring Neo4j to produce records to Kafka (xref:database-integration/kafka/producer.adoc[Source]) | ||
. _Optional_: Configuring Neo4j to ingest from Kafka (xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink]) | ||
|
||
Follow one or both subsections according to your use case and need: | ||
|
||
==== Sink | ||
|
||
Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as: | ||
|
||
.neo4j.conf | ||
[source,ini] | ||
---- | ||
apoc.kafka.sink.enabled=true | ||
apoc.kafka.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties | ||
---- | ||
|
||
This will process every message that comes in on `my-ingest-topic` with the given cypher statement. When | ||
that cypher statement executes, the `event` variable that is referenced will be set to the message received, | ||
so this sample cypher will create a `(:Label)` node in the graph with the given ID, copying all of the | ||
properties in the source message. | ||
|
||
For full details on what you can do here, see the xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink] section of the documentation. | ||
|
||
==== Source | ||
|
||
Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as: | ||
|
||
.neo4j.conf | ||
[source,ini] | ||
---- | ||
apoc.kafka.source.topic.nodes.my-nodes-topic=Person{*} | ||
apoc.kafka.source.topic.relationships.my-rels-topic=BELONGS-TO{*} | ||
apoc.kafka.source.enabled=true | ||
apoc.kafka.source.schema.polling.interval=10000 | ||
---- | ||
|
||
This will produce all graph nodes labeled `(:Person)` on to the topic `my-nodes-topic` and all | ||
relationships of type `-[:BELONGS-TO]->` to the topic named `my-rels-topic`. Further, schema changes will | ||
be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes. | ||
Please note that if not specified a value for `apoc.kafka.source.schema.polling.interval` property then Streams plugin will use | ||
300,000 ms as default. | ||
|
||
The expressions `Person{\*}` and `BELONGS-TO{*}` are _patterns_. You can find documentation on how to change | ||
these in the xref:database-integration/kafka/producer.adoc#source-patterns[Patterns] section. | ||
|
||
For full details on what you can do here, see the xref:database-integration/kafka/producer.adoc[Source] section of the documentation. | ||
|
||
==== Restart Neo4j | ||
|
||
Once the plugin is installed and configured, restarting the database will make it active. | ||
If you have configured Neo4j to consume from kafka, it will begin immediately processing messages. | ||
|
||
[NOTE] | ||
|
||
==== | ||
When installing the latest version of the APOC Kafka plugin into Neo4j 4.x, watching to logs you could find something | ||
similar to the following: | ||
[source,logs] | ||
---- | ||
2020-03-25 20:13:50.606+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.partition.fetch.bytes | ||
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.include.messages | ||
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.auto.offset.reset | ||
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.bootstrap.servers | ||
2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.poll.records | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.enable | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.source.enabled | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.topic.cypher.boa.to.kafkaTest | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.tolerance | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.group.id | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.headers.enable | ||
2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.header.prefix | ||
2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.topic.name | ||
2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.enabled.to.kafkaTest | ||
---- | ||
*These are not errors*. They comes from the new Neo4j 4 Configuration System, which warns that it doesn't recognize those | ||
properties. Despite these warnings the plugin will work properly. | ||
==== |
Oops, something went wrong.