Skip to content

Commit

Permalink
Merge branch 'main' into opensearch
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jun 10, 2024
2 parents ee99ea5 + 2f150c0 commit 836311b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
v9.1.9 (2024-06-10)
-------------------------
* Record stats inside indexing batch loop
* Split up created vs updated in progress logging
* Add track_total_hits to GetESLastModified

v9.1.8 (2024-06-05)
-------------------------
* Update github actions versions
* Add healthcheck for elastic service in CI tests
* Update goreleaser config to v2

v9.1.7 (2024-06-05)
-------------------------
* Remove multi-search-db CI testing because it's unreliable
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Indexer
# 🗃️ Indexer

[![Build Status](https://github.com/nyaruka/rp-indexer/workflows/CI/badge.svg)](https://github.com/nyaruka/rp-indexer/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/nyaruka/rp-indexer/branch/main/graph/badge.svg)](https://codecov.io/gh/nyaruka/rp-indexer)
Expand Down
25 changes: 15 additions & 10 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type":
type Stats struct {
Indexed int64 // total number of documents indexed
Deleted int64 // total number of documents deleted
Elapsed time.Duration // total time spent actually indexing
Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
}

// Indexer is base interface for indexers
Expand Down Expand Up @@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger {
return slog.With("indexer", i.name)
}

// records a complete index and updates statistics
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
// records indexing activity and updates statistics
func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
i.stats.Indexed += int64(indexed)
i.stats.Deleted += int64(deleted)
i.stats.Elapsed += elapsed
Expand Down Expand Up @@ -267,20 +267,23 @@ type indexResponse struct {
}

// indexes the batch of contacts
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}

createdCount, deletedCount, conflictedCount := 0, 0, 0
createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0

for _, item := range response.Items {
if item.Index.ID != "" {
slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status)
if item.Index.Status == 200 || item.Index.Status == 201 {
if item.Index.Status == 200 {
updatedCount++
} else if item.Index.Status == 201 {
createdCount++
} else if item.Index.Status == 409 {
conflictedCount++
Expand All @@ -298,8 +301,10 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
slog.Error("unparsed item in response")
}
}
slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount)
return createdCount, deletedCount, nil

slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount)

return createdCount, updatedCount, deletedCount, nil
}

// our response for finding the last modified document
Expand All @@ -326,7 +331,7 @@ func (i *baseIndexer) GetESLastModified(index string) (time.Time, error) {
_, err := utils.MakeJSONRequest(
http.MethodPost,
fmt.Sprintf("%s/%s/_search", i.elasticURL, index),
[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`),
[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1, "track_total_hits": false}`),
queryResponse,
)
if err != nil {
Expand Down
30 changes: 17 additions & 13 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)

// now index our docs
start := time.Now()
indexed, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
if err != nil {
return "", fmt.Errorf("error indexing documents: %w", err)
}

i.recordComplete(indexed, deleted, time.Since(start))

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
err := i.updateAlias(physicalIndex)
Expand Down Expand Up @@ -153,8 +150,8 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
`

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) {
totalFetched, totalCreated, totalDeleted := 0, 0, 0
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0

var modifiedOn time.Time
var contactJSON string
Expand All @@ -168,18 +165,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
batchStart := time.Now() // start time for this batch
batchFetched := 0 // contacts fetched in this batch
batchCreated := 0 // contacts created in ES
batchUpdated := 0 // contacts updated in ES
batchDeleted := 0 // contacts deleted in ES
batchESTime := time.Duration(0) // time spent indexing for this batch

indexSubBatch := func(b *bytes.Buffer) error {
t := time.Now()
created, deleted, err := i.indexBatch(index, b.Bytes())
created, updated, deleted, err := i.indexBatch(index, b.Bytes())
if err != nil {
return err
}

batchESTime += time.Since(t)
batchCreated += created
batchUpdated += updated
batchDeleted += deleted
b.Reset()
return nil
Expand All @@ -191,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

// no more rows? return
if err == sql.ErrNoRows {
return 0, 0, nil
return nil
}
if err != nil {
return 0, 0, err
return err
}
defer rows.Close()

for rows.Next() {
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
if err != nil {
return 0, 0, err
return err
}

batchFetched++
Expand All @@ -224,21 +223,22 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
// write to elastic search in batches
if batchFetched%i.batchSize == 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
return err
}
}
}

if subBatch.Len() > 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, err
return err
}
}

rows.Close()

totalFetched += batchFetched
totalCreated += batchCreated
totalUpdated += batchUpdated
totalDeleted += batchDeleted

totalTime := time.Since(start)
Expand All @@ -249,10 +249,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
"rate", batchRate,
"batch_fetched", batchFetched,
"batch_created", batchCreated,
"batch_updated", batchUpdated,
"batch_elapsed", batchTime,
"batch_elapsed_es", batchESTime,
"total_fetched", totalFetched,
"total_created", totalCreated,
"total_updated", totalUpdated,
"total_elapsed", totalTime,
)

Expand All @@ -263,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
log.Debug("indexed contact batch")
}

i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))

// last modified stayed the same and we didn't add anything, seen it all, break out
if lastModified.Equal(queryModified) && batchCreated == 0 {
break
}
}

return totalCreated, totalDeleted, nil
return nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
Expand Down

0 comments on commit 836311b

Please sign in to comment.