Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set pipeline for bulk request in OpenSearch sink #4965

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

miguel-vila
Copy link

We want to use the same index, with the same data source and compute multiple text embedding fields, each one using a different model. A document in the index would look like this :

{
  "id": "doc-id",
  "passage_text": "foo bar",
  "passage_embedding_model1": [....],
  "passage_embedding_model2": [....]
}

In order to do this we would need multiple pipelines, using the same data source, same target index.

The index would be created without a default ingest pipeline, and instead, we would create one pipeline per model, targetting a different field each:

{
  "text_embedding": {
    "model_id": "<model_id_1>",
    "field_map": {
      "passage_text": "passage_embedding_model1"
    }
  }
}
{
  "text_embedding": {
    "model_id": "<model_id_2>",
    "field_map": {
      "passage_text": "passage_embedding_model2"
    }
  }
}

but the OpenSearch sink for each data-prepper pipeline should:

  • use a different ingest pipeline
  • do an upsert as to not remove the embedding field in case the document was already created by the other pipeline

The bulk endpoint has a pipeline parameter, which I think can be used for this, but I don't think the OpenSearch sink receives a pipeline parameter.

This PR uses the pipeline value when doing that request, but not sure what other changes would be required to support this.

Signed-off-by: miguel-vila <miguelvilag@gmail.com>
Copy link
Member

@sb2k16 sb2k16 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your changes. Please refer to this merged PR (#4609) regarding adding support for pipeline parameter in the sink.

@miguel-vila
Copy link
Author

@sb2k16 thanks! I see that the pipeline parameter is being set for the CREATE and index operations, but not for the UPDATE/UPSERTs. I tried to do the same (something like pipeline.ifPresent(updateOperationBuilder::pipeline);) and it seems UpdateOperation doesn't have a pipeline parameter.

@sb2k16
Copy link
Member

sb2k16 commented Sep 25, 2024

Yes @miguel-vila. The Bulk API update operation does not support the pipeline parameter.

@miguel-vila
Copy link
Author

@sb2k16 I think the _bulk does support it. I ran a test:

First, I upserted a document with a pipeline parameter. The document was created, and the pipeline was used. But doing the same bulk request with a different pipeline (targeting the same document with the same fields) doesn't have any effect (the pipeline doesn't seem to be called).

Is there any way we could do this? our main objective is:

  • to have an index with multiple text embedding fields
  • when we want to add a new text embedding field, we can update all the old documents so they have the new field (whether this is through a new data-prepper pipeline or something else)

@dlvenable
Copy link
Member

Regarding the update action, could we have a simple validation to prevent setting the pipeline when the action is update?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants