Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove functionality for deleting inactive contacts #84

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,13 @@ func (d *Daemon) reportStats(includeLag bool) {
prev := d.prevStats[ix]

indexedInPeriod := stats.Indexed - prev.Indexed
deletedInPeriod := stats.Deleted - prev.Deleted
elapsedInPeriod := stats.Elapsed - prev.Elapsed
rateInPeriod := float64(0)
if indexedInPeriod > 0 && elapsedInPeriod > 0 {
rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second))
}

metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod)
metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod)
metrics[ix.Name()+"_rate"] = rateInPeriod

d.prevStats[ix] = stats
Expand Down
26 changes: 7 additions & 19 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
// indexes a document
const indexCommand = `{ "index": { "_id": %d, "version": %d, "version_type": "external", "routing": %d} }`

// deletes a document
const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type": "external", "routing": %d} }`

type Stats struct {
Indexed int64 // total number of documents indexed
Deleted int64 // total number of documents deleted
Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
}

Expand Down Expand Up @@ -85,12 +81,11 @@
}

// records indexing activity and updates statistics
func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
func (i *baseIndexer) recordActivity(indexed int, elapsed time.Duration) {
i.stats.Indexed += int64(indexed)
i.stats.Deleted += int64(deleted)
i.stats.Elapsed += elapsed

i.log().Info("completed indexing", "indexed", indexed, "deleted", deleted, "elapsed", elapsed)
i.log().Info("completed indexing", "indexed", indexed, "elapsed", elapsed)
}

// our response for figuring out the physical index for an alias
Expand Down Expand Up @@ -267,16 +262,16 @@
}

// indexes the batch of contacts
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) {
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, 0, err
return 0, 0, err

Check warning on line 271 in indexers/base.go

View check run for this annotation

Codecov / codecov/patch

indexers/base.go#L271

Added line #L271 was not covered by tests
}

createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0
createdCount, updatedCount, conflictedCount := 0, 0, 0

for _, item := range response.Items {
if item.Index.ID != "" {
Expand All @@ -290,21 +285,14 @@
} else {
slog.Error("error indexing document", "id", item.Index.ID, "status", item.Index.Status, "result", item.Index.Result)
}
} else if item.Delete.ID != "" {
slog.Debug("delete response", "id", item.Index.ID, "status", item.Index.Status)
if item.Delete.Status == 200 {
deletedCount++
} else if item.Delete.Status == 409 {
conflictedCount++
}
} else {
slog.Error("unparsed item in response")
}
}

slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount)
slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "conflicted", conflictedCount)

return createdCount, updatedCount, deletedCount, nil
return createdCount, updatedCount, nil
}

// our response for finding the last modified document
Expand Down
3 changes: 1 addition & 2 deletions indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ func assertIndexesWithPrefix(t *testing.T, cfg *indexer.Config, prefix string, e
assert.Equal(t, expected, actual)
}

func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expectedDeleted int64) {
func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed int64) {
actual := ix.Stats()
assert.Equal(t, expectedIndexed, actual.Indexed, "indexed mismatch")
assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch")
}

func elasticRequest(t *testing.T, cfg *indexer.Config, method, path string, data map[string]any) map[string]any {
Expand Down
34 changes: 11 additions & 23 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
}

const sqlSelectModifiedContacts = `
SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
SELECT org_id, id, modified_on, row_to_json(t) FROM (
SELECT
id,
org_id,
Expand All @@ -98,7 +98,6 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
language,
status,
ticket_count AS tickets,
is_active,
created_on,
modified_on,
last_seen_on,
Expand Down Expand Up @@ -143,20 +142,19 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
SELECT array_to_json(array_agg(DISTINCT fr.flow_id)) FROM flows_flowrun fr WHERE fr.contact_id = contacts_contact.id
) AS flow_history_ids
FROM contacts_contact
WHERE modified_on >= $1
WHERE modified_on >= $1 AND is_active
ORDER BY modified_on ASC
LIMIT 100000
) t;
`

