Skip to content

Commit

Permalink
Merge pull request #48 from vishnuchalla/indexer-logs
Browse files: browse the repository at this point in the history
Adding verbose logging for indexing failures
  • Loading branch information
vishnuchalla authored Oct 26, 2024
2 parents 35a376f + 40573be commit 0e47012
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/opensearch-project/opensearch-go v1.1.0
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/common v0.44.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.4
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -123,6 +125,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
13 changes: 11 additions & 2 deletions indexers/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

elasticsearch "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
log "github.com/sirupsen/logrus"
)

// Elastic ElasticSearch instance
Expand Down Expand Up @@ -100,14 +101,17 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
for _, document := range documents {
j, err := json.Marshal(document)
if err != nil {
return "", fmt.Errorf("Cannot encode document %s: %s", document, err)
return "", fmt.Errorf("Cannot encode document %v: %s", document, err)
}

hasher.Write(j)
docId := hex.EncodeToString(hasher.Sum(nil))
if _, exists := docHash[docId]; exists {
redundantSkipped += 1
log.Debugf("Skipping redundant document with ID: %s", docId)
redundantSkipped++
continue
}

err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Expand All @@ -119,11 +123,16 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
defer indexerStatsLock.Unlock()
indexerStats[biri.Result]++
},
OnFailure: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem, err error) {
log.Infof("Failed to index document with ID %s: %s, error: %v", bii.DocumentID, biri.Error.Reason, err)
},
},
)
if err != nil {
log.Infof("Error adding document with ID %s: %s", docId, err)
return "", fmt.Errorf("Unexpected ES indexing error: %s", err)
}

docHash[docId] = true
hasher.Reset()
}
Expand Down
2 changes: 1 addition & 1 deletion indexers/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package indexers

import (
"errors"
"log"
"net/http"
"net/http/httptest"
"log"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down
12 changes: 10 additions & 2 deletions indexers/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

opensearch "github.com/opensearch-project/opensearch-go"
opensearchutil "github.com/opensearch-project/opensearch-go/opensearchutil"
log "github.com/sirupsen/logrus"
)

// OSClient OpenSearch client instance
Expand Down Expand Up @@ -100,14 +101,17 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
for _, document := range documents {
j, err := json.Marshal(document)
if err != nil {
return "", fmt.Errorf("Cannot encode document %s: %s", document, err)
return "", fmt.Errorf("Cannot encode document %v: %s", document, err)
}

hasher.Write(j)
docId := hex.EncodeToString(hasher.Sum(nil))
if _, exists := docHash[docId]; exists {
redundantSkipped += 1
log.Debugf("Skipping redundant document: %s", docId)
redundantSkipped++
continue
}

err = bi.Add(
context.Background(),
opensearchutil.BulkIndexerItem{
Expand All @@ -119,9 +123,13 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
defer indexerStatsLock.Unlock()
indexerStats[biri.Result]++
},
OnFailure: func(c context.Context, bii opensearchutil.BulkIndexerItem, beri opensearchutil.BulkIndexerResponseItem, err error) {
log.Infof("Failed to index document %s: %s, error: %v", bii.DocumentID, beri.Error.Reason, err)
},
},
)
if err != nil {
log.Infof("Error adding document with ID %s: %s", docId, err)
return "", fmt.Errorf("Unexpected OpenSearch indexing error: %s", err)
}
docHash[docId] = true
Expand Down

0 comments on commit 0e47012

Please sign in to comment.