Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checkpoint_harmony_endpoint: improve handling of 404 and 503 errors #13009

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/checkpoint_harmony_endpoint/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# newer versions go on top
- version: "0.5.0"
changes:
- description: Improve handling of 404 and 503 API responses.
type: enhancement
link: https://github.com/elastic/integrations/pull/13009
- description: Propagate forensics CEL fixes to all data streams.
type: enhancement
link: https://github.com/elastic/integrations/pull/13009
- version: "0.4.0"
changes:
- description: Update Kibana constraint to support 9.0.0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ state:
limit: {{limit}}
page_limit: {{page_limit}}
filter: {{filter}}
program: |-

program: |
(
state.?cursor.auth_data.expires.optMap(t,
t.parse_time(time_layout.RFC1123) - now() > duration("5m")
Expand All @@ -35,22 +34,44 @@ program: |-
"accessKey": state.auth_access_key,
}.encode_json(),
}
).do_request().as(resp,
).do_request().as(resp, resp.StatusCode == 200 ?
bytes(resp.Body).decode_json().as(body,
{
"token": body.data.token,
"expires": body.data.expires,
}
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST " + state.url.trim_right("/") + "/auth/external: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
).as(auth_data,
).as(v, has(v.?events.error) ?
v
: v.as(auth_data,
(state.?cursor.task_id.orValue(null) == null) ?
// No task ID - Submit a query and get its task ID.
{
"startTime": state.?cursor.next_startTime.orValue(
timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
),
"endTime": timestamp(now() - duration("1m")).format(time_layout.RFC3339),
"endTime": state.?cursor.next_endTime.orValue(
timestamp(now() - duration("1m")).format(time_layout.RFC3339)
),
}.as(timeframe,
request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query").with(
{
Expand Down Expand Up @@ -97,7 +118,10 @@ program: |-
"auth_data": auth_data,
"task_id": body.data.taskId,
"page_token": null,
"current_startTime": timeframe.startTime,
"current_endTime": timeframe.endTime,
"next_startTime": timeframe.endTime,
"next_endTime": null,
},
}
)
Expand Down Expand Up @@ -125,6 +149,24 @@ program: |-
}
)
:
(resp.StatusCode == 404) ?
// 404 Not Found - Resubmit the task ID query for the same timeframe.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When want_more is true, a dummy event is required to make it immediately re-run. If there are no events it will wait for an interval.

Here, want_more is false, so that dummy event is unnecessary.

I think it should either be an empty array:

Suggested change
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"events": [],

or a single event with an error message (single events get logged):

Suggested change
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"events": { "error": {"message": "404: task ID not found" }},

or the same with the polling message, so the event will be logged, but the pipeline will drop it:

Suggested change
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"events": {"message": {"event": {"reason": "polling"}}.encode_json(), "error": {"message": "404: task ID not found"}},

The last one would be easier if the pipeline expected parsed JSON instead of { "message": "JSON string" }, because the CEL input logging code only logs a single message if it's parsed JSON.

I don't have a strong opinion about which one. The later ones are more messy, but get some useful information in the log/ES. The first one is clean and 404s shouldn't be happening a lot, and if they are they can always be seen with request tracing.

Here's a relevant part of the CEL documentation:

