From b0f70b57a00347b11a5fb72f05f3e587a87579e4 Mon Sep 17 00:00:00 2001 From: Scott Sunarto Date: Thu, 27 Jun 2024 20:55:05 -0700 Subject: [PATCH] refactor: make internal constants private --- job.go | 4 ++-- jobqueue.go | 18 +++++++++--------- jobqueue_test.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/job.go b/job.go index c746768..2a59df0 100644 --- a/job.go +++ b/job.go @@ -5,7 +5,7 @@ import ( "time" ) -const JobDBKeyPrefix = "job-" +const jobDBKeyPrefix = "job-" // JobType is a user-defined job struct that is processed by the job queue. // The struct must implement the Process method. @@ -62,5 +62,5 @@ func (j *job[T]) Process() error { // dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID // is strictly increasing to avoid queues being processed out of order. func (j *job[T]) dbKey() []byte { - return []byte(fmt.Sprintf("%s%d", JobDBKeyPrefix, j.ID)) + return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, j.ID)) } diff --git a/jobqueue.go b/jobqueue.go index d318aec..5bae9da 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -24,11 +24,11 @@ const ( // TODO: Use complete status for archiving completed jobs? -var ErrJobChannelFull = errors.New("job channel is closed") +var errJobChannelFull = errors.New("job channel is closed") -const DefaultFetchInterval = 100 * time.Millisecond -const DefaultJobBufferSize = 1000 -const DefaultJobIDSequenceSize = 100 +const defaultFetchInterval = 100 * time.Millisecond +const defaultJobBufferSize = 1000 +const defaultJobIDSequenceSize = 100 type JobQueue[T JobType] struct { db *badger.DB @@ -65,9 +65,9 @@ func NewJobQueue[T JobType](dbPath string, name string, workers int, opts ...Opt logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(), isJobIDInQueue: xsync.NewMapOf[uint64, bool](), - jobs: make(chan *job[T], DefaultJobBufferSize), + jobs: make(chan *job[T], defaultJobBufferSize), - fetchInterval: DefaultFetchInterval, + fetchInterval: defaultFetchInterval, } for _, opt := range opts { opt(jq) @@ -75,7 +75,7 @@ func NewJobQueue[T JobType](dbPath string, name string, workers int, opts ...Opt jq.logger.Info().Msg("Starting job queue") - jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), DefaultJobIDSequenceSize) + jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize) if err != nil { return nil, fmt.Errorf("failed to start job id sequence: %w", err) } @@ -230,7 +230,7 @@ func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit it := txn.NewIterator(opts) defer it.Close() - for it.Seek([]byte(JobDBKeyPrefix)); it.ValidForPrefix([]byte(JobDBKeyPrefix)); it.Next() { + for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() { item := it.Item() err := item.Value(func(v []byte) error { var job job[T] @@ -259,7 +259,7 @@ func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit default: jq.logger.Warn().Uint64("JobID", job.ID).Msg("Found pending jobs, but job channel is full") - return ErrJobChannelFull + return errJobChannelFull } } diff --git a/jobqueue_test.go b/jobqueue_test.go index 9ec4038..07afae5 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -144,7 +144,7 @@ func TestJobQueue_Recovery(t *testing.T) { } func readJob(db *badger.DB, id uint64) ([]byte, error) { - return readKey(db, fmt.Sprintf("%s%d", JobDBKeyPrefix, id)) + return readKey(db, fmt.Sprintf("%s%d", jobDBKeyPrefix, id)) } func readKey(db *badger.DB, key string) ([]byte, error) {