From b1b71b748ff3a8436249ed90041bf1666fb9bbee Mon Sep 17 00:00:00 2001 From: Darshan Lukhi Date: Mon, 17 Jun 2024 19:56:28 +0530 Subject: [PATCH 1/2] Add support for native queries and aggregation by nested fields --- .gitignore | 2 +- CHANGELOG.md | 13 + README.md | 161 +++-- cli/cli.go | 54 +- cli/initialize.go | 36 +- cli/update.go | 60 +- cli/upgrade.go | 38 ++ cli/validate.go | 80 ++- connector/aggregate.go | 191 ++++-- connector/arguments.go | 70 ++ connector/configuration.go | 92 +++ connector/connector.go | 29 +- connector/connector_test.go | 20 + connector/filter.go | 22 +- connector/native_query.go | 145 ++++ connector/query.go | 98 +-- connector/response.go | 15 +- connector/schema.go | 97 ++- connector/select.go | 45 +- connector/sort.go | 32 +- connector/static_types.go | 10 +- connector/variables.go | 12 +- docker-compose.yaml | 4 +- docs/configuration.md | 96 +++ docs/index.md | 1 + docs/limitations.md | 3 +- go.mod | 22 +- go.sum | 44 +- internal/utils.go | 65 ++ resources/configuration.json | 309 ++++----- resources/prometheus/prometheus.yaml | 4 +- testdata/capabilities.json | 5 +- testdata/configuration.json | 619 ++++++++++-------- testdata/native_queries/aggregate.json | 14 + testdata/native_queries/range.json | 10 + .../aggregation/native_query_request.json | 22 + .../aggregation/native_query_response.json | 16 + .../aggregation/nested_type_request.json | 15 + .../aggregation/nested_type_response.json | 7 + .../aggregation/object_type_request.json | 15 + .../aggregation/object_type_response.json | 7 + .../query/filter/native_query_request.json | 55 ++ .../query/filter/native_query_response.json | 14 + testdata/schema.json | 78 ++- types/types.go | 28 +- 45 files changed, 2030 insertions(+), 745 deletions(-) create mode 100644 cli/upgrade.go create mode 100644 connector/arguments.go create mode 100644 connector/configuration.go create mode 100644 connector/native_query.go create mode 100644 docs/configuration.md create mode 100644 testdata/native_queries/aggregate.json create mode 100644 testdata/native_queries/range.json create mode 100644 testdata/query/aggregation/native_query_request.json create mode 100644 testdata/query/aggregation/native_query_response.json create mode 100644 testdata/query/aggregation/nested_type_request.json create mode 100644 testdata/query/aggregation/nested_type_response.json create mode 100644 testdata/query/aggregation/object_type_request.json create mode 100644 testdata/query/aggregation/object_type_response.json create mode 100644 testdata/query/filter/native_query_request.json create mode 100644 testdata/query/filter/native_query_response.json diff --git a/.gitignore b/.gitignore index 4b9e912..5e93211 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,4 @@ go.work.sum # codegen assets .codegen/ -ndc-elasticsearch \ No newline at end of file +ndc-elasticsearch diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fdbd28..68acd94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] + +### Added + +- Support for native queries. +- Support ndc-spec v0.1.4 and aggregate by nested fields. + +### Changed + +- Configuration structure to be compatible with the latest connector version. + ## [0.1.1] +### Fixed + - Fixed the configuration directory environment variable in the CLI. - Handled null values for nested fields in the response. 
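For readers reviewing this patch, here is a minimal sketch of what a native query entry in `configuration.json` could look like after this change. The key names (`indices`, `queries`, `dsl`, `file`, `internal`, `index`, `arguments`, `return_type`, `kind`) are inferred from the validation and parsing code introduced below in `cli/validate.go` and `connector/configuration.go`; the index name, file path, and argument are hypothetical, and the shipped test fixtures (`testdata/native_queries/*.json`) may be shaped differently.

```json
{
  "indices": {},
  "queries": {
    "recent_articles": {
      "dsl": {
        "file": "native_queries/recent_articles.json"
      },
      "index": "articles",
      "arguments": {
        "from_date": { "type": "date" }
      },
      "return_type": {
        "kind": "index"
      }
    }
  }
}
```

The referenced DSL file would hold a plain Elasticsearch query with `{{argument}}` placeholders, for example:

```json
{
  "query": {
    "range": {
      "published_at": {
        "gte": "{{from_date}}"
      }
    }
  }
}
```

Note that `replaceArguments` in `connector/native_query.go` matches a placeholder against the whole string value and substitutes the argument's value for it, so a placeholder should stand alone rather than be embedded inside a longer string.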
diff --git a/README.md b/README.md index 1c9971a..e1b85de 100644 --- a/README.md +++ b/README.md @@ -18,11 +18,9 @@ This connector is built using the [Go Data Connector SDK](https://github.com/has Below, you'll find a matrix of all supported features for the Elasticsearch connector: - - | Feature | Supported | Notes | | ------------------------------- | --------- | ----- | -| Native Queries + Logical Models | ❌ | | +| Native Queries + Logical Models | ✅ | | | Simple Object Query | ✅ | | | Filter / Search | ✅ | | | Simple Aggregation | ✅ | | @@ -39,65 +37,31 @@ Below, you'll find a matrix of all supported features for the Elasticsearch conn 1. Create a [Hasura Cloud account](https://console.hasura.io) 2. Install the [CLI](https://hasura.io/docs/3.0/cli/installation/) -3. [Create a project](https://hasura.io/docs/3.0/getting-started/create-a-project) +3. [Create a supergraph](https://hasura.io/docs/3.0/getting-started/init-supergraph) +4. [Create a subgraph](https://hasura.io/docs/3.0/getting-started/init-subgraph) ## Using the connector To use the Elasticsearch connector, follow these steps in a Hasura project: +(Note: for more information on the following steps, please refer to the Postgres connector documentation [here](https://hasura.io/docs/3.0/getting-started/connect-to-data/connect-a-source)) -1. Add the connector: - - ```bash - ddn add connector-manifest es_connector --subgraph app --hub-connector hasura/elasticsearch --type cloud - ``` - - In the snippet above, we've used the subgraph `app` as it's available by default; however, you can change this - value to match any [subgraph](https://hasura.io/docs/3.0/project-configuration/subgraphs) which you've created in your project. - -2. Add your Elasticsearch credentials: - - Open your project in your text editor and open the `base.env.yaml` file in the root of your project. Then, add - `ES_CONNECTOR_URL`, `ES_CONNECTOR_USERNAME` and `ES_CONNECTOR_PASSWORD` environment variables under the `app` subgraph: - - ```yaml - supergraph: {} - subgraphs: - app: - ES_CONNECTOR_URL: "" - ES_CONNECTOR_USERNAME: "" - ES_CONNECTOR_PASSWORD: "" - ``` - - Next, update your `/app/es_connector/connector/es_connector.build.hml` file to reference this new environment - variable: - - ```yaml - # other configuration above - ELASTICSEARCH_URL: - valueFromEnv: ES_CONNECTOR_URL - ELASTICSEARCH_USERNAME: - valueFromEnv: ES_CONNECTOR_USERNAME - ELASTICSEARCH_PASSWORD: - valueFromEnv: ES_CONNECTOR_PASSWORD - ``` - - Notice, when we use an environment variable, we must change the key to `valueFromEnv` instead of `value`. This tells - Hasura DDN to look for the value in the environment variable we've defined instead of using the value directly. - -3. Update the connector manifest and the connector link - These two steps will (1) allow Hasura to introspect your data source and complete the configuration and (2) deploy the - connector to Hasura DDN: +### 1. Init the connector +(Note: here and following we are naming the subgraph "my_subgraph" and the connector "my_elastic") ```bash - ddn update connector-manifest es_connector + ddn connector init my_elastic --subgraph my_subgraph --hub-connector hasura/elasticsearch ``` - ```bash - ddn update connector-link es_connector - ``` +### 2. Add your Elasticsearch credentials: -4. 
Add Environment Variables

+```env title="my_subgraph/connector/my_elastic/.env.local"
+OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://local.hasura.dev:4317
+OTEL_SERVICE_NAME=my_subgraph_my_elastic
+ELASTICSEARCH_URL=
+ELASTICSEARCH_USERNAME=
+ELASTICSEARCH_PASSWORD=
+```

To configure the connector, the following environment variables need to be set:

@@ -110,9 +74,98 @@ To configure the connector, the following environment variables need to be set:
| `ELASTICSEARCH_CA_CERT_PATH` | The path to the Certificate Authority (CA) certificate for verifying the Elasticsearch server's SSL certificate | No | `/etc/connector/cacert.pem` |
| `ELASTICSEARCH_INDEX_PATTERN` | The pattern for matching Elasticsearch indices, potentially including wildcards, used by the connector | No | `hasura*` |

-## Documentation
-View the full documentation for the Elasticsearch connector [here](./docs/index.md).
+
+### 3. Introspect your indices
+
+```bash title="From the root of your project run:"
+ddn connector introspect --connector my_subgraph/connector/my_elastic/connector.yaml
+```
+
+If you look at the `configuration.json` for your connector, you'll see metadata describing your Elasticsearch mappings.
+
+### 4. Create the Hasura metadata
+
+```bash title="Run the following from the root of your project:"
+ddn connector-link add my_elastic --subgraph my_subgraph
+```
+
+The generated file has two environment variables, one for reads and one for writes, that you'll need to add to your
+subgraph's `.env.my_subgraph` file. Each key is prefixed by the subgraph name, an underscore, and the name of the
+connector. Ensure the port value matches what is published in your connector's docker compose file.
+
+```env title="my_subgraph/.env.my_subgraph"
+MY_SUBGRAPH_MY_ELASTIC_READ_URL=http://local.hasura.dev:8082
+MY_SUBGRAPH_MY_ELASTIC_WRITE_URL=http://local.hasura.dev:8082
+```
+
+### 5. Start the connector's docker compose
+
+Let's start our connector's docker compose file.
+
+```bash title="Run the following from the connector's subdirectory inside a subgraph:"
+docker compose -f docker-compose.my_elastic.yaml up
+```
+
+This starts our Elasticsearch connector on the specified port. We can navigate to the following address, with the port
+modified, to see the schema of our Elasticsearch data source:
+
+```bash
+http://localhost:8081/schema
+```
+
+### 6. Include the connector in your docker compose
+
+Kill the connector by pressing `CTRL+C` in the terminal tab in which the connector is running.
+
+Then, add the following inclusion to the docker compose in your project's root directory, taking care to modify the
+subgraph's name.
+
+```yaml title="docker-compose.hasura.yaml"
+include:
+  - path: my_subgraph/connector/my_elastic/docker-compose.my_elastic.yaml
+```
+
+Now, whenever running the following, you'll bring up the GraphQL engine, observability tools, and any connectors you've
+included:
+
+```bash title="From the root of your project, run:"
+HASURA_DDN_PAT=$(ddn auth print-pat) docker compose -f docker-compose.hasura.yaml watch
+```
+
+### 7. Update the new DataConnectorLink object
+
+Finally, now that our `DataConnectorLink` has the correct environment variables configured for the Elasticsearch connector,
+we can run the update command to have the CLI look at the configuration JSON and transform it to reflect our database's
+schema in `hml` format.
In a new terminal tab, run: + +```bash title="From the root of your project, run:" +ddn connector-link update my_elastic --subgraph my_subgraph +``` + +After this command runs, you can open your `my_subgraph/metadata/my_elastic.hml` file and see your metadata completely +scaffolded out for you 🎉 + +### 8. Import _all_ your indices + +You can do this in one convenience command. + +```bash title="From the root of your project, run:" +ddn connector-link update my_elastic --subgraph my_subgraph --add-all-resources +``` + +### 9. Create a supergraph build + +Pass the `local` subcommand along with specifying the output directory as `./engine` in the root of the project. This +directory is used by the docker-compose file to serve the engine locally: + +```bash title="From the root of your project, run:" +ddn supergraph build local --output-dir ./engine +``` + +You can now navigate to +[`https://console.hasura.io/local/graphql?url=http://localhost:3000`](https://console.hasura.io/local/graphql?url=http://localhost:3000) +and interact with your API using the Hasura Console. + ## Contributing @@ -120,4 +173,4 @@ Check out our [contributing guide](./docs/contributing.md) for more details. ## License -The Elasticsearch connector is available under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). +The Elasticsearch connector is available under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). \ No newline at end of file diff --git a/cli/cli.go b/cli/cli.go index 7c96f6e..5a6d055 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -12,43 +12,57 @@ type Arguments struct { Configuration string `help:"Configuration directory." env:"HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH"` } +type UpgradeArgs struct { + // dirFrom is the path to the old configuration directory. + DirFrom string `help:"Directory to upgrade from." required:"true"` + // dirTo is the path to the new configuration directory. + DirTo string `help:"Directory to upgrade to." required:"true"` +} + type CLI struct { connector.ServeCLI - Version struct{} `cmd:"" help:"Print the version."` - Initialize Arguments `cmd:"" help:"Initialize configuration directory."` - Update Arguments `cmd:"" help:"Update configuration directory."` - Validate Arguments `cmd:"" help:"Validate configuration directory."` + Version struct{} `cmd:"" help:"Print the version."` + Initialize Arguments `cmd:"" help:"Initialize the configuration directory."` + Update Arguments `cmd:"" help:"Update the configuration directory."` + Validate Arguments `cmd:"" help:"Validate the configuration directory."` + Upgrade UpgradeArgs `cmd:"" help:"Upgrade the configuration directory to be compatible with the latest connector version."` } // Execute executes the CLI command based on the provided command string. 
func (cli *CLI) Execute(ctx context.Context, command string) error { logger := connector.GetLogger(ctx) + switch command { case "version": - logger.InfoContext(ctx, "v0.1.0") - return nil + logger.InfoContext(ctx, "v0.2.0") case "initialize": - err := initialize(cli.Initialize.Configuration) - if err != nil { - return err + if err := initializeConfig(cli.Initialize.Configuration); err != nil { + return fmt.Errorf("failed to initialize configuration: %w", err) } - logger.InfoContext(ctx, "Configuration Initialized Successfully.") - return nil + logger.InfoContext(ctx, "Configuration initialized successfully.") case "update": - err := updateConfiguration(ctx, cli.Update.Configuration) - if err != nil { - return err + if err := updateConfig(ctx, cli.Update.Configuration); err != nil { + return fmt.Errorf("failed to update configuration: %w", err) } - logger.InfoContext(ctx, "Configuration Updated Successfully.") - return nil + logger.InfoContext(ctx, "Configuration updated successfully.") case "validate": - err := validate(cli.Validate.Configuration) + if err := validateConfig(cli.Validate.Configuration); err != nil { + return fmt.Errorf("failed to validate configuration: %w", err) + } + logger.InfoContext(ctx, "Configuration validated successfully.") + case "upgrade": + upgraded, err := upgradeConfig(cli.Upgrade.DirFrom, cli.Upgrade.DirTo) if err != nil { - return err + return fmt.Errorf("failed to upgrade configuration: %w", err) } - logger.InfoContext(ctx, "Configuration Validated Successfully.") - return nil + if !upgraded { + logger.InfoContext(ctx, "Configuration already up-to-date.") + return nil + } + logger.InfoContext(ctx, "Configuration upgraded successfully.") default: return fmt.Errorf("unknown command <%s>", command) } + + return nil } diff --git a/cli/initialize.go b/cli/initialize.go index b0a2350..8db2fb4 100644 --- a/cli/initialize.go +++ b/cli/initialize.go @@ -1,19 +1,39 @@ package cli import ( - "os" + "encoding/json" + "fmt" "path/filepath" + + "github.com/hasura/ndc-elasticsearch/internal" + "github.com/hasura/ndc-elasticsearch/types" ) const ConfigFileName = "configuration.json" -// initialize creates a configuration file at the specified path. -func initialize(configPath string) error { - configFilePath := filepath.Join(configPath, ConfigFileName) - file, err := os.Create(configFilePath) +// initializeConfig creates a configuration file at the specified path. +// It returns an error if the configuration file already exists. +// It creates an empty configuration file with "indices" and "queries" fields. 
+func initializeConfig(path string) error { + configPath := filepath.Join(path, ConfigFileName) + + // Return an error if the configuration file already exists + if internal.FileExists(configPath) { + return fmt.Errorf("configuration file already exists at %s", configPath) + } + + // Create an empty configuration file + config := types.Configuration{ + Indices: make(map[string]interface{}), + Queries: make(map[string]types.NativeQuery), + } + + data, err := json.MarshalIndent(config, "", " ") if err != nil { - return err + return fmt.Errorf("failed to marshal JSON: %w", err) } - defer file.Close() - return nil + + // Write the configuration file + err = internal.WriteJsonFile(configPath, data) + return err } diff --git a/cli/update.go b/cli/update.go index 40cda7e..cd0c464 100644 --- a/cli/update.go +++ b/cli/update.go @@ -3,45 +3,81 @@ package cli import ( "context" "encoding/json" - "os" + "fmt" "path/filepath" + "github.com/hasura/ndc-elasticsearch/connector" "github.com/hasura/ndc-elasticsearch/elasticsearch" + "github.com/hasura/ndc-elasticsearch/internal" + "github.com/hasura/ndc-elasticsearch/types" ) -// updateConfiguration updates the configuration file with the mappings retrieved from Elasticsearch. -func updateConfiguration(ctx context.Context, configPath string) error { +// updateConfiguration updates the configuration file with Elasticsearch mappings. +// It creates a new client, retrieves the indices and mappings from Elasticsearch, +// marshals the mappings into a JSON configuration file, and writes the updated +// configuration to disk atomically. +func updateConfig(ctx context.Context, configDir string) error { + // Create a new Elasticsearch client. client, err := elasticsearch.NewClient() if err != nil { return err } + // Get the indices from Elasticsearch. indices, err := client.GetIndices(ctx) if err != nil { return err } - result, err := client.GetMappings(ctx, indices) + // Get the mappings for the indices from Elasticsearch. + mappings, err := client.GetMappings(ctx, indices) if err != nil { return err } - jsonData, err := json.MarshalIndent(result, "", " ") - if err != nil { - return err - } + configPath := filepath.Join(configDir, ConfigFileName) - configFilePath := filepath.Join(configPath, ConfigFileName) - file, err := os.Create(configFilePath) + // Marshal the mappings into a JSON configuration file. + configData, err := marshalMappings(configPath, mappings) if err != nil { return err } - defer file.Close() - _, err = file.Write(jsonData) + // Write the updated configuration to disk atomically. + err = internal.WriteJsonFile(configPath, configData) if err != nil { return err } return nil } + +// marshalMappings marshals the Elasticsearch mappings into a JSON configuration file. +// It reads the existing configuration file if it exists, or creates a new one with an empty "queries" field. +// It then overwrites the "indices" field with the provided mappings. +// Returns the JSON data as a byte array and any error encountered. +func marshalMappings(configPath string, mappings interface{}) ([]byte, error) { + // Define the initial configuration template. + configuration := &types.Configuration{ + Indices: make(map[string]interface{}), + Queries: make(map[string]types.NativeQuery), + } + + // If the configuration file exists, read it using the decoder. 
+	if internal.FileExists(configPath) {
+		var err error
+		configuration, err = connector.GetConfiguration(configPath, "")
+		if err != nil {
+			return nil, err
+		}
+	}
+	// Overwrite the "indices" field with the provided mappings.
+	indices, ok := mappings.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("failed to convert mappings to map[string]interface{}")
+	}
+	configuration.Indices = indices
+
+	// Marshal the configuration data into a JSON byte array with indentation.
+	return json.MarshalIndent(configuration, "", " ")
+}
diff --git a/cli/upgrade.go b/cli/upgrade.go
new file mode 100644
index 0000000..1785391
--- /dev/null
+++ b/cli/upgrade.go
@@ -0,0 +1,38 @@
+package cli
+
+import (
+	"encoding/json"
+	"path/filepath"
+
+	"github.com/hasura/ndc-elasticsearch/internal"
+	"github.com/hasura/ndc-elasticsearch/types"
+)
+
+// upgradeConfig upgrades the configuration directory to be compatible with the latest connector version.
+func upgradeConfig(sourceDir string, destDir string) (bool, error) {
+	sourceDir = filepath.Join(sourceDir, ConfigFileName)
+	destDir = filepath.Join(destDir, ConfigFileName)
+	oldConfig, err := internal.ReadJsonFileUsingDecoder(sourceDir)
+	if err != nil {
+		return false, err
+	}
+	if _, ok := oldConfig["indices"].(map[string]interface{}); ok {
+		return false, nil
+	}
+	upgradedConfig := upgradeToLatest(oldConfig)
+
+	jsonData, err := json.MarshalIndent(upgradedConfig, "", " ")
+	if err != nil {
+		return false, err
+	}
+
+	return true, internal.WriteJsonFile(destDir, jsonData)
+}
+
+// upgradeToLatest upgrades a configuration file from an older version of the connector to the latest version.
+func upgradeToLatest(oldConfig map[string]interface{}) types.Configuration {
+	return types.Configuration{
+		Indices: oldConfig,
+		Queries: map[string]types.NativeQuery{},
+	}
+}
diff --git a/cli/validate.go b/cli/validate.go
index e1e46fc..b1ed893 100644
--- a/cli/validate.go
+++ b/cli/validate.go
@@ -3,38 +3,90 @@ package cli
 import (
 	"encoding/json"
 	"fmt"
-	"os"
-	"path/filepath"
+
+	"github.com/hasura/ndc-elasticsearch/connector"
+	"github.com/hasura/ndc-elasticsearch/internal"
+	"github.com/hasura/ndc-elasticsearch/types"
 )
 
 // validate validates the configuration file.
-func validate(configPath string) error {
-	configFilePath := filepath.Join(configPath, ConfigFileName)
-	file, err := os.Open(configFilePath)
+// validateConfig validates the configuration file. It parses the configuration file,
+// validates the mappings of the indices, and validates the native queries.
+func validateConfig(configPath string) error {
+	// Parse the configuration file
+	configuration, err := connector.GetConfiguration(configPath, ConfigFileName)
+	if err != nil {
+		return err
+	}
+
+	// Validate the mappings of the indices
+	err = validateMappings(configuration.Indices)
 	if err != nil {
-		return fmt.Errorf("error opening file: %v", err)
+		return err
 	}
-	defer file.Close()
 
-	// Decode the JSON file
-	var data map[string]interface{}
-	err = json.NewDecoder(file).Decode(&data)
+	// Validate the native queries
+	err = validateNativeQueries(configuration.Queries)
 	if err != nil {
-		return fmt.Errorf("error decoding JSON: %v", err)
+		return err
 	}
 
-	// Validate the mappings
-	for indexName, indexData := range data {
-		// Assuming each index in the file has a "mappings" key
+	return nil
+}
+
+// validateMappings validates the mappings of the indices in the configuration file.
+func validateMappings(mappings map[string]interface{}) error { + for indexName, indexData := range mappings { + // Check if the index data has a 'mappings' key mappings, ok := indexData.(map[string]interface{})["mappings"] if !ok { return fmt.Errorf("index %s is missing 'mappings' key", indexName) } + // Check if the 'mappings' value has a 'properties' key _, ok = mappings.(map[string]interface{})["properties"] if !ok { return fmt.Errorf("index %s is missing 'properties' key", indexName) } + } + return nil +} + +// validateNativeQueries validates the native queries in the configuration file. +// It checks for the presence of required keys and their types. +func validateNativeQueries(nativeQueries map[string]types.NativeQuery) error { + for queryName, queryConfig := range nativeQueries { + dsl := queryConfig.DSL + if dsl.File != nil && *dsl.File != "" { + _, err := internal.ReadJsonFileUsingDecoder(*dsl.File) + if err != nil { + return fmt.Errorf("invalid 'file' value in %s: %w", queryName, err) + } + } else if dsl.Internal != nil { + _, err := json.Marshal(*dsl.Internal) + if err != nil { + return fmt.Errorf("invalid 'internal' value in %s: %w", queryName, err) + } + } else { + return fmt.Errorf("missing 'file' or 'internal' key for 'dsl' in %s", queryName) + } + + if queryConfig.Index == "" { + return fmt.Errorf("missing 'index' value in %s", queryName) + } + + if queryConfig.ReturnType == nil { + return fmt.Errorf("missing 'return_type' value in %s", queryName) + } + + if queryConfig.ReturnType.Kind != "defination" && queryConfig.ReturnType.Kind != "index" { + return fmt.Errorf("invalid 'kind' value '%s' in %s", queryConfig.ReturnType.Kind, queryName) + } + + if queryConfig.ReturnType.Kind == "defination" && queryConfig.ReturnType.Mappings == nil { + return fmt.Errorf("missing 'mappings' value for kind 'defination' in %s", queryName) + } } + return nil } diff --git a/connector/aggregate.go b/connector/aggregate.go index 8d738a2..0470abe 100644 --- a/connector/aggregate.go +++ b/connector/aggregate.go @@ -3,65 +3,158 @@ package connector import ( "context" + "github.com/hasura/ndc-elasticsearch/internal" "github.com/hasura/ndc-elasticsearch/types" "github.com/hasura/ndc-sdk-go/schema" ) -// prepareAggregateQuery prepares the aggregate query based on the aggregates in the query request. +// prepareAggregateQuery prepares the aggregation query based on the aggregates in the query request. 
func prepareAggregateQuery(ctx context.Context, aggregates schema.QueryAggregates, state *types.State, collection string) (map[string]interface{}, error) { + var path string + aggregations := make(map[string]interface{}) postProcessor := ctx.Value("postProcessor").(*types.PostProcessor) - aggs := make(map[string]interface{}) - for name, aggregate := range aggregates { - aggregateColumn, ok := aggregate["column"].(string) + postProcessor.ColumnAggregate = make(map[string]bool) + + for aggregationName, aggregation := range aggregates { + aggregationType, err := aggregation.Type() + if err != nil { + return nil, err + } + + if aggregationType == schema.AggregateTypeStarCount { + postProcessor.StarAggregates = aggregationName + continue + } + + aggregationColumn := aggregation["column"].(string) + fieldPath, ok := aggregation["field_path"].([]string) if ok { - if collectionAggregateFields, ok := state.SupportedAggregateFields[collection]; ok { - if aggregateField, ok := collectionAggregateFields.(map[string]string)[aggregateColumn]; !ok { - return nil, schema.BadRequestError("aggregation not supported on this field", map[string]any{"value": aggregateColumn}) - } else { - aggregateColumn = aggregateField - } - } + aggregationColumn, path = joinFieldPath(state, fieldPath, aggregationColumn, collection) } - switch agg := aggregate.Interface().(type) { - case *schema.AggregateStarCount: - postProcessor.StarAggregates = name - case *schema.AggregateColumnCount: - if agg.Distinct { - aggs[name] = map[string]interface{}{ - "cardinality": map[string]interface{}{ - "field": aggregateColumn, - }, - } - } else { - aggs[name] = map[string]interface{}{ - "filter": map[string]interface{}{ - "exists": map[string]interface{}{ - "field": aggregateColumn, - }, - }, - } - } - postProcessor.ColumnAggregate = append(postProcessor.ColumnAggregate, name) - case *schema.AggregateSingleColumn: - postProcessor.ColumnAggregate = append(postProcessor.ColumnAggregate, name) - switch agg.Function { - case "sum", "min", "max", "avg", "value_count", "cardinality", "stats", "string_stats": - aggs[name] = map[string]interface{}{ - agg.Function: map[string]interface{}{ - "field": aggregateColumn, - }, - } - default: - return nil, schema.BadRequestError("invalid aggregate function", map[string]any{ - "value": agg.Function, - }) - } - default: - return nil, schema.BadRequestError("invalid aggregate field", map[string]any{ - "value": aggregate["type"], + validField := internal.ValidateFieldOperation(state, "aggregate", collection, aggregationColumn) + + if validField == "" { + return nil, schema.UnprocessableContentError("aggregation not supported on this field", map[string]any{ + "value": aggregationColumn, }) } + aggregationColumn = validField + + postProcessor.ColumnAggregate[aggregationName] = false + aggregation, err := prepareAggregate(ctx, aggregationName, aggregation, aggregationColumn, path) + if err != nil { + return nil, err + } + aggregations[aggregationName] = aggregation + path = "" + } + + return aggregations, nil +} + +// prepareAggregate prepares the columnCount and SingleColumn query based on the aggregates in the query request. 
+func prepareAggregate(ctx context.Context, aggName string, agg schema.Aggregate, column string, path string) (map[string]interface{}, error) { + var aggregation map[string]interface{} + switch a := agg.Interface().(type) { + case *schema.AggregateColumnCount: + aggregation = prepareAggregateColumnCount(ctx, column, path, a.Distinct, aggName) + case *schema.AggregateSingleColumn: + var err error + aggregation, err = prepareAggregateSingleColumn(ctx, a.Function, column, path, aggName) + if err != nil { + return nil, err + } + default: + return nil, schema.UnprocessableContentError("invalid aggregate field", map[string]any{ + "value": agg["type"], + }) + } + return aggregation, nil +} + +// prepareAggregateColumnCount prepares the column count query based on the aggregates in the query request. +// If the field is nested, it generates a nested query to count the occurrences of the field in the nested document. +func prepareAggregateColumnCount(ctx context.Context, field string, path string, isDistinct bool, aggName string) map[string]interface{} { + // Prepare the base aggregation query + aggregation := map[string]interface{}{ + "field": field, + } + + // If distinct flag is set, count distinct values + if isDistinct { + aggregation = map[string]interface{}{ + "cardinality": aggregation, + } + } else { + // Otherwise, count all occurrences + aggregation = map[string]interface{}{ + "filter": map[string]interface{}{ + "exists": aggregation, + }, + } + } + + // If the field is nested, generate a nested query + if path != "" { + aggregation = prepareNestedAggregate(ctx, aggName, aggregation, path) + } + + return aggregation +} + +// prepareAggregateSingleColumn prepares the single column query based on the aggregates in the query request. +// If the field is nested, it generates a nested query to perform the specified function on the field in the nested document. +func prepareAggregateSingleColumn(ctx context.Context, function, field string, path string, aggName string) (map[string]interface{}, error) { + // Validate the function + validFunctions := []string{"sum", "min", "max", "avg", "value_count", "cardinality", "stats", "string_stats"} + if !contains(validFunctions, function) { + return nil, schema.UnprocessableContentError("invalid aggregate function", map[string]any{ + "value": function, + }) } - return aggs, nil + // Prepare the aggregation query + aggregation := map[string]interface{}{ + function: map[string]interface{}{ + "field": field, + }, + } + + // If the field is nested, generate a nested query + if path != "" { + aggregation = prepareNestedAggregate(ctx, aggName, aggregation, path) + } + + return aggregation, nil +} + +// prepareNestedAggregate generates a nested query to perform the specified function on the field in the nested document. +// The generated query is added to the aggregation map. +func prepareNestedAggregate(ctx context.Context, aggName string, aggregation map[string]interface{}, path string) map[string]interface{} { + // Update the postProcessor to indicate that the aggregation is on a nested field + postProcessor := ctx.Value("postProcessor").(*types.PostProcessor) + postProcessor.ColumnAggregate[aggName] = true + + aggregation = map[string]interface{}{ + "aggs": map[string]interface{}{ + aggName: aggregation, + }, + } + + // Add the path to the nested field + aggregation["nested"] = map[string]interface{}{ + "path": path, + } + + return aggregation +} + +// contains checks if a string slice contains a specific element. 
+func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false } diff --git a/connector/arguments.go b/connector/arguments.go new file mode 100644 index 0000000..50b1d1f --- /dev/null +++ b/connector/arguments.go @@ -0,0 +1,70 @@ +package connector + +import ( + "fmt" + + "github.com/hasura/ndc-elasticsearch/types" + "github.com/hasura/ndc-sdk-go/schema" +) + +// evalArgument evaluates the argument and returns its value. +func evalArgument(argument *schema.Argument) (any, error) { + switch argument.Type { + case schema.ArgumentTypeVariable: + // If the type is a variable, create a types.Variable object with the + // argument name and return it. + return types.Variable(argument.Name), nil + case schema.ArgumentTypeLiteral: + return argument.Value, nil + default: + return nil, schema.UnprocessableContentError(fmt.Sprintf("invalid argument type: %s", argument.Type), nil) + } +} + +// validateArguments validates that all arguments have been provided, and that no arguments have been given that do not +// map collection's defined arguments. +func validateArguments(params map[string]interface{}, args map[string]schema.Argument) error { + // Check for missing arguments + missing := findMissingArgs(params, args) + if len(missing) > 0 { + return schema.UnprocessableContentError(fmt.Sprintf("missing arguments: %s", missing), nil) + } + + // Check for excess arguments + excess := findExcessArgs(params, args) + if len(excess) > 0 { + return schema.UnprocessableContentError(fmt.Sprintf("excess arguments: %s", excess), nil) + } + + return nil +} + +// findMissingArgs returns a slice of argument names that are required by the +// collection but not present in the QueryRequest. +func findMissingArgs(params map[string]interface{}, args map[string]schema.Argument) []string { + missing := []string{} + + for param := range params { + // Check if the parameter is not present in the QueryRequest + if _, ok := args[param]; !ok { + missing = append(missing, param) + } + } + + return missing +} + +// findExcessArgs returns a slice of argument names that are present in the +// QueryRequst but not defined in the collection. +func findExcessArgs(parameters map[string]interface{}, arguments map[string]schema.Argument) []string { + excessArgs := []string{} + + for argName := range arguments { + // check if the argument name is not defined in the collection + if _, ok := parameters[argName]; !ok { + excessArgs = append(excessArgs, argName) + } + } + + return excessArgs +} diff --git a/connector/configuration.go b/connector/configuration.go new file mode 100644 index 0000000..520e657 --- /dev/null +++ b/connector/configuration.go @@ -0,0 +1,92 @@ +package connector + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/hasura/ndc-elasticsearch/types" +) + +// ParseConfiguration parses the connector's configuration. +func (c *Connector) ParseConfiguration(ctx context.Context, configurationDir string) (*types.Configuration, error) { + // Parse the configuration file + configuration, err := GetConfiguration(configurationDir, configFileName) + if err != nil { + return nil, err + } + + return configuration, nil +} + +// GetConfiguration reads the configuration file, parses it into a Configuration struct, +// and initializes the native queries. +func GetConfiguration(configurationDir string, configFileName string) (*types.Configuration, error) { + // Define the path to the configuration file. 
+	configFilePath := filepath.Join(configurationDir, configFileName)
+
+	file, err := os.Open(configFilePath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open file %s: %w", configFilePath, err)
+	}
+	defer file.Close()
+
+	decoder := json.NewDecoder(file)
+
+	var configuration types.Configuration
+
+	if err := decoder.Decode(&configuration); err != nil {
+		return nil, fmt.Errorf("failed to decode JSON for file %s: %w", configFileName, err)
+	}
+
+	if configuration.Indices == nil {
+		return nil, fmt.Errorf("unable to parse configuration from %s, run upgrade command to upgrade the configuration", configFilePath)
+	}
+
+	// Initialize the native queries
+	configuration.Queries, err = parseNativeQueries(&configuration, configurationDir)
+	if err != nil {
+		return nil, err
+	}
+
+	return &configuration, nil
+}
+
+// parseNativeQueries parses the native queries in the configuration file and initializes them.
+// It sets the 'file' key in the DSL to the absolute path of the file.
+func parseNativeQueries(config *types.Configuration, configDir string) (map[string]types.NativeQuery, error) {
+	queries := config.Queries
+	parsedQueries := make(map[string]types.NativeQuery, len(queries))
+
+	for name, query := range queries {
+		var queryFile string
+		if query.DSL.File != nil && *query.DSL.File != "" {
+			queryFile = filepath.Join(configDir, *query.DSL.File)
+		} else if query.DSL.Internal == nil {
+			return nil, fmt.Errorf("invalid 'dsl' definition in %s", name)
+		}
+
+		if query.Index == "" {
+			return nil, fmt.Errorf("missing 'index' value in %s", name)
+		}
+
+		returnType := query.ReturnType
+		if returnType == nil {
+			return nil, fmt.Errorf("missing 'return_type' value in %s", name)
+		}
+
+		parsedQueries[name] = types.NativeQuery{
+			DSL: types.DSL{
+				File:     &queryFile,
+				Internal: query.DSL.Internal,
+			},
+			Index:      query.Index,
+			Arguments:  query.Arguments,
+			ReturnType: returnType,
+		}
+	}
+
+	return parsedQueries, nil
+}
diff --git a/connector/connector.go b/connector/connector.go
index 786ef49..6b32a6f 100644
--- a/connector/connector.go
+++ b/connector/connector.go
@@ -2,9 +2,6 @@ package connector
 
 import (
 	"context"
-	"encoding/json"
-	"os"
-	"path/filepath"
 
 	"github.com/hasura/ndc-elasticsearch/elasticsearch"
 	"github.com/hasura/ndc-elasticsearch/types"
@@ -17,25 +14,6 @@ var configFileName = "configuration.json"
 // Connector implements the SDK interface of NDC specification
 type Connector struct{}
 
-// ParseConfiguration parses the connector's configuration
-func (c *Connector) ParseConfiguration(ctx context.Context, configurationDir string) (*types.Configuration, error) {
-
-	configFilePath := filepath.Join(configurationDir, configFileName)
-
-	config, err := os.ReadFile(configFilePath)
-	if err != nil {
-		return nil, err
-	}
-
-	var configuration types.Configuration
-	err = json.Unmarshal(config, &configuration)
-	if err != nil {
-		return nil, err
-	}
-
-	return &configuration, nil
-}
-
 // TryInitState initializes the connector's in-memory state.
 func (c *Connector) TryInitState(ctx context.Context, configuration *types.Configuration, metrics *connector.TelemetryState) (*types.State, error) {
 	client, err := elasticsearch.NewClient()
@@ -74,14 +52,15 @@ func (c *Connector) HealthCheck(ctx context.Context, configuration *types.Config
 // GetCapabilities gets the connector's capabilities.
func (c *Connector) GetCapabilities(configuration *types.Configuration) schema.CapabilitiesResponseMarshaler { return &schema.CapabilitiesResponse{ - Version: "0.1.3", + Version: "0.1.4", Capabilities: schema.Capabilities{ Query: schema.QueryCapabilities{ Variables: schema.LeafCapability{}, Aggregates: schema.LeafCapability{}, NestedFields: schema.NestedFieldCapabilities{ - OrderBy: schema.LeafCapability{}, - FilterBy: schema.LeafCapability{}, + OrderBy: schema.LeafCapability{}, + FilterBy: schema.LeafCapability{}, + Aggregates: schema.LeafCapability{}, }, }, }, diff --git a/connector/connector_test.go b/connector/connector_test.go index efb6a4c..181292b 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -131,6 +131,16 @@ var testCases = []struct { requestFile: "../testdata/query/aggregation/single_column_request.json", responseFile: "../testdata/query/aggregation/single_column_response.json", }, + { + name: "aggregation_on_nested_type_field", + requestFile: "../testdata/query/aggregation/nested_type_request.json", + responseFile: "../testdata/query/aggregation/nested_type_response.json", + }, + { + name: "aggregation_on_object_type_field", + requestFile: "../testdata/query/aggregation/object_type_request.json", + responseFile: "../testdata/query/aggregation/object_type_response.json", + }, // Test cases for variables { name: "single_column_aggregation_using_variables", @@ -147,6 +157,11 @@ var testCases = []struct { requestFile: "../testdata/query/variables/aggregation/star_count_request.json", responseFile: "../testdata/query/variables/aggregation/star_count_response.json", }, + { + name: "aggregation_with_native_query", + requestFile: "../testdata/query/aggregation/native_query_request.json", + responseFile: "../testdata/query/aggregation/native_query_response.json", + }, { name: "sort_asc_using_variables", requestFile: "../testdata/query/variables/sort/sort_asc_request.json", @@ -207,6 +222,11 @@ var testCases = []struct { requestFile: "../testdata/query/variables/filter/predicate_on_nested_type_request.json", responseFile: "../testdata/query/variables/filter/predicate_on_nested_type_response.json", }, + { + name: "predicate_with_native_query", + requestFile: "../testdata/query/filter/native_query_request.json", + responseFile: "../testdata/query/filter/native_query_response.json", + }, } // createTestServer creates a test server for the given configuration. diff --git a/connector/filter.go b/connector/filter.go index 8adb3c3..2f76b14 100644 --- a/connector/filter.go +++ b/connector/filter.go @@ -111,8 +111,11 @@ func handleExpressionBinaryComparisonOperator(expr *schema.ExpressionBinaryCompa return filter, nil } +// joinFieldPath joins the fieldPath and columnName to form a fully qualified field path. +// It also checks if the field is nested and returns the nested path. func joinFieldPath(state *types.State, fieldPath []string, columnName string, collection string) (string, string) { nestedPath := "" + if nestedFields, ok := state.NestedFields[collection]; ok { if _, ok := nestedFields.(map[string]string)[columnName]; ok { nestedPath = columnName @@ -120,14 +123,18 @@ func joinFieldPath(state *types.State, fieldPath []string, columnName string, co } joinedPath := columnName + for _, field := range fieldPath { joinedPath = joinedPath + "." + field + + // Check if the joined path is nested. if nestedFields, ok := state.NestedFields[collection]; ok { if _, ok := nestedFields.(map[string]string)[joinedPath]; ok { nestedPath = nestedPath + "." 
+ field } } } + return joinedPath, nestedPath } @@ -136,6 +143,7 @@ func joinNestedFieldPath(state *types.State, operator string, value map[string]i // Create the innermost query operators := strings.Split(operator, ".") query := value + // Iterate over the operators in reverse order for i := len(operators) - 1; i >= 0; i-- { // Wrap the current query part inside the new level query = map[string]interface{}{ @@ -145,8 +153,10 @@ func joinNestedFieldPath(state *types.State, operator string, value map[string]i // Iterate over the fieldPath in to build the nested structure pathIdx := strings.LastIndex(fieldName, ".") for i := 0; i <= nestedLevel-1; i++ { + // Check if the current field is nested if nestedFields, ok := state.NestedFields[collection]; ok { if _, ok := nestedFields.(map[string]string)[fieldName[:pathIdx]]; ok { + // Create the nested query with the current path and query query = map[string]interface{}{ "nested": map[string]interface{}{ "path": fieldName[:pathIdx], @@ -155,13 +165,16 @@ func joinNestedFieldPath(state *types.State, operator string, value map[string]i } } } + // Update the pathIdx to the next level pathIdx = strings.LastIndex(fieldName[:pathIdx], ".") } return query } -// getKeywordFieldFromState retrieves best matching field for term level queries. +// getKeywordFieldFromState retrieves the best matching field for term level queries +// from the state. If the field is found, it returns the corresponding field +// name; otherwise, it returns the original columnName. func getKeywordFieldFromState(state *types.State, columnName string, collection string) string { if collectionFields, ok := state.SupportedFilterFields[collection]; ok { if keywordFields, ok := collectionFields.(map[string]interface{})["term_level_queries"].(map[string]string); ok { @@ -178,6 +191,9 @@ func getKeywordFieldFromState(state *types.State, columnName string, collection return columnName } +// getTextFieldFromState retrieves the best matching field for full text queries +// from the state. If the field is found, it returns the corresponding field +// name; otherwise, it returns the original columnName. func getTextFieldFromState(state *types.State, columnName string, collection string) string { if collectionField, ok := state.SupportedFilterFields[collection]; ok { if textFields, ok := collectionField.(map[string]interface{})["full_text_queries"].(map[string]string); ok { @@ -189,7 +205,9 @@ func getTextFieldFromState(state *types.State, columnName string, collection str return columnName } -// getWildcardFieldFromState retrieves best matching field for wildcard and regexp queries. +// getWildcardFieldFromState retrieves the best matching field for wildcard and regexp +// queries from the state. If the field is found, it returns the corresponding field +// name; otherwise, it returns the original columnName. 
func getWildcardFieldFromState(state *types.State, columnName string, collection string) string { if collectionFields, ok := state.SupportedFilterFields[collection]; ok { if wildcardFields, ok := collectionFields.(map[string]interface{})["unstructured_text"].(map[string]string); ok { diff --git a/connector/native_query.go b/connector/native_query.go new file mode 100644 index 0000000..cc10a61 --- /dev/null +++ b/connector/native_query.go @@ -0,0 +1,145 @@ +package connector + +import ( + "context" + "regexp" + + "github.com/hasura/ndc-elasticsearch/internal" + "github.com/hasura/ndc-elasticsearch/types" + "github.com/hasura/ndc-sdk-go/schema" +) + +// handleNativeQuery is a function that processes a native query by reading and parsing it from a file, +// replacing arguments in the template, and preparing the final native query. +func handleNativeQuery( + ctx context.Context, + queryConfig types.NativeQuery, + query map[string]interface{}, + arguments map[string]schema.Argument, +) (map[string]interface{}, error) { + dsl := queryConfig.DSL + + nativeQuery := make(map[string]interface{}) + var err error + if dsl.File != nil && *dsl.File != "" { + nativeQuery, err = internal.ReadJsonFileUsingDecoder(*dsl.File) + if err != nil { + return nil, err + } + } else if dsl.Internal != nil { + nativeQuery = *dsl.Internal + } + + if queryConfig.Arguments != nil { + nativeQuery, err = processArguments(nativeQuery, arguments, *queryConfig.Arguments) + if err != nil { + return nil, err + } + } + + return prepareNativeQuery(ctx, nativeQuery, query), nil +} + +// prepareNativeQuery prepares the native query based on the input query parameters. +// It merges the aggregates and filters from the native query into the main query. +func prepareNativeQuery( + ctx context.Context, + nativeQuery map[string]interface{}, + query map[string]interface{}, +) map[string]interface{} { + postProcessor := ctx.Value("postProcessor").(*types.PostProcessor) + + // Merge aggregates + if aggs, ok := nativeQuery["aggs"].(map[string]interface{}); ok { + if aggregates, ok := query["aggs"].(map[string]interface{}); ok { + for aggregateName, aggregate := range aggs { + postProcessor.ColumnAggregate[aggregateName] = false + aggregates[aggregateName] = aggregate + } + } else { + query["aggs"] = aggs + } + } + + // Merge filters + if nqFilters, ok := nativeQuery["query"]; ok { + if filters, ok := query["query"]; ok { + // append native query filters to the boolean query if already present + if boolFilter, ok := filters.(map[string]interface{})["bool"]; ok { + if mustFilter, ok := boolFilter.(map[string]interface{})["must"].([]interface{}); ok { + query["query"].(map[string]interface{})["bool"].(map[string]interface{})["must"] = append(mustFilter, nqFilters) + } + } else { + // create new boolean query if not present + query["query"] = map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []interface{}{filters, nqFilters}, + }, + } + } + } + } + + return query +} + +// replacePlaceholders recursively replaces placeholders in the template map with actual values +func replaceArguments(template interface{}, arguments map[string]schema.Argument) (interface{}, error) { + re := regexp.MustCompile(`\{\{(\w+)\}\}`) + + switch v := template.(type) { + case map[string]interface{}: + for key, value := range v { + replacedValue, err := replaceArguments(value, arguments) + if err != nil { + return nil, err + } + v[key] = replacedValue + } + return v, nil + case []interface{}: + for i, item := range v { + replacedItem, err := 
replaceArguments(item, arguments) + if err != nil { + return nil, err + } + v[i] = replacedItem + } + return v, nil + case string: + placeholder := re.FindStringSubmatch(v) + if len(placeholder) > 0 { + arg := arguments[placeholder[1]] + argValue, err := evalArgument(&arg) + if err != nil { + return nil, err + } + return argValue, nil + } + return v, nil + default: + return v, nil + } +} + +// ProcessArguments replaces placeholders in the template with values and returns the processed template +func processArguments( + template map[string]interface{}, + arguments map[string]schema.Argument, + queryArgs map[string]interface{}, +) (map[string]interface{}, error) { + err := validateArguments(queryArgs, arguments) + if err != nil { + return nil, err + } + + if len(arguments) > 0 { + processedTemplate, err := replaceArguments(template, arguments) + if err != nil { + return nil, err + } + return processedTemplate.(map[string]interface{}), nil + } + + return template, nil +} diff --git a/connector/query.go b/connector/query.go index cdb81fd..2953637 100644 --- a/connector/query.go +++ b/connector/query.go @@ -15,7 +15,7 @@ import ( // Query executes a query request. func (c *Connector) Query(ctx context.Context, configuration *types.Configuration, state *types.State, request *schema.QueryRequest) (schema.QueryResponse, error) { span := trace.SpanFromContext(ctx) - response, err := executeQuery(ctx, state, request, span) + response, err := executeQuery(ctx, configuration, state, request, span) if err != nil { span.SetStatus(codes.Error, err.Error()) return nil, err @@ -23,78 +23,74 @@ func (c *Connector) Query(ctx context.Context, configuration *types.Configuratio return response, nil } -// executeQuery prepares equivalent elasticsearch query, executes it and returns the ndc response -func executeQuery(ctx context.Context, state *types.State, request *schema.QueryRequest, span trace.Span) (schema.QueryResponse, error) { +// executeQuery prepares equivalent elasticsearch query, executes it and returns the ndc response. 
+func executeQuery(ctx context.Context, configuration *types.Configuration, state *types.State, request *schema.QueryRequest, span trace.Span) (schema.QueryResponse, error) { // Set the postProcessor in ctx ctx = context.WithValue(ctx, "postProcessor", &types.PostProcessor{}) logger := connector.GetLogger(ctx) rowSets := make([]schema.RowSet, 0) + index := request.Collection + // Identify the index from configuration + nativeQueries := configuration.Queries + queryConfig, ok := nativeQueries[request.Collection] + if ok { + index = queryConfig.Index + } + + // Prepare the elasticsearch query prepareContext, prepareSpan := state.Tracer.Start(ctx, "prepare_elasticsearch_query") defer prepareSpan.End() - body, err := prepareElasticsearchQuery(prepareContext, request, state) + dslQuery, err := prepareElasticsearchQuery(prepareContext, request, state, index) if err != nil { prepareSpan.SetStatus(codes.Error, err.Error()) return nil, err } prepareSpan.End() - if request.Variables == nil || len(request.Variables) == 0 { - searchContext, searchSpan := state.Tracer.Start(ctx, "database_request") - defer searchSpan.End() - - queryJson, _ := json.Marshal(body) - setDatabaseAttribute(span, state, request.Collection, string(queryJson)) - addSpanEvent(searchSpan, logger, "search_elasticsearch", map[string]any{ - "elasticsearch_request": body, - }) - - res, err := state.Client.Search(searchContext, request.Collection, body) + // Handle native queries if present + if ok { + dslQuery, err = handleNativeQuery(ctx, queryConfig, dslQuery, request.Arguments) if err != nil { - searchSpan.SetStatus(codes.Error, err.Error()) return nil, err } - searchSpan.End() - - responseContext, responseSpan := state.Tracer.Start(ctx, "prepare_ndc_response") - defer responseSpan.End() + } - addSpanEvent(responseSpan, logger, "prepare_ndc_response", map[string]any{ - "elasticsearch_response": res, - }) - result := prepareResponse(responseContext, res) - rowSets = append(rowSets, *result) - responseSpan.End() - } else { + // Prepare query with variables if present + if len(request.Variables) != 0 { _, variableSpan := state.Tracer.Start(ctx, "prepare_query_with_variables") defer variableSpan.End() addSpanEvent(variableSpan, logger, "prepare_query_with_variables", map[string]any{ "variables": request.Variables, }) - variableQuery, err := executeQueryWithVariables(request.Variables, body) + dslQuery, err = executeQueryWithVariables(request.Variables, dslQuery) if err != nil { variableSpan.SetStatus(codes.Error, err.Error()) return nil, err } variableSpan.End() + } - searchContext, searchSpan := state.Tracer.Start(ctx, "database_request") - defer searchSpan.End() + // Execute the elasticsearch query + searchContext, searchSpan := state.Tracer.Start(ctx, "database_request") + defer searchSpan.End() - queryJson, _ := json.Marshal(variableQuery) - setDatabaseAttribute(span, state, request.Collection, string(queryJson)) - addSpanEvent(searchSpan, logger, "search_elasticsearch", map[string]any{ - "elasticsearch_request": variableQuery, - }) - res, err := state.Client.Search(searchContext, request.Collection, variableQuery) - if err != nil { - searchSpan.SetStatus(codes.Error, err.Error()) - return nil, err - } - searchSpan.End() + queryJson, _ := json.Marshal(dslQuery) + setDatabaseAttribute(span, state, index, string(queryJson)) + addSpanEvent(searchSpan, logger, "search_elasticsearch", map[string]any{ + "elasticsearch_request": dslQuery, + }) + res, err := state.Client.Search(searchContext, index, dslQuery) + if err != nil { + 
searchSpan.SetStatus(codes.Error, err.Error()) + return nil, err + } + searchSpan.End() + // Prepare response based on variables + if len(request.Variables) != 0 { responseContext, responseSpan := state.Tracer.Start(ctx, "prepare_ndc_response") defer responseSpan.End() @@ -102,12 +98,22 @@ func executeQuery(ctx context.Context, state *types.State, request *schema.Query "elasticsearch_response": res, }) rowSets = prepareResponseWithVariables(responseContext, res) + } else { + responseContext, responseSpan := state.Tracer.Start(ctx, "prepare_ndc_response") + defer responseSpan.End() + + addSpanEvent(responseSpan, logger, "prepare_ndc_response", map[string]any{ + "elasticsearch_response": res, + }) + result := prepareResponse(responseContext, res) + rowSets = append(rowSets, *result) + responseSpan.End() } return rowSets, nil } // prepareElasticsearchQuery prepares an Elasticsearch query based on the provided query request. -func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest, state *types.State) (map[string]interface{}, error) { +func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest, state *types.State, index string) (map[string]interface{}, error) { query := map[string]interface{}{ "_source": map[string]interface{}{ "excludes": []string{"*"}, @@ -143,7 +149,7 @@ func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest span.AddEvent("prepare_sort_query") // Order by if request.Query.OrderBy != nil && len(request.Query.OrderBy.Elements) != 0 { - sort, err := prepareSortQuery(request.Query.OrderBy, state, request.Collection) + sort, err := prepareSortQuery(request.Query.OrderBy, state, index) if err != nil { return nil, err } @@ -153,7 +159,7 @@ func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest span.AddEvent("prepare_aggregate_query") // Aggregations if request.Query.Aggregates != nil { - aggs, err := prepareAggregateQuery(ctx, request.Query.Aggregates, state, request.Collection) + aggs, err := prepareAggregateQuery(ctx, request.Query.Aggregates, state, index) if err != nil { return nil, err } @@ -165,7 +171,7 @@ func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest span.AddEvent("prepare_filter_query") // Filter if request.Query.Predicate != nil { - filter, err := prepareFilterQuery(request.Query.Predicate, state, request.Collection) + filter, err := prepareFilterQuery(request.Query.Predicate, state, index) if err != nil { return nil, err } @@ -174,7 +180,7 @@ func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest } } - // Pretty print query + // Pretty print the query queryJSON, _ := json.MarshalIndent(query, "", " ") fmt.Println(string(queryJSON)) diff --git a/connector/response.go b/connector/response.go index bcce2ed..44a257b 100644 --- a/connector/response.go +++ b/connector/response.go @@ -79,7 +79,7 @@ func prepareResponse(ctx context.Context, response map[string]interface{}) *sche return rowSet } -// extractDocument extracts document fields based on the selected fields from the source data. +// extractDocument extracts selected fields from the source data. 
func extractDocument(source map[string]interface{}, selectedFields map[string]types.Field) map[string]interface{} { document := make(map[string]interface{}) for fieldName, fieldData := range selectedFields { @@ -122,15 +122,18 @@ func extractAggregates(aggregates schema.RowSetAggregates, aggregations map[stri return aggregates } - for _, column := range postProcessor.ColumnAggregate { - if record, ok := aggregations[column].(map[string]interface{}); ok { + for aggName, isNested := range postProcessor.ColumnAggregate { + if record, ok := aggregations[aggName].(map[string]interface{}); ok { + if isNested { + record = record[aggName].(map[string]interface{}) + } val, ok := record["value"] if ok { - aggregates[column] = val + aggregates[aggName] = val } else if val, ok := record["doc_count"]; ok { - aggregates[column] = val + aggregates[aggName] = val } else { - aggregates[column] = record + aggregates[aggName] = record } } } diff --git a/connector/schema.go b/connector/schema.go index 6739b7c..eecb2dc 100644 --- a/connector/schema.go +++ b/connector/schema.go @@ -22,7 +22,9 @@ func parseConfigurationToSchema(configuration *types.Configuration, state *types Procedures: []schema.ProcedureInfo{}, } - for indexName, mappings := range *configuration { + indices := configuration.Indices + + for indexName, mappings := range indices { state.SupportedFilterFields[indexName] = map[string]interface{}{ "term_level_queries": make(map[string]string), "unstructured_text": make(map[string]string), @@ -45,7 +47,7 @@ func parseConfigurationToSchema(configuration *types.Configuration, state *types } fields, objects := getScalarTypesAndObjects(properties, state, indexName, "") - prepareNDCSchema(&ndcSchema, indexName, fields, objects) + prepareNdcSchema(&ndcSchema, indexName, fields, objects) ndcSchema.Collections = append(ndcSchema.Collections, schema.CollectionInfo{ Name: indexName, @@ -59,9 +61,89 @@ func parseConfigurationToSchema(configuration *types.Configuration, state *types ForeignKeys: schema.CollectionInfoForeignKeys{}, }) } + + nativeQueries := configuration.Queries + + ndcSchema = parseNativeQueryToSchema(&ndcSchema, state, nativeQueries) + return &ndcSchema } +// parseNativeQueryToSchema parses the given native queries and adds them to the schema response. +// It also handles return types of kind "defination" and updates the state accordingly. 
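+// A native query entry in the configuration is expected to look roughly like the
+// following sketch (other fields elided; see docs/configuration.md for details):
+//
+//	"range_query": {
+//	  "dsl": {"file": "native_queries/range.json"},
+//	  "index": "my_sample_index",
+//	  "return_type": {"kind": "defination", "mappings": {"properties": {}}},
+//	  "arguments": {"gte": {"type": "integer"}}
+//	}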
+func parseNativeQueryToSchema(schemaResponse *schema.SchemaResponse, state *types.State, nativeQueries map[string]types.NativeQuery) schema.SchemaResponse { + for queryName, queryConfig := range nativeQueries { + indexName := queryConfig.Index + + returnType := queryConfig.ReturnType + returnTypeKind := returnType.Kind + + if returnTypeKind == "defination" { + indexName = queryName + mapping := returnType.Mappings + + properties, ok := (*mapping)["properties"].(map[string]interface{}) + if !ok { + continue + } + + state.SupportedFilterFields[indexName] = map[string]interface{}{ + "term_level_queries": make(map[string]string), + "unstructured_text": make(map[string]string), + "full_text_queries": make(map[string]string), + } + state.NestedFields[indexName] = make(map[string]string) + state.SupportedAggregateFields[indexName] = make(map[string]string) + state.SupportedSortFields[indexName] = make(map[string]string) + fields, objects := getScalarTypesAndObjects(properties, state, indexName, "") + prepareNdcSchema(schemaResponse, indexName, fields, objects) + } + + // Get arguments for the collection info + arguments := schema.CollectionInfoArguments{} + if queryConfig.Arguments != nil { + arguments = getNdcArguments(*queryConfig.Arguments) + } + + collectionInfo := schema.CollectionInfo{ + Name: queryName, + Arguments: arguments, + Type: indexName, + UniquenessConstraints: schema.CollectionInfoUniquenessConstraints{ + queryName + "_by_id": schema.UniquenessConstraint{ + UniqueColumns: []string{"_id"}, + }, + }, + ForeignKeys: schema.CollectionInfoForeignKeys{}, + } + + schemaResponse.Collections = append(schemaResponse.Collections, collectionInfo) + } + + return *schemaResponse +} + +// getNdcArguments converts the query parameters to NDC ArgumentInfo objects. +func getNdcArguments(parameters map[string]interface{}) schema.CollectionInfoArguments { + arguments := schema.CollectionInfoArguments{} + + for argName, argData := range parameters { + argMap, ok := argData.(map[string]interface{}) + if !ok { + continue + } + + typeStr := argMap["type"].(string) + typeObj := schema.NewNamedType(typeStr) + + arguments[argName] = schema.ArgumentInfo{ + Type: typeObj.Encode(), + } + } + + return arguments +} + // getScalarTypesAndObjects retrieves scalar types and objects from properties. func getScalarTypesAndObjects(properties map[string]interface{}, state *types.State, indexName string, parentField string) ([]map[string]interface{}, []map[string]interface{}) { fields := make([]map[string]interface{}, 0) @@ -182,8 +264,8 @@ func getScalarTypesAndObjects(properties map[string]interface{}, state *types.St return fields, objects } -// prepareNDCSchema prepares the NDC schema. -func prepareNDCSchema(ndcSchema *schema.SchemaResponse, index string, fields []map[string]interface{}, objects []map[string]interface{}) { +// prepareNdcSchema prepares the NDC schema. +func prepareNdcSchema(ndcSchema *schema.SchemaResponse, index string, fields []map[string]interface{}, objects []map[string]interface{}) { collectionFields := make(schema.ObjectTypeFields) for _, field := range fields { @@ -254,6 +336,8 @@ func prepareNDCSchema(ndcSchema *schema.SchemaResponse, index string, fields []m } } +// isSortSupported checks if a field type is supported for sorting +// based on fielddata and unsupported sort data types. 
func isSortSupported(fieldType string, fieldDataEnalbed bool) bool { if fieldDataEnalbed { return true @@ -265,14 +349,19 @@ func isSortSupported(fieldType string, fieldDataEnalbed bool) bool { } return true } + +// isAggregateSupported checks if a field type is supported for aggregation +// based on fielddata and unsupported aggregate data types. func isAggregateSupported(fieldType string, fieldDataEnalbed bool) bool { if fieldDataEnalbed { return true } + for _, unSupportedType := range unSupportedAggregateTypes { if fieldType == unSupportedType { return false } } + return true } diff --git a/connector/select.go b/connector/select.go index 4d0d863..5b1a57d 100644 --- a/connector/select.go +++ b/connector/select.go @@ -7,26 +7,45 @@ import ( "github.com/hasura/ndc-sdk-go/schema" ) -func prepareSelectFields(ctx context.Context, fields schema.QueryFields, postProcessor *types.PostProcessor, parentField string) ([]string, map[string]types.Field, error) { +// prepareSelectFields prepares the fields to be selected from the +// Elasticsearch document. +func prepareSelectFields( + ctx context.Context, + fields schema.QueryFields, + postProcessor *types.PostProcessor, + parentField string, +) ( + []string, + map[string]types.Field, + error, +) { source := make([]string, 0) selectedFields := make(map[string]types.Field) + for fieldName, fieldData := range fields { columnData, err := fieldData.AsColumn() if err != nil { - return nil, nil, schema.UnprocessableContentError("relationship has not been supported yet", map[string]interface{}{"value": fieldData}) + return nil, nil, schema.UnprocessableContentError( + "relationship has not been supported yet", + map[string]interface{}{"value": fieldData}, + ) } + column := columnData.Column field := types.Field{ Name: column, } + // If the field has a parent field, update the column name if parentField != "" { column = parentField + "." + columnData.Column } else { + // If the column is "_id", update the post processor if columnData.Column == "_id" { postProcessor.IsIDSelected = true } } + if columnData.Fields == nil { source = append(source, column) } else { @@ -37,20 +56,34 @@ func prepareSelectFields(ctx context.Context, fields schema.QueryFields, postPro field.Fields = nestedSelectFields source = append(source, nestedFields...) } + selectedFields[fieldName] = field } + return source, selectedFields, nil } -func prepareNestedSelectField(ctx context.Context, field schema.NestedField, postProcessor *types.PostProcessor, parentField string) ([]string, map[string]types.Field, error) { +// prepareNestedSelectField prepares the nested select field for the given context. +// It recursively prepares the select fields for nested objects and arrays. 
+func prepareNestedSelectField( + ctx context.Context, + field schema.NestedField, + postProcessor *types.PostProcessor, + parentField string, +) ( + []string, + map[string]types.Field, + error, +) { switch nestedField := field.Interface().(type) { case *schema.NestedObject: return prepareSelectFields(ctx, nestedField.Fields, postProcessor, parentField) case *schema.NestedArray: return prepareNestedSelectField(ctx, nestedField.Fields, postProcessor, parentField) default: - return nil, nil, schema.UnprocessableContentError("invalid nested field", map[string]any{ - "value": field, - }) + return nil, nil, schema.UnprocessableContentError( + "invalid nested field", + map[string]any{"value": field}, + ) } } diff --git a/connector/sort.go b/connector/sort.go index 0d425be..b8cfc1f 100644 --- a/connector/sort.go +++ b/connector/sort.go @@ -1,6 +1,7 @@ package connector import ( + "github.com/hasura/ndc-elasticsearch/internal" "github.com/hasura/ndc-elasticsearch/types" "github.com/hasura/ndc-sdk-go/schema" ) @@ -18,29 +19,35 @@ func prepareSortQuery(orderBy *schema.OrderBy, state *types.State, collection st return sort, nil } +// prepareSortElement prepares the sort element for Elasticsearch query. +// +// It takes in the OrderByElement, state, and collection as parameters. +// It returns the prepared sort element and an error if any. func prepareSortElement(element *schema.OrderByElement, state *types.State, collection string) (map[string]interface{}, error) { sort := make(map[string]interface{}) switch target := element.Target.Interface().(type) { case *schema.OrderByColumn: - fieldName, fieldPath := joinFieldPath(state, target.FieldPath, target.Name, collection) + // Join the field path to get the field path and nested path. + fieldPath, nestedPath := joinFieldPath(state, target.FieldPath, target.Name, collection) - if collectionSortFields, ok := state.SupportedSortFields[collection]; ok { - if sortField, ok := collectionSortFields.(map[string]string)[fieldName]; ok { - fieldName = sortField - } else { - return nil, schema.UnprocessableContentError("sorting not supported on this field", map[string]any{ - "value": fieldName, - }) - } + validField := internal.ValidateFieldOperation(state, "sort", collection, fieldPath) + if validField == "" { + return nil, schema.UnprocessableContentError("sorting not supported on this field", map[string]any{ + "value": fieldPath, + }) } - sort[fieldName] = map[string]interface{}{ + fieldPath = validField + sort[fieldPath] = map[string]interface{}{ "order": string(element.OrderDirection), } + + // Check if the field is nested. if nestedFields, ok := state.NestedFields[collection]; ok { if _, ok := nestedFields.(map[string]string)[target.Name]; ok { - sort[fieldName] = map[string]interface{}{ + // If the field is nested, add the nested path to the sort element. 
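+				// For example (a sketch using the sample "driver.last_name" nested field),
+				// the resulting element would be:
+				// {"driver.last_name": {"nested": {"path": "driver"}, "order": "asc"}}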
+ sort[fieldPath] = map[string]interface{}{ "nested": map[string]interface{}{ - "path": fieldPath, + "path": nestedPath, }, "order": string(element.OrderDirection), } @@ -51,5 +58,6 @@ func prepareSortElement(element *schema.OrderByElement, state *types.State, coll "value": element.Target, }) } + return sort, nil } diff --git a/connector/static_types.go b/connector/static_types.go index 82f5aa5..d1087d5 100644 --- a/connector/static_types.go +++ b/connector/static_types.go @@ -246,22 +246,27 @@ func getComparisonOperatorDefinition(dataType string) map[string]schema.Comparis return comparisonOperators } +// getAggregationFunctions generates and returns a map of aggregation functions based on the provided list of functions and data type. func getAggregationFunctions(functions []string, typeName string) schema.ScalarTypeAggregateFunctions { aggregationFunctions := make(schema.ScalarTypeAggregateFunctions) + for _, function := range functions { if function == "cardinality" || function == "value_count" { typeName = "integer" - } - if function == "stats" || function == "string_stats" { + } else if function == "stats" || function == "string_stats" { typeName = function } + + // Generate the function definition and add it to the map aggregationFunctions[function] = schema.AggregateFunctionDefinition{ ResultType: schema.NewNamedType(typeName).Encode(), } } + return aggregationFunctions } +// unSupportedAggregateTypes are lists of data types that do not support aggregation in elasticsearch. var unSupportedAggregateTypes = []string{ "text", "search_as_you_type", @@ -270,6 +275,7 @@ var unSupportedAggregateTypes = []string{ "binary", } +// unsupportedSortDataTypes are lists of data types that do not support sorting in elasticsearch. var unsupportedSortDataTypes = []string{ "text", "search_as_you_type", diff --git a/connector/variables.go b/connector/variables.go index 5e1725a..c049afa 100644 --- a/connector/variables.go +++ b/connector/variables.go @@ -1,22 +1,23 @@ package connector import ( - "encoding/json" - "fmt" - "github.com/hasura/ndc-elasticsearch/types" "github.com/hasura/ndc-sdk-go/schema" ) // executeQueryWithVariables prepares a dsl query for query with variables. +// It takes a list of variable sets and a query body as input. +// It replaces the variable names in the query body with values from the variable sets. +// It returns the prepared query and an error if any. func executeQueryWithVariables(variableSets []schema.QueryRequestVariablesElem, body map[string]interface{}) (map[string]interface{}, error) { variableQuery := make(map[string]interface{}) + // do not to return any documents in the search results while performing aggregations variableQuery["size"] = 0 var filters []interface{} if filter, ok := body["query"]; ok { for _, variableSet := range variableSets { - // Replace variable names in the map with values from memoization + // Replace variable names in the filter map with values from variableSet updatedFilter, err := replaceVariables(filter, variableSet) if err != nil { return nil, err @@ -53,9 +54,6 @@ func executeQueryWithVariables(variableSets []schema.QueryRequestVariablesElem, "aggs": aggregate, }, } - // Pretty print query - queryJSON, _ := json.MarshalIndent(variableQuery, "", " ") - fmt.Println("Variable Query", string(queryJSON)) return variableQuery, nil } diff --git a/docker-compose.yaml b/docker-compose.yaml index cec7d4b..d53a101 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@ version: '3.8' services: ndc-elasticsearch: - build: . 
+ image: ghcr.io/hasura/ndc-elasticsearch:latest volumes: - ./resources/configuration.json:/etc/connector/configuration.json - certs:/usr/share/elasticsearch/config/certs @@ -140,7 +140,7 @@ services: retries: 120 engine: - image: ghcr.io/hasura/v3-engine:6c9979ddaad50d476c0996d1ece48f0cf1c8e99d + image: ghcr.io/hasura/v3-engine:latest platform: linux/amd64 environment: - METADATA_PATH=/metadata/metadata.json diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..d7a2b1b --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,96 @@ +# Configuration + +## Initializing & Updating a Configuration Directory + +The connector requires a configuration directory to run. + +### Using the ndc-elasticsearch Executable + +If you have the executable: + +```bash +ndc-elasticsearch update --configuration=STRING +``` +It initialize and updates the configuration directory by fetching mappings from Elasticsearch. + +See also: [development instructions](./development.md) + +## Index mappings + +Index mappings are added by introspecting the Elasticsearch provided during update of the configuration directory. +These mappings are similar to what we get in [Elasticsearch's mappings API](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html). + +## Native Queries + +Native Queries allow you to run custom DSL queries on your Elasticsearch. This enables you to run queries that are not supported by Hasura DDN's GraphQL engine. This unlocks the full power of your search-engine, allowing you to run complex queries all directly from your Hasura GraphQL API. + +In the internal section within the `dsl`, you have the option to write an Elasticsearch DSL query. Additionally, you can create a native query in a `.json` file located in your configuration directory, usually within a specific subdirectory. When specifying the query in the `dsl`, use the `file` option. + +Your file may only contain only a single JSON DSL query. + +Native Queries can take arguments using the `{{argument_name}}` syntax. Arguments must be specified along with their type. + +```json +{ + "query": { + "range": { + "age": { + "gte": "{{gte}}" + } + } + } +} +``` + +Then add the query to your `configuration.json` file. You'll need to determine the query return type. + +The return type can either be of kind `defination` or `index`. kind defination requires a `mappnigs` section in the return type where +you can define custom mappings for your returned documents (Logical Models). + +Set `kind` to `index` to use mappings of the existing index in the `indices` section of the configuration. + +```json +{ + "range_query": { + "dsl": { + "file": "native_queries/range.json" + }, + "index": "my_sample_index", + "return_type": { + "kind": "defination", + "mappings": { + "properties": { + "range": { + "age": { + "type": "integer" + } + } + } + } + }, + "arguments": { + "gte": { + "type": "integer" + }, + "lte": { + "type": "integer" + } + } + } +} +``` + +The CLI provides `validate` command to validate your configuration directory: + +```bash +ndc-elasticsearch validate --configuration=STRING +``` + +### Note on Configuration Structure Changes + +During this active development phase, the configuration structure may change. 
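+
+For reference, the current layout pairs an `indices` map (the introspected index mappings) with a `queries` map (the native query definitions). A minimal sketch, with the mappings and the query's `return_type`/`arguments` elided:
+
+```json
+{
+  "indices": {
+    "my_sample_index": {
+      "mappings": {}
+    }
+  },
+  "queries": {
+    "range_query": {
+      "dsl": {
+        "file": "native_queries/range.json"
+      },
+      "index": "my_sample_index"
+    }
+  }
+}
+```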
+ +The CLI also provides an `upgrade` command to upgrade your configuration directory to be compatible with the latest connector version + +```bash +ndc-elasticsearch upgrade --dir-from=STRING --dir-to=STRING diff --git a/docs/index.md b/docs/index.md index 6d47a05..94e2ee6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,6 +4,7 @@ Elasticsearch is a [Hasura](https://hasura.io/) Native Data Connector. - [Architecture](./architecture.md) - [Development](./development.md) +- [Configuration](./configuration.md) - [Code of Conduct](./code-of-conduct.md) - [Contributing](./contributing.md) - [Limitations](./limitations.md) diff --git a/docs/limitations.md b/docs/limitations.md index b67a00d..56646c7 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -2,8 +2,9 @@ ## Query -- Native Queries and logical models are not supported - Nested relationships are not supported +- Comparison by column value is currently not supported. +- Order by aggregate is currently not supported. ## Mutations diff --git a/go.mod b/go.mod index 0bf686f..9aebb47 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( github.com/elastic/go-elasticsearch/v8 v8.13.0 - github.com/hasura/ndc-sdk-go v1.2.0 + github.com/hasura/ndc-sdk-go v1.2.1 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 ) @@ -17,13 +17,13 @@ require ( github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.53.0 // indirect - github.com/prometheus/procfs v0.15.0 // indirect + github.com/prometheus/common v0.54.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 // indirect @@ -34,12 +34,12 @@ require ( go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect - go.opentelemetry.io/proto/otlp v1.2.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 2a4e369..1764138 100644 --- a/go.sum +++ b/go.sum @@ -21,16 +21,16 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod 
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc= +github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= -github.com/hasura/ndc-sdk-go v1.2.0 h1:7OrELJbn6Vaj1bsG7pgHkn3rhivD5e2gxjWS/4fR21Q= -github.com/hasura/ndc-sdk-go v1.2.0/go.mod h1:2P+xMUuLE4xgFh1bOYGK04+KwLEFH2i+bCy9a8cWnlM= +github.com/hasura/ndc-sdk-go v1.2.1 h1:pGpdS+2vGtaMQnKvbUB2ezdH+gj94uAkJ3OS9yxEwd4= +github.com/hasura/ndc-sdk-go v1.2.1/go.mod h1:e1t6tWlONuvrnM38QGRl1UfiM7CDskvXSdguu2IwNpE= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -39,10 +39,10 @@ github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQ github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= -github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= -github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek= -github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= +github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= +github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/contrib/propagators/b3 v1.27.0 h1:IjgxbomVrV9za6bRi8fWCNXENs0co37SZedQilP2hm0= @@ -69,23 +69,23 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= -go.opentelemetry.io/proto/otlp v1.2.0 
h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94= -go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 h1:QW9+G6Fir4VcRXVH8x3LilNAb6cxBGLa6+GM4hRwexE= +google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3/go.mod h1:kdrSS/OiLkPrNUpzD4aHgCq2rVuC/YRxok32HXZ4vRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 h1:9Xyg6I9IWQZhRVfCWjKK+l6kI0jHcPesVlMnT//aHNo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/utils.go b/internal/utils.go index efca919..4238f6f 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -2,11 +2,15 @@ package internal import ( "encoding/json" + "fmt" "log" "math/rand" + "os" "reflect" "strings" "time" + + "github.com/hasura/ndc-elasticsearch/types" ) const ( @@ -103,3 +107,64 @@ func DeepEqual(v1, v2 any) bool { _ = json.Unmarshal(bytesB, &x2) return reflect.DeepEqual(x1, x2) } + +// 
validateFieldOperation checks if the given field is supported for the specified operation in the given collection. +// It returns the valid field name if it is supported, otherwise an empty string. +func ValidateFieldOperation(state *types.State, operation, collection, field string) string { + supportedFields := state.SupportedSortFields + if operation == "aggregate" { + supportedFields = state.SupportedAggregateFields + } + + supportedFieldsMap, ok := supportedFields[collection].(map[string]string) + if !ok { + return field + } + + validField, ok := supportedFieldsMap[field] + if !ok { + return "" + } + + return validField +} + +// ReadJsonFileUsingDecoder reads a JSON file using a decoder. +func ReadJsonFileUsingDecoder(filename string) (map[string]interface{}, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file %s: %w", filename, err) + } + defer file.Close() + + var data map[string]interface{} + decoder := json.NewDecoder(file) + if err := decoder.Decode(&data); err != nil { + return nil, fmt.Errorf("failed to decode JSON for file %s: %w", filename, err) + } + + return data, nil +} + +// WriteJsonFile writes the given byte slice to a temporary file first +// and then renaming it to the destination to avoid partial writes in case of a failure. +func WriteJsonFile(filename string, data []byte) error { + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + _, err = file.Write(data) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + + return nil +} + +// FileExists checks whether a file exists and returns a boolean value. +func FileExists(filename string) bool { + _, err := os.Stat(filename) + return !os.IsNotExist(err) +} diff --git a/resources/configuration.json b/resources/configuration.json index 81c28e3..d09e6db 100644 --- a/resources/configuration.json +++ b/resources/configuration.json @@ -1,173 +1,176 @@ { - ".ds-kibana_sample_data_logs-*": { - "mappings": { - "_data_stream_timestamp": { - "enabled": true - }, - "properties": { - "@timestamp": { - "type": "date" - }, - "agent": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + "indices": { + ".ds-kibana_sample_data_logs-*": { + "mappings": { + "_data_stream_timestamp": { + "enabled": true + }, + "properties": { + "@timestamp": { + "type": "date" + }, + "agent": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "bytes": { - "type": "long" - }, - "bytes_counter": { - "type": "long", - "time_series_metric": "counter" - }, - "bytes_gauge": { - "type": "long", - "time_series_metric": "gauge" - }, - "clientip": { - "type": "ip" - }, - "event": { - "properties": { - "dataset": { - "type": "keyword" + }, + "bytes": { + "type": "long" + }, + "bytes_counter": { + "type": "long", + "time_series_metric": "counter" + }, + "bytes_gauge": { + "type": "long", + "time_series_metric": "gauge" + }, + "clientip": { + "type": "ip" + }, + "event": { + "properties": { + "dataset": { + "type": "keyword" + } } - } - }, - "extension": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "extension": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "geo": { - "properties": { - "coordinates": { - "type": "geo_point" - }, - "dest": { - "type": "keyword" - }, - "src": { - "type": "keyword" - }, 
- "srcdest": { - "type": "keyword" + }, + "geo": { + "properties": { + "coordinates": { + "type": "geo_point" + }, + "dest": { + "type": "keyword" + }, + "src": { + "type": "keyword" + }, + "srcdest": { + "type": "keyword" + } } - } - }, - "host": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "host": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "index": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "ip": { - "type": "ip" - }, - "ip_range": { - "type": "ip_range" - }, - "machine": { - "properties": { - "os": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "ip": { + "type": "ip" + }, + "ip_range": { + "type": "ip_range" + }, + "machine": { + "properties": { + "os": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } + }, + "ram": { + "type": "long" } - }, - "ram": { - "type": "long" } - } - }, - "memory": { - "type": "double" - }, - "message": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "memory": { + "type": "double" + }, + "message": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "phpmemory": { - "type": "long" - }, - "referer": { - "type": "keyword" - }, - "request": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "time_series_dimension": true + }, + "phpmemory": { + "type": "long" + }, + "referer": { + "type": "keyword" + }, + "request": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "time_series_dimension": true + } } - } - }, - "response": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "response": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "tags": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "tags": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } - } - }, - "timestamp": { - "type": "alias", - "path": "@timestamp" - }, - "timestamp_range": { - "type": "date_range" - }, - "url": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + }, + "timestamp": { + "type": "alias", + "path": "@timestamp" + }, + "timestamp_range": { + "type": "date_range" + }, + "url": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } } + }, + "utc_time": { + "type": "date" } - }, - "utc_time": { - "type": "date" } } } - } + }, + "queries": {} } \ No newline at end of file diff --git a/resources/prometheus/prometheus.yaml b/resources/prometheus/prometheus.yaml index df89e8e..9ab66f6 100644 --- a/resources/prometheus/prometheus.yaml +++ b/resources/prometheus/prometheus.yaml @@ -10,7 +10,7 @@ alerting: timeout: 10s api_version: v1 scrape_configs: - - job_name: ndc-learn + - job_name: ndc-elasticsearch honor_timestamps: true scrape_interval: 15s scrape_timeout: 10s @@ -18,4 +18,4 @@ scrape_configs: scheme: http static_configs: - targets: - - connector:8080 \ No newline at end of file + - jaeger:14269 \ No newline at end of file diff --git a/testdata/capabilities.json 
b/testdata/capabilities.json index d61db21..c9eedc7 100644 --- a/testdata/capabilities.json +++ b/testdata/capabilities.json @@ -5,10 +5,11 @@ "aggregates": {}, "nested_fields": { "filter_by": {}, - "order_by": {} + "order_by": {}, + "aggregates": {} }, "variables": {} } }, - "version": "0.1.3" + "version": "0.1.4" } \ No newline at end of file diff --git a/testdata/configuration.json b/testdata/configuration.json index c6ed51f..3815c55 100644 --- a/testdata/configuration.json +++ b/testdata/configuration.json @@ -1,314 +1,373 @@ { - "kibana_sample_data_ecommerce": { - "mappings": { - "properties": { - "category": { - "fields": { - "keyword": { - "type": "keyword" - } + "indices": { + "kibana_sample_data_ecommerce": { + "mappings": { + "properties": { + "category": { + "fields": { + "keyword": { + "type": "keyword" + } + }, + "type": "text" }, - "type": "text" - }, - "currency": { - "type": "keyword" - }, - "customer_birth_date": { - "type": "date" - }, - "customer_first_name": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } + "currency": { + "type": "keyword" }, - "type": "text" - }, - "customer_full_name": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } + "customer_birth_date": { + "type": "date" }, - "type": "text" - }, - "customer_gender": { - "type": "keyword" - }, - "customer_id": { - "type": "keyword" - }, - "customer_last_name": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - }, - "type": "text" - }, - "customer_phone": { - "type": "keyword" - }, - "day_of_week": { - "type": "keyword" - }, - "day_of_week_i": { - "type": "integer" - }, - "email": { - "type": "keyword" - }, - "event": { - "properties": { - "dataset": { - "type": "keyword" - } - } - }, - "geoip": { - "properties": { - "city_name": { - "type": "keyword" + "customer_first_name": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } }, - "continent_name": { - "type": "keyword" - }, - "country_iso_code": { - "type": "keyword" + "type": "text" + }, + "customer_full_name": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } }, - "location": { - "type": "geo_point" + "type": "text" + }, + "customer_gender": { + "type": "keyword" + }, + "customer_id": { + "type": "keyword" + }, + "customer_last_name": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } }, - "region_name": { - "type": "keyword" - } - } - }, - "manufacturer": { - "fields": { - "keyword": { - "type": "keyword" + "type": "text" + }, + "customer_phone": { + "type": "keyword" + }, + "day_of_week": { + "type": "keyword" + }, + "day_of_week_i": { + "type": "integer" + }, + "email": { + "type": "keyword" + }, + "event": { + "properties": { + "dataset": { + "type": "keyword" + } } }, - "type": "text" - }, - "order_date": { - "type": "date" - }, - "order_id": { - "type": "keyword" - }, - "products": { - "properties": { - "_id": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } + "geoip": { + "properties": { + "city_name": { + "type": "keyword" }, - "type": "text" - }, - "base_price": { - "type": "half_float" - }, - "base_unit_price": { - "type": "half_float" - }, - "category": { - "fields": { - "keyword": { - "type": "keyword" - } + "continent_name": { + "type": "keyword" }, - "type": "text" - }, - "created_on": { - "type": "date" - }, - "discount_amount": { - "type": "half_float" - }, - "discount_percentage": { - "type": "half_float" - }, - "manufacturer": { - "fields": { - 
"keyword": { - "type": "keyword" - } + "country_iso_code": { + "type": "keyword" }, - "type": "text" - }, - "min_price": { - "type": "half_float" - }, - "price": { - "type": "half_float" - }, - "product_id": { - "type": "long" - }, - "product_name": { - "analyzer": "english", - "fields": { - "keyword": { - "type": "keyword" - } + "location": { + "type": "geo_point" }, - "type": "text" - }, - "quantity": { - "type": "integer" - }, - "sku": { - "type": "keyword" - }, - "tax_amount": { - "type": "half_float" - }, - "taxful_price": { - "type": "half_float" - }, - "taxless_price": { - "type": "half_float" + "region_name": { + "type": "keyword" + } + } + }, + "manufacturer": { + "fields": { + "keyword": { + "type": "keyword" + } }, - "unit_discount_amount": { - "type": "half_float" + "type": "text" + }, + "order_date": { + "type": "date" + }, + "order_id": { + "type": "keyword" + }, + "products": { + "properties": { + "_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "base_price": { + "type": "half_float" + }, + "base_unit_price": { + "type": "half_float" + }, + "category": { + "fields": { + "keyword": { + "type": "keyword" + } + }, + "type": "text" + }, + "created_on": { + "type": "date" + }, + "discount_amount": { + "type": "half_float" + }, + "discount_percentage": { + "type": "half_float" + }, + "manufacturer": { + "fields": { + "keyword": { + "type": "keyword" + } + }, + "type": "text" + }, + "min_price": { + "type": "half_float" + }, + "price": { + "type": "half_float" + }, + "product_id": { + "type": "long" + }, + "product_name": { + "analyzer": "english", + "fields": { + "keyword": { + "type": "keyword" + } + }, + "type": "text" + }, + "quantity": { + "type": "integer" + }, + "sku": { + "type": "keyword" + }, + "tax_amount": { + "type": "half_float" + }, + "taxful_price": { + "type": "half_float" + }, + "taxless_price": { + "type": "half_float" + }, + "unit_discount_amount": { + "type": "half_float" + } } + }, + "sku": { + "type": "keyword" + }, + "taxful_total_price": { + "type": "half_float" + }, + "taxless_total_price": { + "type": "half_float" + }, + "total_quantity": { + "type": "integer" + }, + "total_unique_products": { + "type": "integer" + }, + "type": { + "type": "keyword" + }, + "user": { + "type": "keyword" } - }, - "sku": { - "type": "keyword" - }, - "taxful_total_price": { - "type": "half_float" - }, - "taxless_total_price": { - "type": "half_float" - }, - "total_quantity": { - "type": "integer" - }, - "total_unique_products": { - "type": "integer" - }, - "type": { - "type": "keyword" - }, - "user": { - "type": "keyword" } } - } - }, - "my_sample_index": { - "mappings": { - "properties": { - "age": { - "type": "integer" - }, - "custom_rank": { - "type": "rank_feature" - }, - "description": { - "type": "text" - }, - "driver": { - "properties": { - "last_name": { - "type": "text" - }, - "vehicle": { - "properties": { - "make": { - "type": "text" + }, + "my_sample_index": { + "mappings": { + "properties": { + "age": { + "type": "integer" + }, + "custom_rank": { + "type": "rank_feature" + }, + "description": { + "type": "text" + }, + "driver": { + "properties": { + "last_name": { + "type": "text" + }, + "vehicle": { + "properties": { + "make": { + "type": "text" + }, + "model": { + "fields": { + "keyword": { + "type": "keyword" + }, + "wildcard": { + "type": "wildcard" + } + }, + "type": "text" + } }, - "model": { - "fields": { - "keyword": { - "type": "keyword" + "type": "nested" + } + }, + "type": "nested" + }, + 
"file": { + "type": "binary" + }, + "ip_address": { + "type": "ip" + }, + "is_active": { + "type": "boolean" + }, + "join_date": { + "type": "date" + }, + "location": { + "type": "geo_point" + }, + "name": { + "fields": { + "keyword": { + "type": "keyword" + }, + "wildcard": { + "type": "wildcard" + } + }, + "type": "text" + }, + "profile": { + "properties": { + "address": { + "type": "text" + }, + "city": { + "type": "keyword" + }, + "contact": { + "properties": { + "email": { + "fields": { + "raw": { + "type": "text" + } }, - "wildcard": { - "type": "wildcard" - } + "type": "keyword" }, - "type": "text" + "phone": { + "fields": { + "wildcard": { + "type": "wildcard" + } + }, + "type": "keyword" + } } }, - "type": "nested" - } - }, - "type": "nested" - }, - "file": { - "type": "binary" - }, - "ip_address": { - "type": "ip" - }, - "is_active": { - "type": "boolean" - }, - "join_date": { - "type": "date" - }, - "location": { - "type": "geo_point" - }, - "name": { - "fields": { - "keyword": { - "type": "keyword" + "zipcode": { + "type": "integer" + } }, - "wildcard": { - "type": "wildcard" - } + "type": "nested" }, - "type": "text" - }, - "profile": { - "properties": { - "address": { - "type": "text" - }, - "city": { - "type": "keyword" - }, - "zipcode": { - "type": "integer" - } + "salary": { + "type": "float" }, - "type": "nested" - }, - "salary": { - "type": "float" - }, - "tags": { - "fields": { - "text": { - "type": "text" + "tags": { + "fields": { + "text": { + "type": "text" + }, + "wildcard": { + "type": "wildcard" + } }, - "wildcard": { - "type": "wildcard" - } + "type": "keyword" }, - "type": "keyword" - }, - "wildcard_field": { - "fields": { - "keyword": { - "type": "keyword" + "wildcard_field": { + "fields": { + "keyword": { + "type": "keyword" + }, + "text": { + "type": "text" + } }, - "text": { - "type": "text" + "type": "wildcard" + } + } + } + } + }, + "queries": { + "range_query": { + "dsl": { + "file": "native_queries/range.json" + }, + "index": "my_sample_index", + "return_type": { + "kind": "defination", + "mappings": { + "properties": { + "range": { + "age": { + "type": "integer" + } } - }, - "type": "wildcard" + } } + }, + "arguments": { + "gte": { + "type": "integer" + }, + "lte": { + "type": "integer" + } + } + }, + "aggregate_query": { + "dsl": { + "file": "native_queries/aggregate.json" + }, + "index": "kibana_sample_data_ecommerce", + "return_type": { + "kind": "index" } } } diff --git a/testdata/native_queries/aggregate.json b/testdata/native_queries/aggregate.json new file mode 100644 index 0000000..ef82f79 --- /dev/null +++ b/testdata/native_queries/aggregate.json @@ -0,0 +1,14 @@ +{ + "aggs": { + "price_outlier": { + "percentiles": { + "field": "products.base_price", + "percents": [ + 95, + 99, + 99.9 + ] + } + } + } +} \ No newline at end of file diff --git a/testdata/native_queries/range.json b/testdata/native_queries/range.json new file mode 100644 index 0000000..ff45542 --- /dev/null +++ b/testdata/native_queries/range.json @@ -0,0 +1,10 @@ +{ + "query": { + "range": { + "age": { + "gte": "{{gte}}", + "lte": "{{lte}}" + } + } + } +} \ No newline at end of file diff --git a/testdata/query/aggregation/native_query_request.json b/testdata/query/aggregation/native_query_request.json new file mode 100644 index 0000000..2f739d7 --- /dev/null +++ b/testdata/query/aggregation/native_query_request.json @@ -0,0 +1,22 @@ +{ + "collection": "aggregate_query", + "query": { + "aggregates": { + "sku_count": { + "type": "column_count", + "column": "sku", + "distinct": 
false + }, + "orders_total": { + "type": "single_column", + "function": "sum", + "column": "total_quantity" + }, + "orders_count": { + "type": "star_count" + } + } + }, + "arguments": {}, + "collection_relationships": {} +} \ No newline at end of file diff --git a/testdata/query/aggregation/native_query_response.json b/testdata/query/aggregation/native_query_response.json new file mode 100644 index 0000000..effc46b --- /dev/null +++ b/testdata/query/aggregation/native_query_response.json @@ -0,0 +1,16 @@ +[ + { + "aggregates": { + "orders_count": 4675, + "orders_total": 10091, + "price_outlier": { + "values": { + "95.0": 81.16773745888158, + "99.0": 128.32947368421006, + "99.9": 200 + } + }, + "sku_count": 4675 + } + } +] \ No newline at end of file diff --git a/testdata/query/aggregation/nested_type_request.json b/testdata/query/aggregation/nested_type_request.json new file mode 100644 index 0000000..51741f0 --- /dev/null +++ b/testdata/query/aggregation/nested_type_request.json @@ -0,0 +1,15 @@ +{ + "collection": "my_sample_index", + "arguments": {}, + "query": { + "aggregates": { + "totalEmail": { + "type": "single_column", + "function": "value_count", + "column": "profile", + "field_path": ["contact", "email"] + } + } + }, + "collection_relationships": {} +} \ No newline at end of file diff --git a/testdata/query/aggregation/nested_type_response.json b/testdata/query/aggregation/nested_type_response.json new file mode 100644 index 0000000..f4dc14a --- /dev/null +++ b/testdata/query/aggregation/nested_type_response.json @@ -0,0 +1,7 @@ +[ + { + "aggregates": { + "totalEmail": 2 + } + } + ] \ No newline at end of file diff --git a/testdata/query/aggregation/object_type_request.json b/testdata/query/aggregation/object_type_request.json new file mode 100644 index 0000000..cc86250 --- /dev/null +++ b/testdata/query/aggregation/object_type_request.json @@ -0,0 +1,15 @@ +{ + "collection": "kibana_sample_data_ecommerce", + "arguments": {}, + "query": { + "aggregates": { + "uniqueDatasets": { + "type": "column_count", + "column": "event", + "field_path": ["dataset"], + "distinct": true + } + } + }, + "collection_relationships": {} +} \ No newline at end of file diff --git a/testdata/query/aggregation/object_type_response.json b/testdata/query/aggregation/object_type_response.json new file mode 100644 index 0000000..969aca8 --- /dev/null +++ b/testdata/query/aggregation/object_type_response.json @@ -0,0 +1,7 @@ +[ + { + "aggregates": { + "uniqueDatasets": 1 + } + } + ] \ No newline at end of file diff --git a/testdata/query/filter/native_query_request.json b/testdata/query/filter/native_query_request.json new file mode 100644 index 0000000..a5a6e20 --- /dev/null +++ b/testdata/query/filter/native_query_request.json @@ -0,0 +1,55 @@ +{ + "collection": "range_query", + "query": { + "fields": { + "age": { + "type": "column", + "column": "age", + "fields": null + }, + "driver": { + "type": "column", + "column": "driver", + "fields": { + "type": "object", + "fields": { + "lastName": { + "type": "column", + "column": "last_name" + } + } + } + } + }, + "predicate": { + "type": "binary_comparison_operator", + "column": { + "type": "column", + "name": "driver", + "path": [], + "field_path": ["last_name"] + }, + "operator": "match", + "value": { + "type": "scalar", + "value": "Hudson" + } + } + }, + "arguments": { + "lte": { + "type": "literal", + "value": 25 + }, + "gte": { + "type": "variable", + "name": "$gte" + } + }, + "variables": [ + { + "$gte": 20 + } + ], + "collection_relationships": {} +} \ No 
newline at end of file diff --git a/testdata/query/filter/native_query_response.json b/testdata/query/filter/native_query_response.json new file mode 100644 index 0000000..c8e568e --- /dev/null +++ b/testdata/query/filter/native_query_response.json @@ -0,0 +1,14 @@ +[ + { + "rows": [ + { + "age": 25, + "driver": [ + { + "lastName": "Hudson" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/testdata/schema.json b/testdata/schema.json index ce34fb6..59733f2 100644 --- a/testdata/schema.json +++ b/testdata/schema.json @@ -1,5 +1,18 @@ { "collections": [ + { + "arguments": {}, + "foreign_keys": {}, + "name": "kibana_sample_data_ecommerce", + "type": "kibana_sample_data_ecommerce", + "uniqueness_constraints": { + "kibana_sample_data_ecommerce_by_id": { + "unique_columns": [ + "_id" + ] + } + } + }, { "arguments": {}, "foreign_keys": {}, @@ -13,13 +26,39 @@ } } }, + { + "arguments": { + "gte": { + "type": { + "name": "integer", + "type": "named" + } + }, + "lte": { + "type": { + "name": "integer", + "type": "named" + } + } + }, + "foreign_keys": {}, + "name": "range_query", + "type": "range_query", + "uniqueness_constraints": { + "range_query_by_id": { + "unique_columns": [ + "_id" + ] + } + } + }, { "arguments": {}, "foreign_keys": {}, - "name": "kibana_sample_data_ecommerce", + "name": "aggregate_query", "type": "kibana_sample_data_ecommerce", "uniqueness_constraints": { - "kibana_sample_data_ecommerce_by_id": { + "aggregate_query_by_id": { "unique_columns": [ "_id" ] @@ -32,6 +71,22 @@ "alias": { "fields": {} }, + "contact": { + "fields": { + "email": { + "type": { + "name": "keyword", + "type": "named" + } + }, + "phone": { + "type": { + "name": "keyword", + "type": "named" + } + } + } + }, "date_range": { "fields": {} }, @@ -542,6 +597,15 @@ "type": "named" } }, + "contact": { + "type": { + "element_type": { + "name": "contact", + "type": "named" + }, + "type": "array" + } + }, "zipcode": { "type": { "name": "integer", @@ -550,6 +614,16 @@ } } }, + "range_query": { + "fields": { + "_id": { + "type": { + "name": "_id", + "type": "named" + } + } + } + }, "rank_feature": { "fields": {} }, diff --git a/types/types.go b/types/types.go index 87ea8d8..d4b8cf2 100644 --- a/types/types.go +++ b/types/types.go @@ -19,17 +19,41 @@ type State struct { } // Configuration contains required settings for the connector. -type Configuration map[string]interface{} +type Configuration struct { + Indices map[string]interface{} `json:"indices"` + Queries map[string]NativeQuery `json:"queries"` +} + +// NativeQuery contains the definition of the native query. +type NativeQuery struct { + DSL DSL `json:"dsl"` + Index string `json:"index"` + ReturnType *ReturnType `json:"return_type,omitempty"` + Arguments *map[string]interface{} `json:"arguments,omitempty"` +} + +// DSL contains the dsl query of the native query. +type DSL struct { + File *string `json:"file,omitempty"` + Internal *map[string]interface{} `json:"internal,omitempty"` +} + +// ReturnType contains the return type of the native query. +type ReturnType struct { + Kind string `json:"kind"` + Mappings *map[string]interface{} `json:"mappings,omitempty"` +} // PostProcessor is used to post process the query response. type PostProcessor struct { IsFields bool StarAggregates string - ColumnAggregate []string + ColumnAggregate map[string]bool IsIDSelected bool SelectedFields map[string]Field } +// Field is used to represent a field in the query response. 
type Field struct { Name string Fields map[string]Field From e8b53cd4ca061b84dde2339d9cd657760526e57e Mon Sep 17 00:00:00 2001 From: Darshan Lukhi Date: Thu, 20 Jun 2024 11:45:24 +0530 Subject: [PATCH 2/2] fix ports in readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e1b85de..304112e 100644 --- a/README.md +++ b/README.md @@ -94,8 +94,8 @@ subgraph's `.env.my_subgraph` file. Each key is prefixed by the subgraph name, a connector. Ensure the port value matches what is published in your connector's docker compose file. ```env title="my_subgraph/.env.my_subgraph" -MY_SUBGRAPH_MY_ELASTIC_READ_URL=http://local.hasura.dev:8082 -MY_SUBGRAPH_MY_ELASTIC_WRITE_URL=http://local.hasura.dev:8082 +MY_SUBGRAPH_MY_ELASTIC_READ_URL=http://local.hasura.dev:8081 +MY_SUBGRAPH_MY_ELASTIC_WRITE_URL=http://local.hasura.dev:8081 ``` ### 5. Start the connector's docker compose