Skip to content

feat(processing_engine): Runtime and Write-back improvements#25672

Merged
jacksonrnewhouse merged 1 commit intomainfrom
processing_engine/write-back
Dec 18, 2024
Merged

feat(processing_engine): Runtime and Write-back improvements#25672
jacksonrnewhouse merged 1 commit intomainfrom
processing_engine/write-back

Conversation

@jacksonrnewhouse
Copy link
Copy Markdown

This makes two major changes, running plugins in their own tasks and enabling writing back line protocol to the WriteBuffer.

Running Plugins as Separate Tasks

Each plugin now runs in its own task, started either when created through the API or when the system is started. It is connected to the rest of the system through two mechanisms:

  • A broadcast channel whose sender is owned by the QueryableBuffer and whose receiver it polls from.
  • An Arc to enable write-backs. This also gives access to the catalog and much of the rest of the db as we continue to build out the API.
    Right now the only message being sent is Arc<WalContents>, but I expect the PluginEvent enum to be expanded both for other triggering events and control messages, e.g. Shutdown, Status, etc.

Write-Back Line Protocol

The running pythons have a reference to the WriteBuffer, and this PR also adds a simple receiver for python to write line protocols to with the insert_line_protocol call. Everything is buffered per PluginEvent, and data is copied several times, so there is a lot of room for improvement.

Example REST calls

Creating a Plugin:

curl -X POST 'http://127.0.0.1:8181/api/v3/configure/processing_engine_plugin' \ 
-H 'Content-Type: application/json' \
-d '{
    "db": "load_test",
    "plugin_name": "counter_output",
    "plugin_type": "wal_rows",                 
    "function_name": "count_points",
    "code": "import time\nfrom collections import defaultdict\nfrom influxdb_client_3.write_client.client.write.point import Point\n\ndef count_points(iterator, output):\n    start_time = time.time()\n    table_counts = defaultdict(int)\n    \n    while True:\n        point = iterator.next_point()\n        if point is None:\n            break\n            \n        measurement = point._name\n        table_counts[measurement] += 1\n    \n    for table_name, count in table_counts.items():\n        stats_point = Point(\"table_counts\")\n        stats_point.tag(\"table_name\", table_name)\n        stats_point.field(\"row_count\", count)\n        output.insert_line_protocol(str(stats_point))\n    \n    end_time = time.time()\n    duration = end_time - start_time\n    \n    print(f\"Processing completed in {duration:.2f} seconds\")"
}'

Creating a Trigger:

 curl -X POST 'http://127.0.0.1:8181/api/v3/configure/processing_engine_trigger' \
-H 'Content-Type: application/json' \
-d '{
    "db": "load_test",
    "plugin_name": "counter_output",
    "trigger_name": "counter_output_trigger",
    "trigger_specification": {
         "single_table_wal_write": {"table_name": "measurement_data"}
    }
}'

@jacksonrnewhouse jacksonrnewhouse force-pushed the processing_engine/write-back branch 2 times, most recently from a24353c to 4e9c322 Compare December 17, 2024 21:39
Copy link
Copy Markdown
Member

@pauldix pauldix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of small comments, but otherwise LGTM

* Move processing engine invocation to a seperate tokio task.
* Support writing back line protocol from python via insert_line_protocol().
* Update structs to work with bincode.
@jacksonrnewhouse jacksonrnewhouse force-pushed the processing_engine/write-back branch from 4e9c322 to 354b185 Compare December 18, 2024 00:03
@jacksonrnewhouse jacksonrnewhouse merged commit 8bfccb7 into main Dec 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants