diff --git a/daemon.go b/daemon.go index 2a3517d..1f41550 100644 --- a/daemon.go +++ b/daemon.go @@ -114,7 +114,6 @@ func (d *Daemon) reportStats(includeLag bool) { prev := d.prevStats[ix] indexedInPeriod := stats.Indexed - prev.Indexed - deletedInPeriod := stats.Deleted - prev.Deleted elapsedInPeriod := stats.Elapsed - prev.Elapsed rateInPeriod := float64(0) if indexedInPeriod > 0 && elapsedInPeriod > 0 { @@ -122,7 +121,6 @@ func (d *Daemon) reportStats(includeLag bool) { } metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod) - metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod) metrics[ix.Name()+"_rate"] = rateInPeriod d.prevStats[ix] = stats diff --git a/indexers/base.go b/indexers/base.go index 6737329..43f484b 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -18,12 +18,8 @@ import ( // indexes a document const indexCommand = `{ "index": { "_id": %d, "version": %d, "version_type": "external", "routing": %d} }` -// deletes a document -const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type": "external", "routing": %d} }` - type Stats struct { Indexed int64 // total number of documents indexed - Deleted int64 // total number of documents deleted Elapsed time.Duration // total time spent actually indexing (excludes poll delay) } @@ -85,12 +81,11 @@ func (i *baseIndexer) log() *slog.Logger { } // records indexing activity and updates statistics -func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) { +func (i *baseIndexer) recordActivity(indexed int, elapsed time.Duration) { i.stats.Indexed += int64(indexed) - i.stats.Deleted += int64(deleted) i.stats.Elapsed += elapsed - i.log().Info("completed indexing", "indexed", indexed, "deleted", deleted, "elapsed", elapsed) + i.log().Info("completed indexing", "indexed", indexed, "elapsed", elapsed) } // our response for figuring out the physical index for an alias @@ -267,16 +262,16 @@ type indexResponse struct { } // indexes the batch of contacts -func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) { +func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { response := indexResponse{} indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index) _, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response) if err != nil { - return 0, 0, 0, err + return 0, 0, err } - createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0 + createdCount, updatedCount, conflictedCount := 0, 0, 0 for _, item := range response.Items { if item.Index.ID != "" { @@ -290,21 +285,14 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, err } else { slog.Error("error indexing document", "id", item.Index.ID, "status", item.Index.Status, "result", item.Index.Result) } - } else if item.Delete.ID != "" { - slog.Debug("delete response", "id", item.Index.ID, "status", item.Index.Status) - if item.Delete.Status == 200 { - deletedCount++ - } else if item.Delete.Status == 409 { - conflictedCount++ - } } else { slog.Error("unparsed item in response") } } - slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount) + slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "conflicted", conflictedCount) - return createdCount, updatedCount, deletedCount, nil + return createdCount, updatedCount, nil } // our response for finding the last modified document diff --git a/indexers/base_test.go b/indexers/base_test.go index b63882f..0b0bf8c 100644 --- a/indexers/base_test.go +++ b/indexers/base_test.go @@ -78,10 +78,9 @@ func assertIndexesWithPrefix(t *testing.T, cfg *indexer.Config, prefix string, e assert.Equal(t, expected, actual) } -func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expectedDeleted int64) { +func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed int64) { actual := ix.Stats() assert.Equal(t, expectedIndexed, actual.Indexed, "indexed mismatch") - assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch") } func elasticRequest(t *testing.T, cfg *indexer.Config, method, path string, data map[string]any) map[string]any { diff --git a/indexers/contacts.go b/indexers/contacts.go index e15520d..c3a6c25 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -89,7 +89,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error } const sqlSelectModifiedContacts = ` -SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( +SELECT org_id, id, modified_on, row_to_json(t) FROM ( SELECT id, org_id, @@ -98,7 +98,6 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( language, status, ticket_count AS tickets, - is_active, created_on, modified_on, last_seen_on, @@ -143,7 +142,7 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( SELECT array_to_json(array_agg(DISTINCT fr.flow_id)) FROM flows_flowrun fr WHERE fr.contact_id = contacts_contact.id ) AS flow_history_ids FROM contacts_contact - WHERE modified_on >= $1 + WHERE modified_on >= $1 AND is_active ORDER BY modified_on ASC LIMIT 100000 ) t; @@ -151,12 +150,11 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error { - totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0 + totalFetched, totalCreated, totalUpdated := 0, 0, 0 var modifiedOn time.Time var contactJSON string var id, orgID int64 - var isActive bool subBatch := &bytes.Buffer{} start := time.Now() @@ -166,12 +164,11 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchFetched := 0 // contacts fetched in this batch batchCreated := 0 // contacts created in ES batchUpdated := 0 // contacts updated in ES - batchDeleted := 0 // contacts deleted in ES batchESTime := time.Duration(0) // time spent indexing for this batch indexSubBatch := func(b *bytes.Buffer) error { t := time.Now() - created, updated, deleted, err := i.indexBatch(index, b.Bytes()) + created, updated, err := i.indexBatch(index, b.Bytes()) if err != nil { return err } @@ -179,7 +176,6 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchESTime += time.Since(t) batchCreated += created batchUpdated += updated - batchDeleted += deleted b.Reset() return nil } @@ -198,7 +194,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st defer rows.Close() for rows.Next() { - err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) + err = rows.Scan(&orgID, &id, &modifiedOn, &contactJSON) if err != nil { return err } @@ -206,19 +202,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchFetched++ lastModified = modifiedOn - if isActive { - i.log().Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON) + i.log().Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON) - subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) - subBatch.WriteString("\n") - subBatch.WriteString(contactJSON) - subBatch.WriteString("\n") - } else { - i.log().Debug("deleted contact", "id", id, "modifiedOn", modifiedOn) - - subBatch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID)) - subBatch.WriteString("\n") - } + subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) + subBatch.WriteString("\n") + subBatch.WriteString(contactJSON) + subBatch.WriteString("\n") // write to elastic search in batches if batchFetched%i.batchSize == 0 { @@ -239,7 +228,6 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st totalFetched += batchFetched totalCreated += batchCreated totalUpdated += batchUpdated - totalDeleted += batchDeleted totalTime := time.Since(start) batchTime := time.Since(batchStart) @@ -265,7 +253,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st log.Debug("indexed contact batch") } - i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart)) + i.recordActivity(batchCreated+batchUpdated, time.Since(batchStart)) // last modified stayed the same and we didn't add anything, seen it all, break out if lastModified.Equal(queryModified) && batchCreated == 0 { diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go index bcc9943..1fc0da4 100644 --- a/indexers/contacts_test.go +++ b/indexers/contacts_test.go @@ -189,7 +189,7 @@ func TestContacts(t *testing.T) { assert.NoError(t, err) assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0) - assertIndexerStats(t, ix1, 9, 0) + assertIndexerStats(t, ix1, 9) assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName}) for _, tc := range contactQueryTests { @@ -200,25 +200,24 @@ func TestContacts(t *testing.T) { assert.NoError(t, err) assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC)) - // now make some contact changes, removing one contact, updating another + // now make some contact changes _, err = db.Exec(` DELETE FROM contacts_contactgroup_contacts WHERE id = 3; - UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; - UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`) + UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2;`) require.NoError(t, err) // and index again... indexName, err = ix1.Index(db, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) // same index used - assertIndexerStats(t, ix1, 10, 1) + assertIndexerStats(t, ix1, 10) time.Sleep(1 * time.Second) assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName}) - // should only match new john, old john is gone - assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2}) + // still have two johns + assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2, 4}) // 3 is no longer in our group assertQuery(t, cfg, elastic.Match("group_ids", 4), []int64{1}) @@ -234,7 +233,7 @@ func TestContacts(t *testing.T) { indexName2, err := ix2.Index(db, true, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used - assertIndexerStats(t, ix2, 8, 0) + assertIndexerStats(t, ix2, 9) time.Sleep(1 * time.Second) @@ -249,7 +248,7 @@ func TestContacts(t *testing.T) { indexName3, err := ix3.Index(db, true, true) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used - assertIndexerStats(t, ix3, 8, 0) + assertIndexerStats(t, ix3, 9) // check we cleaned up indexes besides the new one assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName + "_2"})