
[ENHANCEMENT] Standardize Error Handling in Event Producer to Enable Retries and Filter Poison Pills #2124

@jcscottiii

Description:
Currently, the ProcessSearch function in the Event Producer (workers/event_producer/pkg/producer/producer.go) wraps only errors from AcquireLock with event.ErrTransientFailure. All other errors (database reads/writes, GCS operations, external API calls) are returned unwrapped.

The Problem:
The Pub/Sub subscriber adapter is configured to ACK (drop) any message where the handler returns a non-transient error. This means that a temporary network glitch (e.g., Spanner read timeout, GCS 503) will cause the notification job to be permanently lost instead of retried.

```go
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	// Execute the worker's handler
	workErr := handler(ctx, msg.Data)
	if workErr == nil {
		// ACK: Success
		msg.Ack()
	} else if errors.Is(workErr, event.ErrTransientFailure) {
		// NACK: Retry later
		msg.Nack()
	} else {
		// ACK: Permanent failure or unknown error, do not retry
		slog.ErrorContext(ctx, "permanent failure", "error", workErr)
		msg.Ack()
	}
})
```

Conversely, simply wrapping all errors as transient would cause "poison pills" (e.g., invalid saved search IDs) to be NACKed and redelivered over and over, even though no retry can ever succeed.
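Both failure modes can be reproduced by modeling the adapter's branching as a pure function. This is a minimal sketch under stated assumptions: `errTransientFailure` stands in for `event.ErrTransientFailure`, and `ackDecision`, `wrapTransient`, and the two example errors are hypothetical names, not code from the repository.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransientFailure is a stand-in for event.ErrTransientFailure (assumed to
// be a sentinel matched with errors.Is, as in the adapter code above).
var errTransientFailure = errors.New("transient failure")

// Hypothetical example errors: a retryable network blip and a poison pill.
var (
	errSpannerTimeout = errors.New("spanner: read timeout")
	errBadSearchID    = errors.New("saved search not found")
)

// ackDecision reproduces the adapter's branching as a pure function.
func ackDecision(workErr error) string {
	switch {
	case workErr == nil:
		return "ACK"
	case errors.Is(workErr, errTransientFailure):
		return "NACK"
	default:
		return "ACK (dropped)"
	}
}

// wrapTransient marks err as retryable while keeping it in the error chain.
// Using two %w verbs in one fmt.Errorf call requires Go 1.20 or later.
func wrapTransient(err error) error {
	return fmt.Errorf("%w: %w", errTransientFailure, err)
}

func main() {
	// Current behavior: the unwrapped timeout is ACKed, so the job is lost.
	fmt.Println(ackDecision(errSpannerTimeout))
	// Wrapping makes the adapter NACK, so Pub/Sub redelivers the message.
	fmt.Println(ackDecision(wrapTransient(errSpannerTimeout)))
	// Wrapping everything also NACKs the poison pill, which can never succeed.
	fmt.Println(ackDecision(wrapTransient(errBadSearchID)))
}
```

The last case is why a blanket wrap is not enough: once every error carries the transient sentinel, the adapter can no longer tell a passing network blip from an input that will fail on every delivery.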

Objective:
We need to implement a robust error handling strategy that retries by default (to catch network issues) but fails fast on known invalid states.

Proposed Changes:

  1. Define Standard Errors: Create lib/workertypes/errors.go to define sentinel errors for adapters:
      • var ErrEntityNotFound = errors.New("entity not found")
      • var ErrInvalidData = errors.New("invalid data format")
  2. Update Adapters: Modify the Spanner and GCS adapters to return these specific errors where appropriate (e.g., GetLatestEvent returning ErrEntityNotFound if the search ID is invalid). If an adapter adds context to the error, it should wrap the sentinel with %w so that errors.Is still matches it.
  3. Update ProcessSearch Logic: Refactor error handling in ProcessSearch to follow this pattern:
      • Permanent Failure Check: If the error matches (via errors.Is) workertypes.ErrEntityNotFound or workertypes.ErrInvalidData, return it directly (triggering an ACK/drop).
      • Transient Default: For all other errors (unknown or network-related), wrap them with event.ErrTransientFailure (triggering a NACK/retry).
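The proposed policy can be sketched end to end with only the standard library. The sentinel names follow the proposal; `ErrTransientFailure` stands in for `event.ErrTransientFailure`, and `getLatestEvent`, `isPermanent`, `isTransient`, and `classify` are hypothetical helpers, not the repository's actual adapter or ProcessSearch code.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors as proposed for lib/workertypes/errors.go.
var (
	ErrEntityNotFound = errors.New("entity not found")
	ErrInvalidData    = errors.New("invalid data format")
)

// ErrTransientFailure is a stand-in for event.ErrTransientFailure.
var ErrTransientFailure = errors.New("transient failure")

// getLatestEvent is a hypothetical adapter call. Sentinels are wrapped with %w
// so that errors.Is still matches them after context is added.
func getLatestEvent(searchID string) error {
	switch searchID {
	case "missing":
		return fmt.Errorf("get latest event for %q: %w", searchID, ErrEntityNotFound)
	case "corrupt":
		return fmt.Errorf("decode event for %q: %w", searchID, ErrInvalidData)
	case "flaky":
		return errors.New("spanner: deadline exceeded") // unclassified network error
	default:
		return nil
	}
}

// isPermanent reports whether err matches a known invalid state.
func isPermanent(err error) bool {
	return errors.Is(err, ErrEntityNotFound) || errors.Is(err, ErrInvalidData)
}

// isTransient reports whether the subscriber adapter would NACK err.
func isTransient(err error) bool {
	return errors.Is(err, ErrTransientFailure)
}

// classify applies the proposed ProcessSearch policy: known-permanent errors
// are returned as-is (ACK/drop); every other error is wrapped as transient
// (NACK/retry). Two %w verbs in one call require Go 1.20 or later.
func classify(err error) error {
	if err == nil || isPermanent(err) {
		return err
	}
	return fmt.Errorf("%w: %w", ErrTransientFailure, err)
}

func main() {
	for _, id := range []string{"ok", "missing", "corrupt", "flaky"} {
		err := classify(getLatestEvent(id))
		fmt.Printf("%-8s transient=%t err=%v\n", id, isTransient(err), err)
	}
}
```

The permanent check runs before the transient default, so an error that wraps one of the sentinels is never retried, while anything not explicitly classified falls through to a retry, matching the "retry by default, fail fast on known invalid states" objective.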

Acceptance Criteria:

  • Transient failures (network blips) in DB, Storage, or Differ now trigger a retry (NACK).
  • Permanent failures (missing search ID, bad data) trigger an immediate failure (ACK) and are not retried.

Labels: enhancement (New feature or request)