Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][input] - Websocket Input with CEL engine #37774

Merged
merged 39 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
0de28ef
initial commit
ShourieG Nov 15, 2023
58a7e23
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Nov 30, 2023
1b58923
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Dec 21, 2023
1b8980e
working version of websocket input with cel engine
ShourieG Jan 29, 2024
ee9cce6
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Jan 29, 2024
9819004
updated go mod and NOTICE
ShourieG Jan 29, 2024
f2056a0
added regex support to cel engine, added all metrics params to releva…
ShourieG Jan 30, 2024
de1b707
removed unused config from input struct
ShourieG Jan 30, 2024
c0dddc6
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Jan 30, 2024
1f6da1a
added redactor
ShourieG Jan 30, 2024
85ad3e6
removed cel references in logs
ShourieG Jan 30, 2024
8e7ad35
addressed draft PR suggestions and added more metrics
ShourieG Feb 1, 2024
4768d4a
added tests
ShourieG Feb 5, 2024
8aaed71
added retry function
ShourieG Feb 5, 2024
d8bc064
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 5, 2024
9347ff6
updated changelog
ShourieG Feb 5, 2024
f7b2d19
updated tests
ShourieG Feb 5, 2024
b618501
addressed PR suggestions, removed auto retry mechanism for the moment
ShourieG Feb 6, 2024
1e5b205
addressed PR suggestions, removed auto retry mechanism for the moment
ShourieG Feb 6, 2024
e96e474
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 6, 2024
e9aa0b3
added documentation and updated codeowners
ShourieG Feb 6, 2024
2b1f90f
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 6, 2024
a71731b
updated experimental tags
ShourieG Feb 6, 2024
2ae755b
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 6, 2024
129c5bd
added a new test, cleaned up some code and logic
ShourieG Feb 6, 2024
21cdf6f
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 6, 2024
82b54bc
added cursor condition check test and updated filebeat-options asciidoc
ShourieG Feb 8, 2024
581a210
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 8, 2024
c550da9
added auth tests, removed api-key config and updated it to custom aut…
ShourieG Feb 8, 2024
3d6f070
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 8, 2024
8a61509
addressed PR suggestions and added config tests
ShourieG Feb 9, 2024
6d00e55
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 9, 2024
c301b16
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 9, 2024
d60d73b
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG Feb 12, 2024
b3a7e20
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG Feb 12, 2024
341ad87
Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc
ShourieG Feb 12, 2024
dd850d2
added debug log for cel state before cel eval, updated the docs accor…
ShourieG Feb 12, 2024
57a5dac
Merge remote-tracking branch 'upstream/main' into feature/websocket
ShourieG Feb 12, 2024
c68ad52
updated URL config docs
ShourieG Feb 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ CHANGELOG*
/x-pack/filebeat/input/lumberjack/ @elastic/security-service-integrations
/x-pack/filebeat/input/netflow/ @elastic/sec-deployment-and-devices
/x-pack/filebeat/input/o365audit/ @elastic/security-service-integrations
/x-pack/filebeat/input/websocket/ @elastic/security-service-integrations
/x-pack/filebeat/module/activemq @elastic/obs-infraobs-integrations
/x-pack/filebeat/module/aws @elastic/obs-cloud-monitoring
/x-pack/filebeat/module/awsfargate @elastic/obs-cloud-monitoring
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714]
- Add support for PEM-based Okta auth in HTTPJSON. {pull}37772[37772]
- Prevent complete loss of long request trace data. {issue}37826[37826] {pull}37836[37836]
- Added experimental version of the Websocket Input. {pull}37774[37774]
- Add support for PEM-based Okta auth in CEL. {pull}37813[37813]

*Auditbeat*
Expand Down
64 changes: 32 additions & 32 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18447,6 +18447,38 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/gorilla/websocket
Version: v1.4.2
Licence type (autodetected): BSD-2-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/gorilla/websocket@v1.4.2/LICENSE:

Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/h2non/filetype
Version: v1.1.1
Expand Down Expand Up @@ -41594,38 +41626,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/gorilla/websocket
Version: v1.4.2
Licence type (autodetected): BSD-2-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/gorilla/websocket@v1.4.2/LICENSE:

Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/hashicorp/cronexpr
Version: v1.1.0
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-tcp>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-gcs>>
* <<{beatname_lc}-input-websocket>>

