
[BUG] bulk_size is underestimated #2954

Closed
kartg opened this issue Jun 29, 2023 · 2 comments
Labels: bug (Something isn't working)

kartg (Member) commented Jun 29, 2023

Describe the bug
My Data Prepper pipeline seems to be sending much larger requests than the bulk_size configured in the pipeline. Here's the relevant snippet of my pipeline configuration:

....
    processor:
        - add_entries:
            entries:
                - key: "document_id"
                  value_expression: "getMetadata(\"document_id\")"
                - key: "index"
                  value_expression: "getMetadata(\"index\")"

    sink:
        - opensearch:
            hosts: ["<redacted>"]
            username: "<redacted>"
            password: "<redacted>"
            index: "${index}"
            document_id_field: "document_id"
            bulk_size: 4

Running Data Prepper with this pipeline results in an exception suggesting that the size of the bulk request is much larger than what is configured:

...
WARN  org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy - Bulk Operation Failed. Number of retries 5. Retrying... 
org.opensearch.client.ResponseException: method [POST], host [<redacted>], URI [/_bulk], status line [HTTP/1.1 413 Request Entity Too Large]
{"Message":"Request size exceeded 10485760 bytes"}
    at org.opensearch.client.RestClient.convertResponse(RestClient.java:375) ~[opensearch-rest-client-2.7.0.jar:?]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:345) ~[opensearch-rest-client-2.7.0.jar:?]
    at org.opensearch.client.RestClient.performRequest(RestClient.java:320) ~[opensearch-rest-client-2.7.0.jar:?]
    at org.opensearch.client.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:143) ~[opensearch-java-2.5.0.jar:?]
    at org.opensearch.client.opensearch.OpenSearchClient.bulk(OpenSearchClient.java:217) ~[opensearch-java-2.5.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$doInitializeInternal$1(OpenSearchSink.java:202) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:267) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:191) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$6(OpenSearchSink.java:319) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:316) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:288) ~[opensearch-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:336) ~[data-prepper-core-2.4.0-SNAPSHOT.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]

This seems to be the opposite of #2852
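
For a rough sense of scale (my own back-of-the-envelope check, assuming bulk_size is expressed in MiB, which is how I read the opensearch sink docs): a value of 4 should cap each bulk request at roughly 4 MiB, while the 413 response above reports a 10,485,760-byte (10 MiB) service-side limit. Under those assumptions:

    // Hypothetical sanity check; assumes bulk_size is interpreted as MiB.
    public class BulkSizeCheck {
        public static void main(String[] args) {
            long configuredBulkBytes = 4L * 1024 * 1024; // bulk_size: 4 from the pipeline config
            long serviceLimitBytes = 10_485_760L;        // limit reported in the 413 response

            // The rejected request exceeded the service limit, so the payload on the wire
            // was at least this many times larger than the configured cap.
            double minRatio = (double) serviceLimitBytes / configuredBulkBytes;
            System.out.printf("Request was at least %.1fx the configured bulk_size%n", minRatio);
            // Prints: Request was at least 2.5x the configured bulk_size
        }
    }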

To Reproduce
See description above

Expected behavior
The size of each bulk request should be equal to or under the configured bulk_size value.

Screenshots
N/A

Environment (please complete the following information):

  • OS: macOS 12.6.6
  • Version: Data Prepper 2.4.0-SNAPSHOT

Additional context
N/A

kartg added the "bug" and "untriaged" labels Jun 29, 2023
engechas (Collaborator) commented:
We recently changed the bulk_size estimation logic to factor in compression to solve #2852. The estimation logic is crude in its current state and needs some iteration to become more accurate. I am planning to pick up this work soon.
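
For illustration only, here is a hypothetical sketch (not the actual Data Prepper estimation code; all names below are made up) of how a size estimate that divides by an assumed compression ratio can let a batch grow well past the configured limit when the data compresses worse than assumed:

    // Hypothetical sketch of a compression-aware flush check; not the real bulk request logic.
    final class CompressionAwareBatchSizer {
        private final long maxBulkBytes;                // bulk_size converted to bytes
        private final double assumedCompressionRatio;   // e.g. 3.0 means "expect a 3x reduction"
        private long accumulatedUncompressedBytes;

        CompressionAwareBatchSizer(long maxBulkBytes, double assumedCompressionRatio) {
            this.maxBulkBytes = maxBulkBytes;
            this.assumedCompressionRatio = assumedCompressionRatio;
        }

        boolean hasRoomFor(long documentBytes) {
            // Dividing by an assumed ratio underestimates whenever the real data compresses
            // worse than assumed (or is sent uncompressed), so the batch keeps accepting
            // documents after the true request size has already passed maxBulkBytes.
            long estimatedRequestBytes =
                    (long) ((accumulatedUncompressedBytes + documentBytes) / assumedCompressionRatio);
            return estimatedRequestBytes <= maxBulkBytes;
        }

        void add(long documentBytes) {
            accumulatedUncompressedBytes += documentBytes;
        }
    }

With maxBulkBytes of 4 MiB and an assumed 3x ratio, a check shaped like this would not flush until the accumulated payload approached roughly 12 MiB, which lines up with the 10 MiB 413 rejection above. Again, this only illustrates the failure mode of ratio-based estimation; it is not a claim about the actual code path.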

I am surprised to see the estimation off by more than a factor of 2.5; in my testing I have seen it stay within 1-1.5x of the bulk_size. Could you provide more details about the sink you were using and the workload?

kartg (Member, Author) commented Jul 5, 2023

Could you provide more details about the sink you were using and the workload?

  • My sink is an AWS OpenSearch cluster running OpenSearch 2.5.
  • The dataset I'm trying to process is 2% of the StackOverflow OpenSearch Benchmark workload
    • the document count for this index from my _cat/indices output is 760000
  • My source is a single-node OpenSearch 1.3 cluster running on an EC2 instance
  • I'm using the newly minted OpenSearch source plugin in my Data Prepper pipeline
