diff --git a/go.mod b/go.mod index a2ad9f1..a854a0a 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/opensearch-project/opensearch-go v1.1.0 github.com/prometheus/client_golang v1.15.1 github.com/prometheus/common v0.44.0 + github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.2 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.4 diff --git a/go.sum b/go.sum index e73b387..18157ca 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -123,6 +125,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/indexers/elastic.go b/indexers/elastic.go index 78ec9ce..160cb9b 100644 --- a/indexers/elastic.go +++ b/indexers/elastic.go @@ -30,6 +30,7 @@ import ( elasticsearch "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" + log "github.com/sirupsen/logrus" ) // Elastic ElasticSearch instance @@ -100,14 +101,17 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str for _, document := range documents { j, err := json.Marshal(document) if err != nil { - return "", fmt.Errorf("Cannot encode document %s: %s", document, err) + return "", fmt.Errorf("Cannot encode document %v: %s", document, err) } + hasher.Write(j) docId := hex.EncodeToString(hasher.Sum(nil)) if _, exists := docHash[docId]; exists { - redundantSkipped += 1 + log.Debugf("Skipping redundant document with ID: %s", docId) + redundantSkipped++ continue } + err = bi.Add( context.Background(), esutil.BulkIndexerItem{ @@ -119,11 +123,16 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str defer indexerStatsLock.Unlock() indexerStats[biri.Result]++ }, + OnFailure: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem, err error) { + log.Infof("Failed to index document with ID %s: %s, error: %v", bii.DocumentID, biri.Error.Reason, err) + }, }, ) if err != nil { + log.Infof("Error adding document with ID %s: %s", docId, err) return "", fmt.Errorf("Unexpected ES indexing error: %s", err) } + docHash[docId] = true hasher.Reset() } diff --git a/indexers/factory_test.go b/indexers/factory_test.go index 136e3b2..ed13784 100644 --- a/indexers/factory_test.go +++ b/indexers/factory_test.go @@ -2,9 +2,9 @@ package indexers import ( "errors" + "log" "net/http" "net/http/httptest" - "log" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" diff --git a/indexers/opensearch.go b/indexers/opensearch.go index 4d4bf7d..39f7099 100644 --- a/indexers/opensearch.go +++ b/indexers/opensearch.go @@ -30,6 +30,7 @@ import ( opensearch "github.com/opensearch-project/opensearch-go" opensearchutil "github.com/opensearch-project/opensearch-go/opensearchutil" + log "github.com/sirupsen/logrus" ) // OSClient OpenSearch client instance @@ -100,14 +101,17 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin for _, document := range documents { j, err := json.Marshal(document) if err != nil { - return "", fmt.Errorf("Cannot encode document %s: %s", document, err) + return "", fmt.Errorf("Cannot encode document %v: %s", document, err) } + hasher.Write(j) docId := hex.EncodeToString(hasher.Sum(nil)) if _, exists := docHash[docId]; exists { - redundantSkipped += 1 + log.Debugf("Skipping redundant document: %s", docId) + redundantSkipped++ continue } + err = bi.Add( context.Background(), opensearchutil.BulkIndexerItem{ @@ -119,9 +123,13 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin defer indexerStatsLock.Unlock() indexerStats[biri.Result]++ }, + OnFailure: func(c context.Context, bii opensearchutil.BulkIndexerItem, beri opensearchutil.BulkIndexerResponseItem, err error) { + log.Infof("Failed to index document %s: %s, error: %v", bii.DocumentID, beri.Error.Reason, err) + }, }, ) if err != nil { + log.Infof("Error adding document with ID %s: %s", docId, err) return "", fmt.Errorf("Unexpected OpenSearch indexing error: %s", err) } docHash[docId] = true