include::multiline.asciidoc[]

Expand Down Expand Up @@ -145,3 +146,5 @@ include::inputs/input-udp.asciidoc[]
include::inputs/input-unix.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-gcs.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-websocket.asciidoc[]
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ require (
github.com/googleapis/gax-go/v2 v2.12.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/icholy/digest v0.1.22
github.com/lestrrat-go/jwx/v2 v2.0.19
github.com/otiai10/copy v1.12.0
Expand Down Expand Up @@ -299,7 +300,6 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/cronexpr v1.1.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
272 changes: 272 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
[role="xpack"]

:type: websocket
:mito_version: v1.8.0
:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version}

[id="{beatname_lc}-input-{type}"]
=== Websocket Input
experimental[]

The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint.

This input supports:

* Auth
** Basic
** Bearer
** Custom

NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

==== Execution

The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library.
A single JSON object is provided as an input accessible through a `state` variable. `state` contains a `response` map field and may contain arbitrary other fields configured via the input's `state` configuration. If the CEL program saves cursor states between executions of the program, the configured `state.cursor` value will be replaced by the saved cursor prior to execution.

On start the `state` will be something like this:

["source","json",subs="attributes"]
----
{
"response": { ... },
"cursor": { ... },
...
}
----
The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.

If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program's logic.

After completion of a program's execution it should return a single object with a structure looking like this:

["source","json",subs="attributes"]
----
{
"events": [ <1>
{...},
...
],
"cursor": [ <2>
{...},
...
]
}
----

<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements.
The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object.

<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.


Example configuration:

["source","yaml",subs="attributes"]
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
----

==== Debug state logging

The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<websocket-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs.

==== Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.basic_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.custom:
header: "x-api-key"
value: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.custom:
header: "Auth"
value: "Bearer dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

[[input-state-websocket]]
==== Input state

The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts.

==== Configuration options

The `websocket` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[[program-websocket]]
[float]
==== `program`

The CEL program that is executed on each message received. This field should ideally be present but if not the default program given below is used.

["source","yaml",subs="attributes"]
----
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
----

[[state-websocket]]
[float]
==== `state`

`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts.

[[cursor-websocket]]
[float]
==== `state.cursor`

The cursor is an object available as `state.cursor` where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).as(body, {
"events": [body.decode_json().with({
"last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
state.cursor.last_requested_at
:
now
})],
"cursor": {"last_requested_at": now}
})
----

[[regexp-websocket]]
[float]
==== `regexp`

A set of named regular expressions that may be used during a CEL program's execution using the `regexp` extension library. The syntax used for the regular expressions is https://github.com/google/re2/wiki/Syntax[RE2].

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
# Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
regexp:
products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
solutions: '(?i)(Search|Observability|Security)'
----

[[websocket-state-redact]]
[float]
==== `redact`

During debug level logging, the `state` object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged `state`. The `redact` configuration allows users to configure this field redaction behaviour. For safety reasons if the `redact` configuration is missing a warning is logged.

In the case of no-required redaction an empty `redact.fields` configuration should be used to silence the logged warning.

["source","yaml",subs="attributes"]
----
- type: websocket
redact:
fields: ~
----

As an example, if a user-constructed Basic Authentication request is used in a CEL program the password can be redacted like so

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
url: ws://localhost:443/_stream
state:
user: user@domain.tld
password: P@$$W0₹D
redact:
fields:
- password
delete: true
----

Note that fields under the `auth` configuration hierarchy are not exposed to the `state` and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible.

[float]
==== `redact.fields`

This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs.

[float]
==== `redact.delete`

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `url` | URL of the input resource.
| `cel_eval_errors` | Number of errors encountered during cel program evaluation.
| `errors_total` | Number of errors encountered over the life cycle of the input.
| `batches_received_total` | Number of event arrays received.
| `batches_published_total` | Number of event arrays published.
| `received_bytes_total` | Number of bytes received over the life cycle of the input.
| `events_received_total` | Number of events received.
| `events_published_total` | Number of events published.
| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
|=======

==== Developer tools

A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

NOTE: The `websocket` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.

:type!:
Loading
Loading