The field should be an array, but in the case of an error condition in the CEL program it is acceptable to return a single object instead of an array; this will will be wrapped as an array for publication and an error will be logged. If the single object contains a key, "error", the error value will be used to update the status of the input to report to Elastic Agent. This can be used to more rapidly respond to API failures.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are talking about the 503 case instead of the 404 (that's the one that sets want_more to false).

Thanks for the detailed explanation, I would choose to log the error, as receiving a 503 is not very common and so we can discover it more easily. My question is if we are going to log the error, why not use the same format as for the rest of the errors? I have submitted a commit with the changes, would you mind taking a look at it again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right it was 503. The change looks good. I think that kind of logged error event could be used for all error cases. Doesn't necessarily have to be now.

For these error cases there are two choices:

  • return single error event to be logged?
  • keep that error event in Elasticsearch or drop it from the ingest pipeline?

So for now it's logged and not dropped, which is fine. I think in the future we could have some conventions for those choices.

"want_more": true,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
"next_startTime": state.cursor.current_startTime,
"next_endTime": state.cursor.current_endTime,
}
),
}
)
:
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
(body.data.state == "Ready") ?
// 'Ready' (Results found) - Save the first page token.
Expand Down Expand Up @@ -168,6 +210,24 @@ program: |-
}
)
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
:
// Task is ready - Use the task ID and page token to retrieve a page of results.
Expand All @@ -194,6 +254,23 @@ program: |-
}
)
:
(resp.StatusCode == 503) ?
// 503 Service Unavailable - Clear the task ID and page token, and end the sequence.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": false,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
"page_token": null,
}
),
}
)
:
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
(body.data.nextPageToken != "NULL") ?
// Not the last page - Save the next page token and continue.
Expand Down Expand Up @@ -225,8 +302,26 @@ program: |-
}
)
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
)
))
tags:
{{#if preserve_original_event}}
- preserve_original_event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ state:
limit: {{limit}}
page_limit: {{page_limit}}
filter: {{filter}}
program: |-

program: |
(
state.?cursor.auth_data.expires.optMap(t,
t.parse_time(time_layout.RFC1123) - now() > duration("5m")
Expand All @@ -35,22 +34,44 @@ program: |-
"accessKey": state.auth_access_key,
}.encode_json(),
}
).do_request().as(resp,
).do_request().as(resp, resp.StatusCode == 200 ?
bytes(resp.Body).decode_json().as(body,
{
"token": body.data.token,
"expires": body.data.expires,
}
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST " + state.url.trim_right("/") + "/auth/external: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
).as(auth_data,
).as(v, has(v.?events.error) ?
v
: v.as(auth_data,
(state.?cursor.task_id.orValue(null) == null) ?
// No task ID - Submit a query and get its task ID.
{
"startTime": state.?cursor.next_startTime.orValue(
timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
),
"endTime": timestamp(now() - duration("1m")).format(time_layout.RFC3339),
"endTime": state.?cursor.next_endTime.orValue(
timestamp(now() - duration("1m")).format(time_layout.RFC3339)
),
}.as(timeframe,
request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query").with(
{
Expand Down Expand Up @@ -97,7 +118,10 @@ program: |-
"auth_data": auth_data,
"task_id": body.data.taskId,
"page_token": null,
"current_startTime": timeframe.startTime,
"current_endTime": timeframe.endTime,
"next_startTime": timeframe.endTime,
"next_endTime": null,
},
}
)
Expand Down Expand Up @@ -125,6 +149,24 @@ program: |-
}
)
:
(resp.StatusCode == 404) ?
// 404 Not Found - Resubmit the task ID query for the same timeframe.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": true,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
"next_startTime": state.cursor.current_startTime,
"next_endTime": state.cursor.current_endTime,
}
),
}
)
:
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
(body.data.state == "Ready") ?
// 'Ready' (Results found) - Save the first page token.
Expand Down Expand Up @@ -168,6 +210,24 @@ program: |-
}
)
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
:
// Task is ready - Use the task ID and page token to retrieve a page of results.
Expand All @@ -194,6 +254,23 @@ program: |-
}
)
:
(resp.StatusCode == 503) ?
// 503 Service Unavailable - Clear the task ID and page token, and end the sequence.
state.with(
{
"events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
"want_more": false,
"cursor": state.cursor.with(
{
"auth_data": auth_data,
"task_id": null,
"page_token": null,
}
),
}
)
:
(resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
(body.data.nextPageToken != "NULL") ?
// Not the last page - Save the next page token and continue.
Expand Down Expand Up @@ -225,8 +302,26 @@ program: |-
}
)
)
:
state.with(
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
)
)
)
))
tags:
{{#if preserve_original_event}}
- preserve_original_event
Expand Down
Loading