Merge pull request #2 from neo4j-field/fixes-and-improvements
Fixes and improvements
ali-ince authored Jun 9, 2023
2 parents e34b93b + fd3dacc commit 686351a
Showing 12 changed files with 523 additions and 229 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -129,3 +129,6 @@ dmypy.json

# Pyre type checker
.pyre/

# ide
.idea/
3 changes: 1 addition & 2 deletions Makefile
@@ -1,6 +1,5 @@
SHELL = /bin/sh
-VERSION = 0.5.0
-# TODO: read VERSION from setup.py
+VERSION = $(shell python setup.py --version)

.PHONY: build build-py37 build-py38 build-py39
.PHONY: test test-py37 test-py38 test-py39
220 changes: 161 additions & 59 deletions README.md
@@ -1,4 +1,5 @@
# neo4j_arrow

PyArrow client for working with GDS Apache Arrow Flight Service

# What is this?
@@ -31,7 +32,7 @@ from a release tarball or zip.
Assuming you're in an active virtual environment named "venv":

```
-(venv) $ pip install neo4j_arrow@https://github.com/neo4j-field/neo4j_arrow/archive/refs/tags/0.4.0.tar.gz
+(venv) $ pip install neo4j_arrow@https://github.com/neo4j-field/neo4j_arrow/archive/refs/tags/0.6.0.tar.gz
```

> Note: this module is _not_ distributed via PyPI or Anaconda.
@@ -56,12 +57,13 @@ below (along with their defaults):

```python
import neo4j_arrow as na

client = na.Neo4jArrowClient("myhost.domain.com",
                             "mygraph",
                             port=8491,
                             database="neo4j",
                             tls=True,
-                            user=neo4j,
+                            user="neo4j",
                             password="neo4j",
                             concurrency=4,
                             debug=False,
@@ -77,7 +79,9 @@ environment such as Apache Spark or Apache Beam.
## Projecting a Graph

The process of projecting a graph mirrors the protocol outlined in the
-docs for [projecting graphs using Apache Arrow](https://neo4j.com/docs/graph-data-science/current/graph-project-apache-arrow/). While
+docs
+for [projecting graphs using Apache Arrow](https://neo4j.com/docs/graph-data-science/current/graph-project-apache-arrow/).
+While
the client tries to track and mirror the protocol state internally, it
is ultimately the server that dictates what operations are valid and
when.
@@ -87,23 +91,51 @@ when.
### 1. Start an import process

#### 1a. Import into an in-memory graph

```python
-result = client.start()
+result = client.start_create_graph()
```

-Calling the `start` method will connect and authenticate the client
+Calling the `start_create_graph` method will connect and authenticate the client
(if it hasn't already been connected) and send a `CREATE_GRAPH` action
to the server.

-The response (in this case, the `result` object) will be a Python dict
-containing the name of the graph being imported:
+`start_create_graph()` takes the following optional arguments:

| name | type | default value | description |
|---------------------------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------|
| force | bool | False | Whether to abort an existing import and restart from scratch |
| undirected_rel_types | Iterable[str] | [] | A list of relationship types that must be imported as undirected. A wildcard (*) can be used to include all the types. |
| inverse_indexed_rel_types | Iterable[str] | [] | A list of relationship types that must be indexed in the inverse direction. A wildcard (*) can be used to include all the types. |
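
For instance, a call combining these options might look like the following sketch (the relationship type name is purely illustrative):

```python
# A sketch: restart any in-flight import and load SIMILAR relationships
# as undirected. "SIMILAR" is an illustrative type name.
result = client.start_create_graph(
    force=True,
    undirected_rel_types=["SIMILAR"],
)
```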

#### 1b. Import into a new database (not available on AuraDS)

```python
{ "name": "mygraph" }
result = client.start_create_database()
```

-> `start()` takes an optional bool keyword argument "force" that can
-> be used for aborting an existing import and restarting from scratch.
Calling the `start_create_database` method will connect and authenticate the client
(if it hasn't already been connected) and send a `CREATE_DATABASE` action
to the server.

`start_create_database()` takes the following optional arguments:

| name | type | default value | description |
|-------------------|------|---------------|-------------------------------------------------------------------------------------------------------------------------------------|
| force             | bool | False         | Force-deletes any existing database files prior to the import.                                                                        |
| id_type           | str  | INTEGER       | Sets the node id type used in the input data. Can be either INTEGER or STRING.                                                        |
| id_property       | str  | originalId    | The node property key which stores the node id of the input data.                                                                     |
| record_format     | str  | ""            | Database record format. Valid values are blank (no value, default), standard, aligned, or high_limit.                                 |
| high_io           | bool | False         | Ignores environment-based heuristics and specifies whether the target storage subsystem can support parallel IO with high throughput. |
| use_bad_collector | bool | False         | Collects bad node and relationship records during import and writes them into the log.                                                |
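
A sketch combining a couple of these options (the values are illustrative):

```python
# A sketch: force a fresh import into a new database that uses
# string-typed node ids. Values are illustrative.
result = client.start_create_database(
    force=True,
    id_type="STRING",
)
```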

The response (in this case, the `result` object) will be a Python dict
containing the name of the graph/database being imported:

```python
{"name": "mygraph"}
```

### 2a. Feed Nodes

Expand All @@ -120,7 +152,7 @@ import pyarrow as pa
# Create two nodes, one :User and the other :User:Admin, with an age property
t = pa.Table.from_pydict({
    "nodeId": [1, 2],
-    "labels": [["User"],["User", "Admin"]],
+    "labels": [["User"], ["User", "Admin"]],
    "age": [21, 40],
})
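
The upload call itself sits in a collapsed region of this diff; a minimal sketch, assuming a `write_nodes` method on the client (the name is an assumption, not shown above), might be:

```python
# A sketch: upload the node table built above. The write_nodes method
# name is an assumption; the actual call is collapsed in this diff.
result = client.write_nodes(t)
```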

@@ -152,7 +184,7 @@ nodes loaded (from the point of view of the server) and the name of
the graph being imported. For example:

```python
{ "name": "mygraph", "node_count": 2 }
{"name": "mygraph", "node_count": 2}
```

### 3a. Feeding Relationships
@@ -161,6 +193,7 @@ Relationships are loaded similarly to nodes, albeit with a different schema requ

```python
import pyarrow as pa

t = pa.Table.from_pydict({
    "sourceNodeId": [1, 1, 2],
    "targetNodeId": [2, 1, 1],
@@ -196,7 +229,7 @@ name of the graph being imported and the number of relationships as
observed on the server side:

```python
{ "name": "mygraph", "relationship_count": 3 }
{"name": "mygraph", "relationship_count": 3}
```
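
As with nodes, the relationship upload call is collapsed in this diff; a minimal sketch (the `write_edges` method name is an assumption):

```python
# A sketch: upload the relationship table built above. The write_edges
# method name is an assumption; the actual call is collapsed in this diff.
result = client.write_edges(t)
```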

### 4. Validating the Import
@@ -222,19 +255,7 @@ print({
For the previous examples, you should see something like:

```python
-{ "nodes": (["User", "Admin"], 2), "edges": (["KNOWS", "SELF"], 3) }
+{"nodes": (["User", "Admin"], 2), "edges": (["KNOWS", "SELF"], 3)}
```

-## Creating a Database
-
-The Neo4j GDS Arrow Flight Service also supports database creation if
-running a self-managed installation of Neo4j. The process is the exact
-same as above for importing a graph, but with a single change to the
-initial start step: pass in an overriding action name with the value
-`"CREATE_DATABASE"`.
-
-```python
-result = client.start("CREATE_DATABASE")
-```

## Streaming Graph Data and Features
@@ -276,9 +297,12 @@ following (assuming a single item in the list):

```python
[pyarrow.RecordBatch
-nodeId: int64 not null
-louvain: int64 not null
-pageRank: int64 not null]
+nodeId: int64
+not null
+louvain: int64
+not null
+pageRank: int64
+not null]
```
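
A stream like this must be drained fully (see the caveats below); here is a sketch, assuming a `read_nodes` method that mirrors the `read_edges` calls shown in the next section:

```python
# A sketch: fully consume a node stream. The read_nodes method and the
# property names are assumptions mirroring the read_edges examples below.
total = 0
for batch in client.read_nodes(properties=["louvain", "pageRank"]):
    total += batch.num_rows  # num_rows is a standard pyarrow attribute
print(f"streamed {total} nodes")
```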

### 2. Streaming Relationships
@@ -305,18 +329,109 @@ topology = client.read_edges()
edges = client.read_edges(properties=["score"], relationship_types=["SIMILAR"])
```

## The Graph Model

A graph model, constructed programmatically or via JSON, can be used to
dictate how the datasource fields translate to the appropriate parts (nodes,
edges) of the intended graph.

In Python, it looks like:

```python
from neo4j_arrow.model import Graph, Node, Edge

G = (
    Graph(name="test", db="neo4j")
    .with_node(Node(source="gs://.*/papers.*parquet", label_field="labels",
                    key_field="paper"))
    .with_node(Node(source="gs://.*/authors.*parquet", label_field="labels",
                    key_field="author"))
    .with_node(Node(source="gs://.*/institution.*parquet", label_field="labels",
                    key_field="institution"))
    .with_edge(Edge(source="gs://.*/citations.*parquet", type_field="type",
                    source_field="source", target_field="target"))
    .with_edge(Edge(source="gs://.*/affiliation.*parquet", type_field="type",
                    source_field="author", target_field="institution"))
    .with_edge(Edge(source="gs://.*/authorship.*parquet", type_field="type",
                    source_field="author", target_field="paper"))
)
```

The same graph model, but in JSON:

```json
{
  "name": "test",
  "db": "neo4j",
  "nodes": [
    {
      "source": "gs://.*/papers.*parquet",
      "label_field": "labels",
      "key_field": "paper"
    },
    {
      "source": "gs://.*/authors.*parquet",
      "label_field": "labels",
      "key_field": "author"
    },
    {
      "source": "gs://.*/institution.*parquet",
      "label_field": "labels",
      "key_field": "institution"
    }
  ],
  "edges": [
    {
      "source": "gs://.*/citations.*parquet",
      "type_field": "type",
      "source_field": "source",
      "target_field": "target"
    },
    {
      "source": "gs://.*/affiliation.*parquet",
      "type_field": "type",
      "source_field": "author",
      "target_field": "institution"
    },
    {
      "source": "gs://.*/authorship.*parquet",
      "type_field": "type",
      "source_field": "author",
      "target_field": "paper"
    }
  ]
}
```

The fields of the Nodes and Edges in the model have specific purposes:

- `source` -- a regex pattern used to match source data against the model, i.e.
  it's used to determine which node or edge a record corresponds to.
- `label` -- the fixed label to be used for the imported nodes.
- `label_field` -- the source field name containing the node label or labels.
- `key_field` -- the source field name containing the unique node identifier.
  (NB: this currently must be a numeric field as of GDS 2.1.)
- `type` -- the fixed edge type for the imported edges.
- `type_field` -- the source field name containing the edge type.
- `source_field` -- the source field name containing the node identifier of the
  origin of an edge.
- `target_field` -- the source field name containing the node identifier of the
  target of an edge.

Example model JSON files are provided in [example_models](./example_models).
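
Because the JSON form mirrors the constructor arguments one-to-one, a model can also be rehydrated from such a file with the standard `json` module; this sketch assumes only the constructors shown above:

```python
import json

from neo4j_arrow.model import Graph, Node, Edge

# A sketch: rebuild a model from one of the example JSON files using
# only the constructors demonstrated above.
with open("example_models/bigquery_model.json") as f:
    spec = json.load(f)

G = Graph(name=spec["name"], db=spec["db"])
for node in spec["nodes"]:
    G = G.with_node(Node(**node))
for edge in spec["edges"]:
    G = G.with_edge(Edge(**edge))
```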

### Streaming Caveats

There are a few known caveats to be aware of when creating and consuming Arrow-based streams from Neo4j GDS:

- You must consume the stream in its entirety to avoid blocking
  server-side threads.
  - While recent versions of GDS will include a timeout, older
    versions will consume one or many threads until the stream is
    consumed.
  - There's no API call (yet) for aborting a stream, so failing to
    consume the stream will prevent the threads from having tasks
    scheduled on them for other stream requests.

- The only way to request Nodes is to do so by property, which means
  nodes that don't have properties may not be streamable today.
@@ -326,35 +441,22 @@ There are a few known caveats to be aware of when creating and consuming Arrow-b
The `neo4j_arrow` module is used by multiple existing Neo4j projects:

- Google Dataflow Flex Template for Neo4j GDS & AuraDS
  - An Apache Beam pipeline for large-scale import of graphs.
  - https://github.com/neo4j-field/dataflow-flex-pyarrow-to-gds

- Google BigQuery Stored Procedure for Neo4j GDS & AuraDS
  - An Apache Spark job powering a BigQuery Stored Procedure for
    bidirectional data integration between BigQuery Tables and Neo4j
    GDS graphs.
  - https://github.com/neo4j-field/bigquery-connector

- Neo4j GraphConnect 2022 Keynote Demo
  - A Jupyter Notebook-based approach for high-performance streaming
    of BigQuery data into Neo4j GDS as featured at Neo4j GraphConnect
    2022.
  - https://github.com/neo4j-product-examples/ds-graphconnect-2022-demo

-# Advanced Usage & Features
-
-The `neo4j_arrow` module also supports some minor last-mile
-translation of Arrow schema and field filtering using "graph models."
-This primarily exists to help simplify loading PyArrow buffers that
-don't exactly match the required schema (e.g. having a "nodeId" field
-name) or having additional fields you don't care about or aren't
-supported.
-
-For examples and more details on the concept of using "models" to
-perform this last-mile transformation, see the Dataflow project README
-section titled
-["The Graph Model"](https://github.com/neo4j-field/dataflow-flex-pyarrow-to-gds#the-graph-model).

# Copyright & License

`neo4j_arrow` is licensed under the Apache Software License version
2.0. All content is copyright © Neo4j Sweden AB.
41 changes: 41 additions & 0 deletions example_models/bigquery_model.json
@@ -0,0 +1,41 @@
{
  "name": "testbq",
  "db": "neo4j",
  "nodes": [
    {
      "source": "paper.*",
      "label_field": "labels",
      "key_field": "paper"
    },
    {
      "source": "authors.*",
      "label_field": "labels",
      "key_field": "author"
    },
    {
      "source": "institution.*",
      "label_field": "labels",
      "key_field": "institution"
    }
  ],
  "edges": [
    {
      "source": "citations.*",
      "type_field": "type",
      "source_field": "source",
      "target_field": "target"
    },
    {
      "source": "affiliation.*",
      "type_field": "type",
      "source_field": "author",
      "target_field": "institution"
    },
    {
      "source": "authorship.*",
      "type_field": "type",
      "source_field": "author",
      "target_field": "paper"
    }
  ]
}
