
[Feature Request] Allow the OpenSearch source plugin to shut down the Data Prepper pipeline #2944

Open
kartg opened this issue Jun 27, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@kartg
Member

kartg commented Jun 27, 2023

Is your feature request related to a problem? Please describe.
Hello! We're working on changing the opensearch-migrations tooling so our data migration implementation uses Data Prepper instead of Logstash. We will be leveraging the newly minted OpenSearch/ElasticSearch source plugin for this.

Since this is a pull-based plugin, there is a finite set of data that needs to be ingested. Once all of the data has been processed, our expectation is that the Data Prepper pipeline would shut itself down based on a signal from the source plugin. This is similar to how pull-based plugins function in Logstash.

However, Data Prepper does not currently operate this way. The pipeline/process continues to stay alive (though the source plugin is not pulling any more data) until the caller terminates it or shuts it down via APIs.

Describe the solution you'd like
Once a pull-based source plugin has completed ingesting all data, it should signal to the Data Prepper pipeline, and the pipeline should shut itself down.
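One way the signal could be modeled (purely illustrative; these interfaces are not existing Data Prepper APIs) is a completion callback that the pipeline hands to a pull-based source, which the source invokes once all data has been ingested:

```java
import java.util.List;

// Hypothetical callback interface: the pipeline would hand this to a
// pull-based source, and the source invokes it when ingestion is done.
interface SourceCompletionListener {
    void onSourceComplete();
}

// Hypothetical finite source that signals completion after the last record.
class FiniteSource {
    private final SourceCompletionListener listener;

    FiniteSource(final SourceCompletionListener listener) {
        this.listener = listener;
    }

    void run(final List<String> records) {
        for (final String record : records) {
            process(record);
        }
        listener.onSourceComplete(); // all data ingested; ask the pipeline to stop
    }

    private void process(final String record) {
        // In a real source this would write the record to the pipeline buffer.
    }
}
```

The pipeline would implement the listener and begin its normal shutdown sequence when notified.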

Describe alternatives you've considered (Optional)
An alternative approach would be to have the pipeline / source plugin signal externally (to the caller) that all data has been processed. The caller can then invoke the Data Prepper shutdown API to stop the process.

Additional context
N/A

@graytaylor0
Member

graytaylor0 commented Jun 27, 2023

Thanks for creating this issue @kartg!

While the mechanism for shutting down pipelines based on notifications from sources would be abstracted away from the OpenSearch source itself, what do you think the appropriate behavior would be from the OpenSearch source's perspective?

My proposal would be to have a configurable value for the number of consecutive times the OpenSearch source has attempted to acquire an index with nothing being returned. The current behavior is to wait 30 seconds if no indices are found before attempting to acquire an index again, which would also run the partition supplier to pick up new indices. The location where this happens is here.

The config could be added to the scheduling config (the parameter naming needs some work):

scheduling:
   shutdown_after_no_indices_found_count: 3

This would mean that if the OpenSearch source does not get any indices to process after 3 attempts (roughly 1 min 30 seconds), then it would issue a shutdown of the pipeline with whatever mechanism is supported for that.
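The counter logic could look something like this minimal sketch (class and method names are hypothetical, not Data Prepper APIs): each empty acquisition attempt increments a counter, any successful acquisition resets it, and reaching the configured threshold means the source should request a pipeline shutdown.

```java
// Hypothetical sketch (not a Data Prepper API): counts consecutive
// attempts that acquired no index and reports when the configured
// shutdown threshold has been reached.
class NoIndicesShutdownTracker {
    private final int shutdownAfterNoIndicesFoundCount;
    private int consecutiveEmptyAttempts = 0;

    NoIndicesShutdownTracker(final int shutdownAfterNoIndicesFoundCount) {
        this.shutdownAfterNoIndicesFoundCount = shutdownAfterNoIndicesFoundCount;
    }

    // Called after each acquisition attempt; returns true once the
    // source should request a pipeline shutdown.
    boolean recordAttempt(final boolean indexAcquired) {
        if (indexAcquired) {
            consecutiveEmptyAttempts = 0; // progress was made, so reset
        } else {
            consecutiveEmptyAttempts++;
        }
        return consecutiveEmptyAttempts >= shutdownAfterNoIndicesFoundCount;
    }
}
```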

We could also consider nesting the shutdown for future additions, such as

scheduling:
   shutdown:
       after_no_indices_found_count: 3
       # conditionally shut down with conditional expressions
       when: "/some_key == DONE"
       # shut down at a certain time
       shutdown_time: "2023-06-28T22:01:30.00Z"

@asifsmohammed asifsmohammed added enhancement New feature or request and removed untriaged labels Jun 28, 2023
@kartg
Member Author

kartg commented Jun 28, 2023

My proposal would be to have a configurable value for the number of consecutive times the OpenSearch source has attempted to acquire an index with nothing being returned.

@graytaylor0 If I understand this correctly, this would require the source indices to have a field that can be used for temporal sorting, so the approach cannot be applied more generally.

For example, consider an index that holds letters of the alphabet - (a, c, d). It's reasonable to assume that such documents would be sorted alphabetically. If b is later inserted into the index, there is no query that would return only b, so it wouldn't be possible to fetch the index with nothing being returned.

A version of this to consider is a simple run count, where the pipeline is shut down after the OpenSearch source has queried its cluster a certain number of times:

scheduling:
   shutdown:
       after_query_count: 3

I do think that after_no_indices_found_count is a good idea, as are the other ideas you've listed (when and shutdown_time). It may just be easier to implement the simplest solution first. Wdyt?

@graytaylor0
Member

graytaylor0 commented Jun 30, 2023

For example, consider an index that holds letters of the alphabet - (a, c, d). It's reasonable to assume that such documents would be sorted alphabetically. If b is later inserted into the index, there is no query that would return only b, so it wouldn't be possible to fetch the index with nothing being returned.

@kartg I’m not sure I understand what you mean here. The after_no_indices_found_count has nothing to do with the data in the indices. I think this is the best general indicator that nothing else needs to be processed.

A version of this to consider is a simple run count where the pipeline is shutdown after the OpenSearch source has queried its cluster a certain number of times:

As in put a limit on the number of indices a source node can process before shutting down?

@kartg
Member Author

kartg commented Jul 5, 2023

The after_no_indices_found_count has nothing to do with the data in the indices. I think this is the best general indicator that nothing else needs to be processed

Ah, my bad - I misunderstood the phrase "acquire an index with nothing being returned" as referring to the data within the index rather than the index itself. @graytaylor0 for my understanding (and after looking at the code you've linked above) - when would the SourceCoordinator return nothing? If some indices are already processed, are these filtered out by state maintained in the SourceCoordinationStore?

@graytaylor0
Member

for my understanding (and after looking at the code you've linked above) - when would the SourceCoordinator return nothing? If some indices are already processed, are these filtered out by state maintained in the SourceCoordinationStore ?

Nothing would be returned in lines 144-159 here:

ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);

First, a call is made to attempt to acquire a partition. This returns empty if all of the indices are COMPLETED, ASSIGNED without the partition ownership timeout being reached, or CLOSED without the reOpenAt timestamp being reached. If nothing is returned on the first call to tryAcquireAvailablePartition, the supplier is run (which would create new partitions if new indices were created in the source cluster since the last time the supplier was run). After the supplier is run, another call to tryAcquireAvailablePartition is made; if there is still no available partition, the method returns empty to the source that calls getNextPartition.
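That two-phase flow can be sketched roughly as follows (names are illustrative, not the actual Data Prepper signatures): try to acquire, run the supplier if nothing was available, then try exactly once more before reporting empty.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of the two-phase acquisition flow described above
// (names are illustrative, not the actual Data Prepper signatures).
class PartitionAcquirer<T> {
    Optional<T> getNextPartition(final Supplier<Optional<T>> tryAcquire,
                                 final Runnable partitionSupplier) {
        Optional<T> partition = tryAcquire.get();
        if (partition.isEmpty()) {
            // Nothing acquirable: every partition is COMPLETED, still ASSIGNED,
            // or CLOSED with reOpenAt in the future. Run the supplier to pick
            // up any indices created since the last run...
            partitionSupplier.run();
            // ...then try once more before reporting empty to the source.
            partition = tryAcquire.get();
        }
        return partition;
    }
}
```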

@dlvenable
Member

I like this idea. But let's be sure that the source only shuts down its own pipeline. Data Prepper core has support for shutting down the entire Data Prepper application when either a single pipeline terminates or after all pipelines terminate. So, depending on the configuration, this may or may not shut down Data Prepper entirely.
