-
Notifications
You must be signed in to change notification settings - Fork 5k
[filebeat][input] - Websocket Input with CEL engine #37774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 22 commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
0de28ef
initial commit
ShourieG 58a7e23
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 1b58923
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 1b8980e
working version of websocket input with cel engine
ShourieG ee9cce6
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 9819004
updated go mod and NOTICE
ShourieG f2056a0
added regex support to cel engine, added all metrics params to releva…
ShourieG de1b707
removed unused config from input struct
ShourieG c0dddc6
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 1f6da1a
added redactor
ShourieG 85ad3e6
removed cel references in logs
ShourieG 8e7ad35
addressed draft PR suggestions and added more metrics
ShourieG 4768d4a
added tests
ShourieG 8aaed71
added retry function
ShourieG d8bc064
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 9347ff6
updated changelog
ShourieG f7b2d19
updated tests
ShourieG b618501
addressed PR suggestions, removed auto retry mechanism for the moment
ShourieG 1e5b205
addressed PR suggestions, removed auto retry mechanism for the moment
ShourieG e96e474
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG e9aa0b3
added documentation and updated codeowners
ShourieG 2b1f90f
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG a71731b
updated experimental tags
ShourieG 2ae755b
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 129c5bd
added a new test, cleaned up some code and logic
ShourieG 21cdf6f
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 82b54bc
added cursor condition check test and updated filebeat-options asciidoc
ShourieG 581a210
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG c550da9
added auth tests, removed api-key config and updated it to custom aut…
ShourieG 3d6f070
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG 8a61509
addressed PR suggestions and added config tests
ShourieG 6d00e55
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG c301b16
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG d60d73b
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG b3a7e20
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG 341ad87
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG dd850d2
added debug log for cel state before cel eval, updated the docs accor…
ShourieG 57a5dac
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG c68ad52
updated URL config docs
ShourieG File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
[role="xpack"] | ||
|
||
:type: websocket | ||
:mito_version: v1.8.0 | ||
:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version} | ||
|
||
[id="{beatname_lc}-input-{type}"] | ||
=== Websocket Input | ||
beta[] | ||
|
||
The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint. | ||
|
||
This input supports: | ||
|
||
* Auth | ||
** Basic | ||
** Bearer | ||
** API Key | ||
|
||
NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart. | ||
|
||
==== Execution | ||
|
||
The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library. | ||
A single JSON object is provided as an input accessible through a `state` variable. `state` contains a `response` map field and may contain arbitrary other fields configured via the input's `state` configuration. If the CEL program saves cursor states between executions of the program, the configured `state.cursor` value will be replaced by the saved cursor prior to execution. | ||
|
||
On start the `state` is will be something like this: | ||
|
||
["source","json",subs="attributes"] | ||
---- | ||
{ | ||
"response": { ... }, | ||
"cursor": { ... }, | ||
... | ||
} | ||
---- | ||
The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent.Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below. | ||
|
||
If the cursor is present the program should perform and process requests based on its value. | ||
If cursor is not present the program must have alternative logic to determine what requests to make. | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
After completion of a program's execution it should return a single object with a structure looking like this: | ||
|
||
["source","json",subs="attributes"] | ||
---- | ||
{ | ||
"events": [ <1> | ||
{...}, | ||
... | ||
], | ||
"cursor": [ <2> | ||
{...}, | ||
... | ||
], | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"want_more": false <3> | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
---- | ||
|
||
<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements. | ||
The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object. | ||
|
||
<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array. | ||
|
||
<3> Unlike in the `cel` input, the `want_more` field is always false. This is because the `websocket` input is a streaming input and will always be ready to receive more messages from the websocket server or api endpoint, however the `CEL` program will only be executed once for each message received. | ||
|
||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Example configuration: | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
# Read and process simple websocket messages from a local websocket server | ||
- type: websocket | ||
resource.url: ws://localhost:443/v1/stream | ||
program: | | ||
bytes(state.response).decode_json().as(inner_body,{ | ||
"events": { | ||
"message": inner_body.encode_json(), | ||
} | ||
}) | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
---- | ||
|
||
==== Debug state logging | ||
|
||
The Websocket input will log the complete state after evaluation when logging at the DEBUG level. | ||
efd6 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<websocket-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs. | ||
|
||
==== Authentication | ||
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via Api-Key. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and Api-Key contains an api key. These token/key values are are added to the request headers and are not exposed to the `state` object. For the Api-Key authentication, the header and value are both configurable since different servers might have different api-key header conventions. | ||
|
||
Example configurations with authentication: | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
- type: websocket | ||
auth.basic_token: "dXNlcjpwYXNzd29yZA==" | ||
resource.url: wss://localhost:443/_stream | ||
---- | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
- type: websocket | ||
auth.bearer_token: "dXNlcjpwYXNzd29yZA==" | ||
resource.url: wss://localhost:443/_stream | ||
---- | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
- type: websocket | ||
auth.api_key: | ||
header: "x-api-key" | ||
value: "dXNlcjpwYXNzd29yZA==" | ||
resource.url: wss://localhost:443/_stream | ||
---- | ||
|
||
[[input-state-websocket]] | ||
==== Input state | ||
|
||
The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects. | ||
The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts. | ||
|
||
==== Configuration options | ||
|
||
The `websocket` input supports the following configuration options plus the | ||
<<{beatname_lc}-input-{type}-common-options>> described later. | ||
|
||
[[program-websocket]] | ||
[float] | ||
==== `program` | ||
|
||
The CEL program that is executed on each message received. This field should ideally be present but if not the default program given below is used. | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
program: | | ||
bytes(state.response).decode_json().as(inner_body,{ | ||
"events": { | ||
"message": inner_body.encode_json(), | ||
} | ||
}) | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
---- | ||
|
||
[[state-websocket]] | ||
[float] | ||
==== `state` | ||
|
||
`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts. | ||
|
||
[[cursor-websocket]] | ||
[float] | ||
==== `state.cursor` | ||
|
||
The cursor is an object available as `state.cursor` where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflect the cursor for completion of the full set of events. | ||
ShourieG marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
# Read and process simple websocket messages from a local websocket server | ||
- type: websocket | ||
resource.url: ws://localhost:443/v1/stream | ||
program: | | ||
bytes(state.response).as(body, { | ||
"events": [body.decode_json().with({ | ||
"last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ? | ||
state.cursor.last_requested_at | ||
: | ||
now | ||
})], | ||
"cursor": {"last_requested_at": now} | ||
}) | ||
---- | ||
|
||
[[regexp-websocket]] | ||
[float] | ||
==== `regexp` | ||
|
||
A set of named regular expressions that may be used during a CEL program's execution using the `regexp` extension library. The syntax used for the regular expressions is https://github.com/google/re2/wiki/Syntax[RE2]. | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
- type: websocket | ||
# Define two regular expressions, 'products' and 'solutions' for use during CEL program execution. | ||
regexp: | ||
products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' | ||
solutions: '(?i)(Search|Observability|Security)' | ||
---- | ||
|
||
[[websocket-state-redact]] | ||
[float] | ||
==== `redact` | ||
|
||
During debug level logging, the `state` object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged `state`. The `redact` configuration allows users to configure this field redaction behaviour. For safety reasons if the `redact` configuration is missing a warning is logged. | ||
|
||
In the case of no-required redaction an empty `redact.fields` configuration should be used to silence the logged warning. | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
- type: websocket | ||
redact: | ||
fields: ~ | ||
---- | ||
|
||
As an example, if a user-constructed Basic Authentication request is used in a CEL program the password can be redacted like so | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
filebeat.inputs: | ||
- type: websocket | ||
resource.url: ws://localhost:443/_stream | ||
state: | ||
user: user@domain.tld | ||
password: P@$$W0₹D | ||
redact: | ||
fields: | ||
- password | ||
delete: true | ||
---- | ||
|
||
Note that fields under the `auth` configuration hierarchy are not exposed to the `state` and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible. | ||
|
||
[float] | ||
==== `redact.fields` | ||
|
||
This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs. | ||
|
||
[float] | ||
==== `redact.delete` | ||
|
||
This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced. | ||
|
||
[float] | ||
=== Metrics | ||
|
||
This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>. | ||
These metrics are exposed under the `/inputs` path. They can be used to | ||
observe the activity of the input. | ||
|
||
[options="header"] | ||
|======= | ||
| Metric | Description | ||
| `resource` | URL or path of the input resource. | ||
| `cel_eval_errors` | Number of errors encountered during cel program evaluation. | ||
| `errors_total` | Number of errors encountered over the life cycle of the input. | ||
| `batches_received_total` | Number of event arrays received. | ||
| `batches_published_total` | Number of event arrays published. | ||
| `received_bytes_total` | Number of bytes received over the life cycle of the input. | ||
| `events_received_total` | Number of events received. | ||
| `events_published_total` | Number of events published. | ||
| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds. | ||
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). | ||
|======= | ||
|
||
==== Developer tools | ||
|
||
A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain. | ||
|
||
[id="{beatname_lc}-input-{type}-common-options"] | ||
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] | ||
|
||
NOTE: The `websocket` input is currently in beta and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository. | ||
|
||
:type!: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.