// indexModified queries and indexes all contacts whose modified_on is greater than or equal to the passed-in lastModified time
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0
totalFetched, totalCreated, totalUpdated := 0, 0, 0

var modifiedOn time.Time
var contactJSON string
var id, orgID int64
var isActive bool

subBatch := &bytes.Buffer{}
start := time.Now()
Expand All @@ -166,20 +164,18 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
batchFetched := 0 // contacts fetched in this batch
batchCreated := 0 // contacts created in ES
batchUpdated := 0 // contacts updated in ES
batchDeleted := 0 // contacts deleted in ES
batchESTime := time.Duration(0) // time spent indexing for this batch

indexSubBatch := func(b *bytes.Buffer) error {
t := time.Now()
created, updated, deleted, err := i.indexBatch(index, b.Bytes())
created, updated, err := i.indexBatch(index, b.Bytes())
if err != nil {
return err
}

batchESTime += time.Since(t)
batchCreated += created
batchUpdated += updated
batchDeleted += deleted
b.Reset()
return nil
}
Expand All @@ -198,27 +194,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
defer rows.Close()

for rows.Next() {
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
err = rows.Scan(&orgID, &id, &modifiedOn, &contactJSON)
if err != nil {
return err
}

batchFetched++
lastModified = modifiedOn

if isActive {
i.log().Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON)
i.log().Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON)

subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
subBatch.WriteString(contactJSON)
subBatch.WriteString("\n")
} else {
i.log().Debug("deleted contact", "id", id, "modifiedOn", modifiedOn)

subBatch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
}
subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
subBatch.WriteString(contactJSON)
subBatch.WriteString("\n")

// write to elastic search in batches
if batchFetched%i.batchSize == 0 {
Expand All @@ -239,7 +228,6 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
totalFetched += batchFetched
totalCreated += batchCreated
totalUpdated += batchUpdated
totalDeleted += batchDeleted

totalTime := time.Since(start)
batchTime := time.Since(batchStart)
Expand All @@ -265,7 +253,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
log.Debug("indexed contact batch")
}

i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))
i.recordActivity(batchCreated+batchUpdated, time.Since(batchStart))

// last modified stayed the same and nothing new was created, so we've seen it all and can break out
if lastModified.Equal(queryModified) && batchCreated == 0 {
Expand Down
17 changes: 8 additions & 9 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestContacts(t *testing.T) {
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0)

assertIndexerStats(t, ix1, 9, 0)
assertIndexerStats(t, ix1, 9)
assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName})

for _, tc := range contactQueryTests {
Expand All @@ -200,25 +200,24 @@ func TestContacts(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC))

// now make some contact changes, removing one contact, updating another
// now make some contact changes
_, err = db.Exec(`
DELETE FROM contacts_contactgroup_contacts WHERE id = 3;
UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2;
UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`)
UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2;`)
require.NoError(t, err)

// and index again...
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName) // same index used
assertIndexerStats(t, ix1, 10, 1)
assertIndexerStats(t, ix1, 10)

time.Sleep(1 * time.Second)

assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName})

// should only match new john, old john is gone
assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2})
// still have two johns
assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2, 4})

// 3 is no longer in our group
assertQuery(t, cfg, elastic.Match("group_ids", 4), []int64{1})
Expand All @@ -234,7 +233,7 @@ func TestContacts(t *testing.T) {
indexName2, err := ix2.Index(db, true, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
assertIndexerStats(t, ix2, 8, 0)
assertIndexerStats(t, ix2, 9)

time.Sleep(1 * time.Second)

Expand All @@ -249,7 +248,7 @@ func TestContacts(t *testing.T) {
indexName3, err := ix3.Index(db, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)
assertIndexerStats(t, ix3, 9)

// check we cleaned up indexes besides the new one
assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName + "_2"})
Expand Down
Loading