Skip to content

Commit

Permalink
Merge pull request #1706 from Permify/ufuk/watchapi
Browse files Browse the repository at this point in the history
refactor: transaction fetching updated as bulk queries
  • Loading branch information
tolgaOzen authored Oct 21, 2024
2 parents 92321dd + f480205 commit fd01040
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +goose NO TRANSACTION
-- +goose Up
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_attributes_txid ON attributes (tenant_id, created_tx_id, expired_tx_id);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_relation_tuples_txid ON relation_tuples (tenant_id, created_tx_id, expired_tx_id);

-- +goose Down
DROP INDEX CONCURRENTLY IF EXISTS idx_attributes_txid;
DROP INDEX CONCURRENTLY IF EXISTS idx_relation_tuples_txid;
37 changes: 24 additions & 13 deletions internal/storage/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base.
changes := make(chan *base.DataChanges, w.database.GetWatchBufferSize())
errs := make(chan error, 1)

var sleep *time.Timer
const maxSleepDuration = 2 * time.Second
const defaultSleepDuration = 100 * time.Millisecond
sleepDuration := defaultSleepDuration

slog.DebugContext(ctx, "watching for changes in the database", slog.Any("tenant_id", tenantID), slog.Any("snapshot", snap))

// Decode the snapshot value.
Expand Down Expand Up @@ -91,38 +96,48 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base.
updates, err := w.getChanges(ctx, id, tenantID)
if err != nil {
// If there is an error in getting the changes, send the error and return.

slog.ErrorContext(ctx, "failed to get changes for transaction", slog.Any("id", id), slog.Any("error", err))

errs <- err
return
}

// Send the changes, but respect the context cancellation.
select {
case changes <- updates: // Send updates to the changes channel.
slog.DebugContext(ctx, "sent updates to the changes channel for transaction", slog.Any("id", id))
case <-ctx.Done(): // If the context is done, send an error and return.
slog.ErrorContext(ctx, "context canceled, stopping watch")
errs <- errors.New(base.ErrorCode_ERROR_CODE_CANCELLED.String())
return
case changes <- updates: // Send updates to the changes channel.
slog.DebugContext(ctx, "sent updates to the changes channel for transaction", slog.Any("id", id))
}

// Update the transaction ID for the next round.
cr = id.Uint
sleepDuration = defaultSleepDuration
}

// If there are no recent transaction IDs, wait for a short period before trying again.
if len(recentIDs) == 0 {
sleep := time.NewTimer(100 * time.Millisecond)

if sleep == nil {
sleep = time.NewTimer(sleepDuration)
} else {
sleep.Reset(sleepDuration)
}

// Increase the sleep duration exponentially, but cap it at maxSleepDuration.
if sleepDuration < maxSleepDuration {
sleepDuration *= 2
} else {
sleepDuration = maxSleepDuration
}

select {
case <-sleep.C: // If the timer is done, continue the loop.
slog.DebugContext(ctx, "no recent transaction IDs, waiting for changes")
case <-ctx.Done(): // If the context is done, send an error and return.
slog.ErrorContext(ctx, "context canceled, stopping watch")
errs <- errors.New(base.ErrorCode_ERROR_CODE_CANCELLED.String())
return
case <-sleep.C: // If the timer is done, continue the loop.
slog.DebugContext(ctx, "no recent transaction IDs, waiting for changes")
}
}
}
Expand Down Expand Up @@ -174,9 +189,7 @@ func (w *Watch) getRecentXIDs(ctx context.Context, value uint64, tenantID string
// Execute the SQL query.
rows, err := w.database.ReadPool.Query(ctx, query, args...)
if err != nil {

slog.ErrorContext(ctx, "failed to execute sql query", slog.Any("error", err))

slog.ErrorContext(ctx, "failed to execute SQL query", slog.Any("error", err))
return nil, err
}
defer rows.Close()
Expand All @@ -187,9 +200,7 @@ func (w *Watch) getRecentXIDs(ctx context.Context, value uint64, tenantID string
var xid types.XID8
err := rows.Scan(&xid)
if err != nil {

slog.ErrorContext(ctx, "error while scanning row", slog.Any("error", err))

return nil, err
}
xids = append(xids, xid)
Expand Down

0 comments on commit fd01040

Please sign in to comment.