Skip to content

add OpenSearchExt #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ Manifest.toml
/docs/build/
/docs/Manifest.toml
debug_out

envs*

# VS Code
.vscode/*
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ authors = ["Community contributors"]
version = "0.1.0"

[deps]
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
BytePairEncoding = "a4280ba5-8788-555a-8ca8-4a8c3d966a71"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
DebugDataWriter = "810e33c6-efd6-4462-86b1-f71ae88af720"
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# GptSearchPlugin

[![](https://img.shields.io/badge/docs-stable-blue.svg)](https://opensesame.github.io/GptSearchPlugin)
<!-- [![](https://img.shields.io/badge/docs-stable-blue.svg)](https://opensesame.github.io/GptSearchPlugin) -->
[![](https://img.shields.io/badge/docs-dev-blue.svg)](https://opensesame.github.io/GptSearchPlugin/dev)

Simple and pure Julia-based implementation of a GPT retrieval plugin logic

1 change: 1 addition & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ makedocs(
"Vector storage providers" => [
"Getting Started" => "Providers/index.md",
"Elasticsearch" => "Providers/Elasticsearch/index.md",
"OpenSearch" => "Providers/Opensearch/index.md",
],
"Internals" => "Internals/index.md"
]
7 changes: 6 additions & 1 deletion docs/src/Providers/Elasticsearch/index.md
Original file line number Diff line number Diff line change
@@ -5,4 +5,9 @@ Use our [compose-script](docker-compose.yml) and run the following command in a

docker-compose up

If you want to configure a cluster [see full instructions](https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docker.html#docker)
If you want to configure a cluster [see full instructions](https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#docker)

To enable ElasticSearch use:
```bash
export DATASTORE=ELASTICSEARCH
```
12 changes: 12 additions & 0 deletions docs/src/Providers/Opensearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: '3'
services:
opensearch-node: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/)
image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version
container_name: opensearch-node
environment:
- discovery.type=single-node
- plugins.security.disabled=true
ports:
- 9200:9200 # REST API
- 9600:9600 # Performance Analyzer

14 changes: 14 additions & 0 deletions docs/src/Providers/Opensearch/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# OpenSearch as a vector storage

OpenSearch is a search application licensed under Apache 2.0 and available as a cloud solution on AWS. OpenSearch can be installed in [a number of ways](https://opensearch.org/downloads.html). But the easiest way to have it locally without authentication is to use Docker.

Use our [compose-script](docker-compose.yml) and run the following command in a terminal:

docker-compose up

If you want to configure a cluster [see full instructions](https://opensearch.org/docs/latest/install-and-configure/index/)

To enable OpenSearch use:
```bash
export DATASTORE=OPENSEARCH
```
8 changes: 8 additions & 0 deletions docs/src/Providers/index.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# Vector storage service providers

The plugin uses a vector storage to store the calculated embedding vectors and doing approximate search.

The plugin can use any storage if it provides it:
- Store vectors up to 2k in size (for OpenAI embeddings);
- Add and remove records by a string ID with a vector and metadata;
- Do an approximate vector search with a distance score.

See the current implementations of the storage interfaces if you want to create a new one.
47 changes: 47 additions & 0 deletions ext/OpenSearchExt/OpenSearchAuth.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module OpenSearchAuth

using HTTP
using AWS
using Dates
using TimeZones

const AWS_REQUEST_HEADERS = ["Host", "Content-Type", "User-Agent"]
const AWS_AUTH_HEADERS = ["Authorization", "Content-MD5", "x-amz-date", "x-amz-security-token"]

const ES_SERVICE_NAME = "es"
# TODO: add a mechanism for configuring a region of deployment for the instance
const ES_REGION = "us-west-1"

const ACCEPT_HEADER_KEY = "Accept"

function auth_layer(handler)
return function (req; auth_params::AWSCredentials, kw...)
config = AWSConfig(;creds=auth_params, region=ES_REGION)

headers = Dict{String,String}(req.headers)
# With accept header AWS return error about mismatch signature
accept_header = pop!(headers, ACCEPT_HEADER_KEY, nothing)

aws_request = AWS.Request(
content=req.body.s,
url=string(req.url),
headers=headers,
api_version="none",
request_method=req.method,
service=ES_SERVICE_NAME
)
AWS.sign_aws4!(config, aws_request, now(UTC))

if !isnothing(accept_header)
aws_request.headers[ACCEPT_HEADER_KEY] = accept_header
end

req.headers = collect(aws_request.headers)

return handler(req; kw...)
end
end

HTTP.@client [auth_layer]

end
308 changes: 308 additions & 0 deletions ext/OpenSearchExt/OpenSearchExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
module OpenSearchExt

include("OpenSearchAuth.jl")

using ElasticsearchClient

using Mustache
using JSON
using Mocking
using AWS
using Dates

using ..DataStore
using ..DataStore: AbstractStorage
using ...AppServer
using ...AppServer:
DocumentChunk, DocumentChunkMetadata,
QueryWithEmbedding, QueryResult,
DocumentMetadataFilter, DocumentChunkWithScore,
UpsertResponse

const DEFAULT_CHUNKS_INDEX_NAME = "gpt_plugin_chunks_knn_index"
const CHUNKS_INDEX_SCHEMA_FILE_PATH =
joinpath(@__DIR__, "es_settings", "chunks_index_schema.json")

const DEFAULT_KNN_DIMENSION = 1536
const DEFAULT_KNN_SPACE_TYPE = "cosinesimil"
const DEFAULT_KNN_EF_CONSTRUCTION = 512
const DEFAULT_KNN_M = 16
const DEFAULT_KNN_EF_SEARCH = 512
const DEFAULT_NUM_CANDIDATES = 10_000

struct OpensearchStorage <: AbstractStorage
client::ElasticsearchClient.Client
local_storage::Bool
chunks_index_name::AbstractString

function OpensearchStorage(client::ElasticsearchClient.Client, local_storage::Bool)
index_name = chunks_index_name()

if !ElasticsearchClient.Indices.exists(client, index=index_name, auth_params=get_auth_params(local_storage))
index_settings_template =
read(CHUNKS_INDEX_SCHEMA_FILE_PATH) |>
String |>
Mustache.parse

index_settings = index_settings_template(
dimension=knn_dimension(),
space_type=knn_space_type(),
ef_construction=knn_ef_construction(),
m=knn_m(),
ef_search=knn_ef_search()
) |> JSON.parse

try
ElasticsearchClient.Indices.create(
client,
index=index_name,
body=index_settings,
auth_params=get_auth_params(local_storage)
)
catch e
@error e
end
end

new(client, local_storage, index_name)
end
end

chunks_index_name() = get(ENV, "CHUNKS_INDEX_NAME", DEFAULT_CHUNKS_INDEX_NAME)
knn_dimension() = get(ENV, "KNN_DIMENSION", DEFAULT_KNN_DIMENSION)
knn_space_type() = get(ENV, "KNN_SPACE_TYPE", DEFAULT_KNN_SPACE_TYPE)
knn_ef_construction() = get(ENV, "KNN_EF_CONSTRUCTION", DEFAULT_KNN_EF_CONSTRUCTION)
knn_m() = get(ENV, "KNN_M", DEFAULT_KNN_M)
knn_ef_search() = get(ENV, "KNN_EF_SEARCH", DEFAULT_KNN_EF_SEARCH)

struct AuthParams
creds::AWSCredentials
expires_at::DateTime

function AuthParams()
# ttl in seconds, 1 hour by default
ttl = get(ENV, "AUTH_PARAMS_TTL", 3600)

new(
AWSCredentials(),
now() + Second(ttl)
)
end
end
global CURRENT_AUTH_PARAMS::Union{Nothing,Ref{AWSCredentials}} = nothing

"""
Takes in a list of list of document chunks and inserts them into the database.
Return a list of document ids.
"""
function DataStore.upsert(
storage::OpensearchStorage,
chunks::Dict{String,<:AbstractVector{DocumentChunk}}
)::UpsertResponse
index_batch = AbstractDict[]

for doc_chunks in values(chunks), doc_chunk in doc_chunks
operation_name = :index
operation_body = Dict(
:_id => doc_chunk.id,
:data => Dict(
:text => doc_chunk.text,
:metadata => doc_chunk.metadata,
:embedding => doc_chunk.embedding
)
)

push!(index_batch, Dict(operation_name => operation_body))
end

ElasticsearchClient.bulk(storage.client, index=storage.chunks_index_name, body=index_batch, auth_params=get_auth_params(storage))
ElasticsearchClient.Indices.refresh(storage.client, index=storage.chunks_index_name, auth_params=get_auth_params(storage))

UpsertResponse(collect(keys(chunks)))
end

"""
Takes in a list of queries with embeddings and filters and
returns a list of query results with matching document chunks and scores.
"""
function DataStore.query(
storage::OpensearchStorage,
queries::AbstractVector{QueryWithEmbedding}
)::Vector{QueryResult}
query_tasks = map(query_with_emb -> single_query(storage, query_with_emb), queries)

map(query_tasks) do (query, query_task)
wait(query_task)

response = query_task.result
results =
map(response.body["hits"]["hits"]) do hit
source = hit["_source"]

stored_metadata = source["metadata"]
metadata = DocumentChunkMetadata(
source=get(stored_metadata, "source", nothing),
source_id=get(stored_metadata, "source_id", nothing),
url=get(stored_metadata, "url", nothing),
created_at=get(stored_metadata, "created_at", nothing),
author=get(stored_metadata, "author", nothing),
document_id=get(stored_metadata, "document_id", nothing),
)

DocumentChunkWithScore(;
id=hit["_id"],
text=source["text"],
metadata=metadata,
# embedding=source["embedding"], # not required for ChatGPT
score=hit["_score"]
)
end

QueryResult(;
query=query.query,
results=results
)
end
end

function single_query(
storage::OpensearchStorage,
query::QueryWithEmbedding
)::Tuple{QueryWithEmbedding,Task}
knn_query = Dict(
:knn => Dict(
:embedding => Dict(
:vector => query.embedding,
:k => query.top_k
)
)
)

full_query = Dict(
:query => Dict(
:bool => Dict(
:must => [knn_query]
)
)
)

if !isnothing(query.filter)
nested_document_filter = Dict(
:nested => Dict(
:path => :metadata,
:query => Dict(
:term => Dict(
"metadata.document_id" => query.filter.document_id
)
)
)
)
full_query[:query][:bool][:filter] = [nested_document_filter]
end

task = @async ElasticsearchClient.search(
storage.client,
index=storage.chunks_index_name,
body=full_query,
auth_params=get_auth_params(storage)
)

(query, task)
end

"""
Removes vectors by ids, filter.
Multiple parameters can be used at once.
Returns whether the operation was successful.
"""
function DataStore.delete(
storage::OpensearchStorage;
filter::Vector{DocumentMetadataFilter}
)::Bool
document_ids = getproperty.(filter, :document_id)
query = Dict(
:query => Dict(
:nested => Dict(
:path => "metadata",
:query => Dict(
:terms => Dict(
"metadata.document_id" => document_ids
)
)
)
)
)
response = ElasticsearchClient.delete_by_query(
storage.client,
index=storage.chunks_index_name,
body=query,
auth_params=get_auth_params(storage)
)

response.status == 200
end

"""
Removes everything in the datastore
Returns whether the operation was successful.
"""
function DataStore.delete_all(storage::OpensearchStorage)::Bool
query = Dict(:query => Dict(:match_all => Dict()))

response = ElasticsearchClient.delete_by_query(
storage.client,
index=storage.chunks_index_name,
body=query,
auth_params=get_auth_params(storage)
)

response.status == 200
end

function create_storage()
local_storage = get(ENV, "LOCAL_STORAGE", true)
if local_storage
OpensearchStorage(
ElasticsearchClient.Client(
host=(
host=get(ENV, "ES_HOST", "localhost"),
port=get(ENV, "ES_PORT", "9200") |> str -> parse(Int16, str),
scheme="http"
)
),
local_storage
)
else
es_host = get(ENV, "ES_HOST", nothing)

isnothing(es_host) && throw(ArgumentError("Environment variable ES_HOST must be set"))

OpensearchStorage(
ElasticsearchClient.Client(
host=(host=es_host, port=443, scheme="https"),
http_client=OpenSearchAuth
),
local_storage
)
end
end

function get_auth_params(local_storage::Bool)
local_storage && return

if isnothing(CURRENT_AUTH_PARAMS) || now() > CURRENT_AUTH_PARAMS.x.expires_at
refresh_auth_params()
end

CURRENT_AUTH_PARAMS.x
end
get_auth_params(storage::OpensearchStorage) = get_auth_params(storage.local_storage)

function refresh_auth_params()
global CURRENT_AUTH_PARAMS = Ref(AuthParams())
end

end
36 changes: 36 additions & 0 deletions ext/OpenSearchExt/es_settings/chunks_index_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"settings": {
"index": {
"knn": true,
"knn.algo_param.ef_search": {{:ef_search}}
}
},
"mappings": {
"properties": {
"text": { "type": "text" },
"metadata": {
"type": "nested",
"properties": {
"document_id": { "type": "keyword" },
"source_id": { "type": "keyword" },
"author": { "type": "text" },
"url": { "type": "text" },
"created_at": { "type": "keyword" }
}
},
"embedding": {
"type": "knn_vector",
"dimension": {{:dimension}},
"method": {
"name": "hnsw",
"space_type": "{{:space_type}}",
"engine": "nmslib",
"parameters": {
"ef_construction": {{:ef_construction}},
"m": {{:m}}
}
}
}
}
}
}
75 changes: 75 additions & 0 deletions ext_test/test_opensearch_ext.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
test_index_name = "test_gpt_plugin_chunks_index"

ENV["DATASTORE"] = "opensearch"
ENV["CHUNKS_INDEX_NAME"] = test_index_name
ENV["KNN_DIMENSION"] = 10

using ElasticsearchClient
using TimeZones
using Dates
using AWS
using GptSearchPlugin
using GptSearchPlugin.AppServer.DataStore
using Test
using Mocking
using OpenAI: create_embeddings
using HTTP

Mocking.activate()

total_docs_in_index(storage) =
ElasticsearchClient.search(
storage.client,
index=test_index_name
).body["hits"]["total"]["value"]

storage = DataStore.STORAGE

DataStore.delete_all(storage)
@test total_docs_in_index(storage) == 0

text = "Respond with a JSON containing the extracted metadata in key value pairs."

build_embeddings_mock(text_vectors) = (
status=200,
response=Dict("data" => map(_ -> Dict("embedding" => rand(10)), text_vectors))
)
chunk_size = 4
doc = GptSearchPlugin.AppServer.Document(text=text)
patch = @patch create_embeddings(api_key::String, text_vectors) = build_embeddings_mock(text_vectors)
arr_chunks = apply(patch) do
GptSearchPlugin.AppServer.get_document_chunks(repeat([doc], 10), chunk_size)
end
chunks_count = values(arr_chunks) |> Iterators.flatten |> collect |> length

document_ids = DataStore.upsert(
storage,
arr_chunks
)

@test length(document_ids.ids) == length(arr_chunks)
@test all(in(document_ids.ids), keys(arr_chunks))
@test total_docs_in_index(storage) == chunks_count

query_with_emb = GptSearchPlugin.AppServer.QueryWithEmbedding(
query = "Some query",
embedding = rand(10)
)

query_results = DataStore.query(
storage,
[query_with_emb]
)
@test length(first(query_results).results) == query_with_emb.top_k
@test first(query_results).query == query_with_emb.query

doc_ids_for_delete = rand(document_ids.ids, 2)

@test DataStore.delete(
storage,
filter=map(
doc_id -> GptSearchPlugin.AppServer.DocumentMetadataFilter(document_id=doc_id),
doc_ids_for_delete
)
)
@test total_docs_in_index(storage) == chunks_count - 10
5 changes: 5 additions & 0 deletions src/datastore/factory.jl
Original file line number Diff line number Diff line change
@@ -15,6 +15,11 @@ global DATASTORE_MODULE = let
include("../../ext/ElasticsearchClientExt/ElasticsearchClientExt.jl")

ElasticsearchClientExt
elseif isequal(datastore, "OPENSEARCH")
@info "Pluging OpenSearchExt"
include("../../ext/OpenSearchExt/OpenSearchExt.jl")

OpenSearchExt
elseif isequal(datastore, "TEST")
TestStorageExt
else