Skip to content

Commit

Permalink
Comparison library (#9)
Browse files Browse the repository at this point in the history
Benchmark comparison library

Signed-off-by: Raul Sevilla <rsevilla@redhat.com>

---------

Signed-off-by: Raul Sevilla <rsevilla@redhat.com>
  • Loading branch information
rsevilla87 authored May 25, 2023
1 parent 7865712 commit 6dbc71e
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 26 deletions.
123 changes: 123 additions & 0 deletions comparison/elastic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package comparison

import (
"encoding/json"
"fmt"
"strings"

elasticsearch "github.com/elastic/go-elasticsearch/v7"
)

type Comparator struct {
client elasticsearch.Client
index string
}

// NewComparator returns a new comparator for the given index and elasticsearch client
func NewComparator(client elasticsearch.Client, index string) Comparator {
return Comparator{
client: client,
index: index,
}
}

// Compare returns error if value does not meet the tolerance as
// compared with the field extracted from the given query
//
// Where field is the field we want to compare, query is the query string
// to use for the search, stat is the type of aggregation to compare with value
// and toleration is the percentaje difference tolerated, it can negative
// it returns an error when the field doesn't meet the tolerancy, and an
// informative message when it does
func (c *Comparator) Compare(field, query string, stat Stat, value float64, tolerancy int) (string, error) {
stats, err := c.queryStringStats(field, query)
var baseline float64
if err != nil {
return "", err
}
switch stat {
case Avg:
baseline = stats.Avg
case Max:
baseline = stats.Max
case Min:
baseline = stats.Min
case Sum:
baseline = stats.Sum
}
if tolerancy >= 0 {
baselineTolerancy := baseline * (100 - float64(tolerancy)) / 100
if value < baselineTolerancy {
return "", fmt.Errorf("with a tolerancy of %d%%: %.2f rps is %.2f%% lower than baseline: %.2f rps", tolerancy, value, 100-(value*100/baseline), baseline)
}
} else if tolerancy < 0 {
baselineTolerancy := baseline * (100 + float64(tolerancy)) / 100
if value > baselineTolerancy {
return "", fmt.Errorf("with a tolerancy of %d%%: %.2f is %.2f%% rps higher than baseline: %.2f rps", tolerancy, value, (value*100/baseline)-100, baseline)
}
}
return fmt.Sprintf("%2.f rps meets %d%% tolerancy against %.2f rps", value, tolerancy, baseline), nil
}

// queryStringStats perform a query of type query_string,to fetch the stats of a specific field
// this type of query accepts a simple query format similar to the kibana queries, i.e:
//
// {
// "aggs": {
// "stats": {
// "stats": {
// "field": "our_field"
// }
// }
// },
// "query": {
// "query_string": {
// "query": "uuid.keyword: our_uuid AND param1.keyword: value1"
// }
// },
// "size": 0
// }
func (c *Comparator) queryStringStats(field, query string) (stats, error) {
var response QueryStringResponse
var queryStringRequest map[string]interface{} = map[string]interface{}{
"size": 0,
"query": map[string]interface{}{
"query_string": map[string]interface{}{
"query": query,
},
},
"aggs": map[string]interface{}{
"stats": map[string]interface{}{
"stats": map[string]string{
"field": field,
},
},
},
}
queryStringRequestJSON, _ := json.Marshal(queryStringRequest)
res, err := c.client.Search(
c.client.Search.WithBody(strings.NewReader(string(queryStringRequestJSON))),
c.client.Search.WithIndex(c.index),
)
if err != nil {
return stats{}, err
}
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
return stats{}, fmt.Errorf("error parsing the response body: %s", err)
} else {
// Return the response status and error information.
return stats{}, fmt.Errorf("%s %s %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return stats{}, fmt.Errorf("error parsing the response body: %s", err)
}

return response.Aggregations.stats, nil
}
23 changes: 23 additions & 0 deletions comparison/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package comparison

type Stat string

const (
Min Stat = "min"
Max Stat = "max"
Avg Stat = "avg"
Sum Stat = "sum"
)

type QueryStringResponse struct {
Aggregations struct {
stats `json:"stats"`
} `json:"aggregations"`
}

type stats struct {
Min float64 `json:"min"`
Max float64 `json:"max"`
Avg float64 `json:"avg"`
Sum float64 `json:"sum"`
}
27 changes: 14 additions & 13 deletions indexers/elastic.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The Kube-burner Authors.
// Copyright 2023 The go-commons Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,27 +36,29 @@ const elastic = "elastic"

// Elastic ElasticSearch instance
type Elastic struct {
client *elasticsearch.Client
index string
index string
}

// ESClient elasticsearch client instance
var ESClient *elasticsearch.Client

// Init function
func init() {
indexerMap[elastic] = &Elastic{}
}

// Returns new indexer for elastic search
func (esIndexer *Elastic) new(indexerConfig IndexerConfig) error {
esConfig := indexerConfig
if esConfig.Index == "" {
var err error
if indexerConfig.Index == "" {
return fmt.Errorf("index name not specified")
}
esIndex := strings.ToLower(esConfig.Index)
esIndex := strings.ToLower(indexerConfig.Index)
cfg := elasticsearch.Config{
Addresses: esConfig.Servers,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: esConfig.InsecureSkipVerify}},
Addresses: indexerConfig.Servers,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: indexerConfig.InsecureSkipVerify}},
}
ESClient, err := elasticsearch.NewClient(cfg)
ESClient, err = elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("error creating the ES client: %s", err)
}
Expand All @@ -67,11 +69,10 @@ func (esIndexer *Elastic) new(indexerConfig IndexerConfig) error {
if r.StatusCode != 200 {
return fmt.Errorf("unexpected ES status code: %d", r.StatusCode)
}
esIndexer.client = ESClient
esIndexer.index = esIndex
r, _ = esIndexer.client.Indices.Exists([]string{esIndex})
r, _ = ESClient.Indices.Exists([]string{esIndex})
if r.IsError() {
r, _ = esIndexer.client.Indices.Create(esIndex)
r, _ = ESClient.Indices.Create(esIndex)
if r.IsError() {
return fmt.Errorf("error creating index %s on ES: %s", esIndex, r.String())
}
Expand All @@ -86,7 +87,7 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
indexerStats := make(map[string]int)
hasher := sha256.New()
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: esIndexer.client,
Client: ESClient,
Index: esIndexer.index,
FlushBytes: 5e+6,
NumWorkers: runtime.NumCPU(),
Expand Down
27 changes: 14 additions & 13 deletions indexers/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (

const indexer = "opensearch"

// OSClient OpenSearch client instance
var OSClient *opensearch.Client

// OpenSearch OpenSearch instance
type OpenSearch struct {
client *opensearch.Client
index string
index string
}

// Init function
Expand All @@ -33,31 +35,30 @@ func init() {

// Returns new indexer for OpenSearch
func (OpenSearchIndexer *OpenSearch) new(indexerConfig IndexerConfig) error {
OpenSearchConfig := indexerConfig
if OpenSearchConfig.Index == "" {
var err error
if indexerConfig.Index == "" {
return fmt.Errorf("index name not specified")
}
OpenSearchIndex := strings.ToLower(OpenSearchConfig.Index)
OpenSearchIndex := strings.ToLower(indexerConfig.Index)
cfg := opensearch.Config{
Addresses: OpenSearchConfig.Servers,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: OpenSearchConfig.InsecureSkipVerify}},
Addresses: indexerConfig.Servers,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: indexerConfig.InsecureSkipVerify}},
}
OpenSearchClient, err := opensearch.NewClient(cfg)
OSClient, err = opensearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("error creating the OpenSearch client: %s", err)
}
r, err := OpenSearchClient.Cluster.Health()
r, err := OSClient.Cluster.Health()
if err != nil {
return fmt.Errorf("OpenSearch health check failed: %s", err)
}
if r.StatusCode != 200 {
return fmt.Errorf("unexpected OpenSearch status code: %d", r.StatusCode)
}
OpenSearchIndexer.client = OpenSearchClient
OpenSearchIndexer.index = OpenSearchIndex
r, _ = OpenSearchIndexer.client.Indices.Exists([]string{OpenSearchIndex})
r, _ = OSClient.Indices.Exists([]string{OpenSearchIndex})
if r.IsError() {
r, _ = OpenSearchIndexer.client.Indices.Create(OpenSearchIndex)
r, _ = OSClient.Indices.Create(OpenSearchIndex)
if r.IsError() {
return fmt.Errorf("error creating index %s on OpenSearch: %s", OpenSearchIndex, r.String())
}
Expand All @@ -72,7 +73,7 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
indexerStats := make(map[string]int)
hasher := sha256.New()
bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Client: OpenSearchIndexer.client,
Client: OSClient,
Index: OpenSearchIndexer.index,
FlushBytes: 5e+6,
NumWorkers: runtime.NumCPU(),
Expand Down

0 comments on commit 6dbc71e

Please sign in to comment.