diff --git a/.gitignore b/.gitignore
index 9c61b6b..25c4ca4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,7 +5,7 @@ Manifest.toml
 /docs/build/
 /docs/Manifest.toml
 debug_out
-
+envs*
 
 # VS Code
 .vscode/*
diff --git a/Project.toml b/Project.toml
index 02b35a2..e1ab0cc 100644
--- a/Project.toml
+++ b/Project.toml
@@ -4,6 +4,7 @@ authors = ["Community contributors"]
 version = "0.1.0"
 
 [deps]
+AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
 BytePairEncoding = "a4280ba5-8788-555a-8ca8-4a8c3d966a71"
 Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
 DebugDataWriter = "810e33c6-efd6-4462-86b1-f71ae88af720"
diff --git a/README.md b/README.md
index d5fbcfc..53ff70a 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
 # GptSearchPlugin
-[![](https://img.shields.io/badge/docs-stable-blue.svg)](https://opensesame.github.io/GptSearchPlugin)
+
+[![](https://img.shields.io/badge/docs-dev-blue.svg)](https://opensesame.github.io/GptSearchPlugin/dev)
 
 Simple and pure Julia-based implementation of a GPT retrieval plugin logic
 
 
diff --git a/docs/make.jl b/docs/make.jl
index a1c8507..3901c98 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -11,6 +11,7 @@ makedocs(
         "Vector storage providers" => [
             "Getting Started" => "Providers/index.md",
             "Elasticsearch" => "Providers/Elasticsearch/index.md",
+            "OpenSearch" => "Providers/Opensearch/index.md",
         ],
         "Internals" => "Internals/index.md"
     ]
diff --git a/docs/src/Providers/Elasticsearch/index.md b/docs/src/Providers/Elasticsearch/index.md
index 59b8670..cb382f3 100644
--- a/docs/src/Providers/Elasticsearch/index.md
+++ b/docs/src/Providers/Elasticsearch/index.md
@@ -5,4 +5,9 @@ Use our [compose-script](docker-compose.yml) and run the following command in a terminal:
 
     docker-compose up
 
-If you want to configure a cluster [see full instructions](https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docker.html#docker)
+If you want to configure a cluster, [see the full instructions](https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#docker)
+
+To enable Elasticsearch, use:
+```bash
+export DATASTORE=ELASTICSEARCH
+```
diff --git a/docs/src/Providers/Opensearch/docker-compose.yml b/docs/src/Providers/Opensearch/docker-compose.yml
new file mode 100644
index 0000000..a59da28
--- /dev/null
+++ b/docs/src/Providers/Opensearch/docker-compose.yml
@@ -0,0 +1,12 @@
+version: '3'
+services:
+  opensearch-node: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node/)
+    image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version
+    container_name: opensearch-node
+    environment:
+      - discovery.type=single-node
+      - plugins.security.disabled=true
+    ports:
+      - 9200:9200 # REST API
+      - 9600:9600 # Performance Analyzer
+
diff --git a/docs/src/Providers/Opensearch/index.md b/docs/src/Providers/Opensearch/index.md
new file mode 100644
index 0000000..8a3b1b2
--- /dev/null
+++ b/docs/src/Providers/Opensearch/index.md
@@ -0,0 +1,24 @@
+# OpenSearch as a vector storage
+
+OpenSearch is a search application licensed under Apache 2.0 and available as a cloud solution on AWS. It can be installed in [a number of ways](https://opensearch.org/downloads.html), but the easiest way to run it locally without authentication is Docker.
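+
+If you just want a throwaway local node, the same single-node setup can also be
+started with a plain `docker run` (a sketch mirroring the settings from the compose file below; adjust the image tag as needed):
+
+```bash
+docker run -p 9200:9200 -p 9600:9600 \
+  -e "discovery.type=single-node" \
+  -e "plugins.security.disabled=true" \
+  opensearchproject/opensearch:latest
+```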
+
+Use our [compose-script](docker-compose.yml) and run the following command in a terminal:
+
+    docker-compose up
+
+If you want to configure a cluster, [see the full instructions](https://opensearch.org/docs/latest/install-and-configure/index/)
+
+To enable OpenSearch, use:
+```bash
+export DATASTORE=OPENSEARCH
+```
diff --git a/docs/src/Providers/index.md b/docs/src/Providers/index.md
index f0424e9..4172bfc 100644
--- a/docs/src/Providers/index.md
+++ b/docs/src/Providers/index.md
@@ -1,2 +1,10 @@
 # Vector storage service providers
 
+The plugin uses a vector storage to store the calculated embedding vectors and to run approximate search over them.
+
+The plugin can use any storage backend that can:
+- store vectors up to 2k in size (for OpenAI embeddings);
+- add and remove records by a string ID, together with a vector and metadata;
+- do an approximate vector search that returns a distance score.
+
+See the current implementations of the storage interfaces if you want to create a new one.
diff --git a/ext/OpenSearchExt/OpenSearchAuth.jl b/ext/OpenSearchExt/OpenSearchAuth.jl
new file mode 100644
index 0000000..20a790d
--- /dev/null
+++ b/ext/OpenSearchExt/OpenSearchAuth.jl
@@ -0,0 +1,50 @@
+module OpenSearchAuth
+
+using HTTP
+using AWS
+using Dates
+using TimeZones
+
+const AWS_REQUEST_HEADERS = ["Host", "Content-Type", "User-Agent"]
+const AWS_AUTH_HEADERS = ["Authorization", "Content-MD5", "x-amz-date", "x-amz-security-token"]
+
+const ES_SERVICE_NAME = "es"
+# TODO: add a mechanism for configuring a region of deployment for the instance
+const ES_REGION = "us-west-1"
+
+const ACCEPT_HEADER_KEY = "Accept"
+
+function auth_layer(handler)
+    return function (req; auth_params::AWSCredentials, kw...)
+        config = AWSConfig(;creds=auth_params, region=ES_REGION)
+
+        headers = Dict{String,String}(req.headers)
+        # AWS rejects requests signed with an Accept header (signature mismatch), so strip it before signing and restore it afterwards
+        accept_header = pop!(headers, ACCEPT_HEADER_KEY, nothing)
+
+        aws_request = AWS.Request(
+            content=req.body.s,
+            url=string(req.url),
+            headers=headers,
+            api_version="none",
+            request_method=req.method,
+            service=ES_SERVICE_NAME
+        )
+        AWS.sign_aws4!(config, aws_request, now(UTC))
+
+        if !isnothing(accept_header)
+            aws_request.headers[ACCEPT_HEADER_KEY] = accept_header
+        end
+
+        req.headers = collect(aws_request.headers)
+
+        return handler(req; kw...)
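+        # Note: `auth_params` is consumed by this layer (it is a keyword in the
+        # closure signature, so it is not forwarded in `kw...`); downstream layers
+        # see only the signed headers, never the raw AWS credentials.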
+    end
+end
+
+HTTP.@client [auth_layer]
+
+end
diff --git a/ext/OpenSearchExt/OpenSearchExt.jl b/ext/OpenSearchExt/OpenSearchExt.jl
new file mode 100644
index 0000000..9eac3ce
--- /dev/null
+++ b/ext/OpenSearchExt/OpenSearchExt.jl
@@ -0,0 +1,317 @@
+module OpenSearchExt
+
+include("OpenSearchAuth.jl")
+
+using ElasticsearchClient
+
+using Mustache
+using JSON
+using Mocking
+using AWS
+using Dates
+
+using ..DataStore
+using ..DataStore: AbstractStorage
+using ...AppServer
+using ...AppServer:
+    DocumentChunk, DocumentChunkMetadata,
+    QueryWithEmbedding, QueryResult,
+    DocumentMetadataFilter, DocumentChunkWithScore,
+    UpsertResponse
+
+const DEFAULT_CHUNKS_INDEX_NAME = "gpt_plugin_chunks_knn_index"
+const CHUNKS_INDEX_SCHEMA_FILE_PATH =
+    joinpath(@__DIR__, "es_settings", "chunks_index_schema.json")
+
+const DEFAULT_KNN_DIMENSION = 1536
+const DEFAULT_KNN_SPACE_TYPE = "cosinesimil"
+const DEFAULT_KNN_EF_CONSTRUCTION = 512
+const DEFAULT_KNN_M = 16
+const DEFAULT_KNN_EF_SEARCH = 512
+const DEFAULT_NUM_CANDIDATES = 10_000
+
+struct OpensearchStorage <: AbstractStorage
+    client::ElasticsearchClient.Client
+    local_storage::Bool
+    chunks_index_name::AbstractString
+
+    function OpensearchStorage(client::ElasticsearchClient.Client, local_storage::Bool)
+        index_name = chunks_index_name()
+
+        if !ElasticsearchClient.Indices.exists(client, index=index_name, auth_params=get_auth_params(local_storage))
+            index_settings_template =
+                read(CHUNKS_INDEX_SCHEMA_FILE_PATH) |>
+                String |>
+                Mustache.parse
+
+            index_settings = index_settings_template(
+                dimension=knn_dimension(),
+                space_type=knn_space_type(),
+                ef_construction=knn_ef_construction(),
+                m=knn_m(),
+                ef_search=knn_ef_search()
+            ) |> JSON.parse
+
+            try
+                ElasticsearchClient.Indices.create(
+                    client,
+                    index=index_name,
+                    body=index_settings,
+                    auth_params=get_auth_params(local_storage)
+                )
+            catch e
+                @error e
+            end
+        end
+
+        new(client, local_storage, index_name)
+    end
+end
+
+chunks_index_name() = get(ENV, "CHUNKS_INDEX_NAME", DEFAULT_CHUNKS_INDEX_NAME)
+knn_dimension() = get(ENV, "KNN_DIMENSION", DEFAULT_KNN_DIMENSION)
+knn_space_type() = get(ENV, "KNN_SPACE_TYPE", DEFAULT_KNN_SPACE_TYPE)
+knn_ef_construction() = get(ENV, "KNN_EF_CONSTRUCTION", DEFAULT_KNN_EF_CONSTRUCTION)
+knn_m() = get(ENV, "KNN_M", DEFAULT_KNN_M)
+knn_ef_search() = get(ENV, "KNN_EF_SEARCH", DEFAULT_KNN_EF_SEARCH)
+
+struct AuthParams
+    creds::AWSCredentials
+    expires_at::DateTime
+
+    function AuthParams()
+        # ttl in seconds, 1 hour by default (ENV values are strings, so parse)
+        ttl = parse(Int, get(ENV, "AUTH_PARAMS_TTL", "3600"))
+
+        new(
+            AWSCredentials(),
+            now() + Second(ttl)
+        )
+    end
+end
+global CURRENT_AUTH_PARAMS::Union{Nothing,Ref{AuthParams}} = nothing
+
+"""
+Takes in a dict that maps document ids to lists of document chunks and inserts them into the index.
+
+Returns a list of document ids.
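+
+A hypothetical call shape (storage, ids, and chunks are illustrative):
+
+    chunks = Dict("doc-1" => [chunk_a, chunk_b], "doc-2" => [chunk_c])
+    response = DataStore.upsert(storage, chunks)
+    response.ids  # e.g. ["doc-1", "doc-2"]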
+""" +function DataStore.upsert( + storage::OpensearchStorage, + chunks::Dict{String,<:AbstractVector{DocumentChunk}} +)::UpsertResponse + index_batch = AbstractDict[] + + for doc_chunks in values(chunks), doc_chunk in doc_chunks + operation_name = :index + operation_body = Dict( + :_id => doc_chunk.id, + :data => Dict( + :text => doc_chunk.text, + :metadata => doc_chunk.metadata, + :embedding => doc_chunk.embedding + ) + ) + + push!(index_batch, Dict(operation_name => operation_body)) + end + + ElasticsearchClient.bulk(storage.client, index=storage.chunks_index_name, body=index_batch, auth_params=get_auth_params(storage)) + ElasticsearchClient.Indices.refresh(storage.client, index=storage.chunks_index_name, auth_params=get_auth_params(storage)) + + UpsertResponse(collect(keys(chunks))) +end + +""" +Takes in a list of queries with embeddings and filters and +returns a list of query results with matching document chunks and scores. +""" +function DataStore.query( + storage::OpensearchStorage, + queries::AbstractVector{QueryWithEmbedding} +)::Vector{QueryResult} + query_tasks = map(query_with_emb -> single_query(storage, query_with_emb), queries) + + map(query_tasks) do (query, query_task) + wait(query_task) + + response = query_task.result + results = + map(response.body["hits"]["hits"]) do hit + source = hit["_source"] + + stored_metadata = source["metadata"] + metadata = DocumentChunkMetadata( + source=get(stored_metadata, "source", nothing), + source_id=get(stored_metadata, "source_id", nothing), + url=get(stored_metadata, "url", nothing), + created_at=get(stored_metadata, "created_at", nothing), + author=get(stored_metadata, "author", nothing), + document_id=get(stored_metadata, "document_id", nothing), + ) + + DocumentChunkWithScore(; + id=hit["_id"], + text=source["text"], + metadata=metadata, + # embedding=source["embedding"], # not required for ChatGPT + score=hit["_score"] + ) + end + + QueryResult(; + query=query.query, + results=results + ) + end +end + +function single_query( + storage::OpensearchStorage, + query::QueryWithEmbedding +)::Tuple{QueryWithEmbedding,Task} + knn_query = Dict( + :knn => Dict( + :embedding => Dict( + :vector => query.embedding, + :k => query.top_k + ) + ) + ) + + full_query = Dict( + :query => Dict( + :bool => Dict( + :must => [knn_query] + ) + ) + ) + + if !isnothing(query.filter) + nested_document_filter = Dict( + :nested => Dict( + :path => :metadata, + :query => Dict( + :term => Dict( + "metadata.document_id" => query.filter.document_id + ) + ) + ) + ) + full_query[:query][:bool][:filter] = [nested_document_filter] + end + + task = @async ElasticsearchClient.search( + storage.client, + index=storage.chunks_index_name, + body=full_query, + auth_params=get_auth_params(storage) + ) + + (query, task) +end + +""" +Removes vectors by ids, filter. +Multiple parameters can be used at once. + +Returns whether the operation was successful. +""" +function DataStore.delete( + storage::OpensearchStorage; + filter::Vector{DocumentMetadataFilter} +)::Bool + document_ids = getproperty.(filter, :document_id) + query = Dict( + :query => Dict( + :nested => Dict( + :path => "metadata", + :query => Dict( + :terms => Dict( + "metadata.document_id" => document_ids + ) + ) + ) + ) + ) + response = ElasticsearchClient.delete_by_query( + storage.client, + index=storage.chunks_index_name, + body=query, + auth_params=get_auth_params(storage) + ) + + response.status == 200 +end + +""" +Removes everything in the datastore + +Returns whether the operation was successful. 
+""" +function DataStore.delete_all(storage::OpensearchStorage)::Bool + query = Dict(:query => Dict(:match_all => Dict())) + + response = ElasticsearchClient.delete_by_query( + storage.client, + index=storage.chunks_index_name, + body=query, + auth_params=get_auth_params(storage) + ) + + response.status == 200 +end + +function create_storage() + local_storage = get(ENV, "LOCAL_STORAGE", true) + if local_storage + OpensearchStorage( + ElasticsearchClient.Client( + host=( + host=get(ENV, "ES_HOST", "localhost"), + port=get(ENV, "ES_PORT", "9200") |> str -> parse(Int16, str), + scheme="http" + ) + ), + local_storage + ) + else + es_host = get(ENV, "ES_HOST", nothing) + + isnothing(es_host) && throw(ArgumentError("Environment variable ES_HOST must be set")) + + OpensearchStorage( + ElasticsearchClient.Client( + host=(host=es_host, port=443, scheme="https"), + http_client=OpenSearchAuth + ), + local_storage + ) + end +end + +function get_auth_params(local_storage::Bool) + local_storage && return + + if isnothing(CURRENT_AUTH_PARAMS) || now() > CURRENT_AUTH_PARAMS.x.expires_at + refresh_auth_params() + end + + CURRENT_AUTH_PARAMS.x +end +get_auth_params(storage::OpensearchStorage) = get_auth_params(storage.local_storage) + +function refresh_auth_params() + global CURRENT_AUTH_PARAMS = Ref(AuthParams()) +end + +end diff --git a/ext/OpenSearchExt/es_settings/chunks_index_schema.json b/ext/OpenSearchExt/es_settings/chunks_index_schema.json new file mode 100644 index 0000000..92263aa --- /dev/null +++ b/ext/OpenSearchExt/es_settings/chunks_index_schema.json @@ -0,0 +1,36 @@ +{ + "settings": { + "index": { + "knn": true, + "knn.algo_param.ef_search": {{:ef_search}} + } + }, + "mappings": { + "properties": { + "text": { "type": "text" }, + "metadata": { + "type": "nested", + "properties": { + "document_id": { "type": "keyword" }, + "source_id": { "type": "keyword" }, + "author": { "type": "text" }, + "url": { "type": "text" }, + "created_at": { "type": "keyword" } + } + }, + "embedding": { + "type": "knn_vector", + "dimension": {{:dimension}}, + "method": { + "name": "hnsw", + "space_type": "{{:space_type}}", + "engine": "nmslib", + "parameters": { + "ef_construction": {{:ef_construction}}, + "m": {{:m}} + } + } + } + } + } +} diff --git a/ext_test/test_opensearch_ext.jl b/ext_test/test_opensearch_ext.jl new file mode 100644 index 0000000..79d5b32 --- /dev/null +++ b/ext_test/test_opensearch_ext.jl @@ -0,0 +1,75 @@ +test_index_name = "test_gpt_plugin_chunks_index" + +ENV["DATASTORE"] = "opensearch" +ENV["CHUNKS_INDEX_NAME"] = test_index_name +ENV["KNN_DIMENSION"] = 10 + +using ElasticsearchClient +using TimeZones +using Dates +using AWS +using GptSearchPlugin +using GptSearchPlugin.AppServer.DataStore +using Test +using Mocking +using OpenAI: create_embeddings +using HTTP + +Mocking.activate() + +total_docs_in_index(storage) = + ElasticsearchClient.search( + storage.client, + index=test_index_name + ).body["hits"]["total"]["value"] + +storage = DataStore.STORAGE + +DataStore.delete_all(storage) +@test total_docs_in_index(storage) == 0 + +text = "Respond with a JSON containing the extracted metadata in key value pairs." 
+
+build_embeddings_mock(text_vectors) = (
+    status=200,
+    response=Dict("data" => map(_ -> Dict("embedding" => rand(10)), text_vectors))
+)
+chunk_size = 4
+doc = GptSearchPlugin.AppServer.Document(text=text)
+patch = @patch create_embeddings(api_key::String, text_vectors) = build_embeddings_mock(text_vectors)
+arr_chunks = apply(patch) do
+    GptSearchPlugin.AppServer.get_document_chunks(repeat([doc], 10), chunk_size)
+end
+chunks_count = values(arr_chunks) |> Iterators.flatten |> collect |> length
+
+document_ids = DataStore.upsert(
+    storage,
+    arr_chunks
+)
+
+@test length(document_ids.ids) == length(arr_chunks)
+@test all(in(document_ids.ids), keys(arr_chunks))
+@test total_docs_in_index(storage) == chunks_count
+
+query_with_emb = GptSearchPlugin.AppServer.QueryWithEmbedding(
+    query = "Some query",
+    embedding = rand(10)
+)
+
+query_results = DataStore.query(
+    storage,
+    [query_with_emb]
+)
+@test length(first(query_results).results) == query_with_emb.top_k
+@test first(query_results).query == query_with_emb.query
+
+doc_ids_for_delete = shuffle(document_ids.ids)[1:2]  # two distinct ids, so exactly two documents disappear
+
+@test DataStore.delete(
+    storage,
+    filter=map(
+        doc_id -> GptSearchPlugin.AppServer.DocumentMetadataFilter(document_id=doc_id),
+        doc_ids_for_delete
+    )
+)
+@test total_docs_in_index(storage) == chunks_count - 10
diff --git a/src/datastore/factory.jl b/src/datastore/factory.jl
index 8f45be6..4ab7487 100644
--- a/src/datastore/factory.jl
+++ b/src/datastore/factory.jl
@@ -15,6 +15,11 @@ global DATASTORE_MODULE = let
         include("../../ext/ElasticsearchClientExt/ElasticsearchClientExt.jl")
 
         ElasticsearchClientExt
+    elseif isequal(datastore, "OPENSEARCH")
+        @info "Plugging OpenSearchExt"
+        include("../../ext/OpenSearchExt/OpenSearchExt.jl")
+
+        OpenSearchExt
     elseif isequal(datastore, "TEST")
         TestStorageExt
     else
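For reference, enabling the new backend end-to-end needs only the environment variables introduced in this diff (a sketch; values are illustrative):

```bash
# Local single-node OpenSearch from docs/src/Providers/Opensearch/docker-compose.yml
docker-compose up -d

# Select the OpenSearch datastore (dispatched in src/datastore/factory.jl)
export DATASTORE=OPENSEARCH

# Defaults shown; with LOCAL_STORAGE=true the client talks plain HTTP to localhost
export ES_HOST=localhost
export ES_PORT=9200
```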