Skip to content

Commit

Permalink
refactor: make internal constants private
Browse files Browse the repository at this point in the history
  • Loading branch information
smsunarto committed Jun 28, 2024
1 parent 0552300 commit b0f70b5
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"
)

const JobDBKeyPrefix = "job-"
const jobDBKeyPrefix = "job-"

// JobType is a user-defined job struct that is processed by the job queue.
// The struct must implement the Process method.
Expand Down Expand Up @@ -62,5 +62,5 @@ func (j *job[T]) Process() error {
// dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID
// is strictly increasing to avoid queues being processed out of order.
func (j *job[T]) dbKey() []byte {
return []byte(fmt.Sprintf("%s%d", JobDBKeyPrefix, j.ID))
return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, j.ID))
}
18 changes: 9 additions & 9 deletions jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ const (

// TODO: Use complete status for archiving completed jobs?

var ErrJobChannelFull = errors.New("job channel is closed")
var errJobChannelFull = errors.New("job channel is closed")

const DefaultFetchInterval = 100 * time.Millisecond
const DefaultJobBufferSize = 1000
const DefaultJobIDSequenceSize = 100
const defaultFetchInterval = 100 * time.Millisecond
const defaultJobBufferSize = 1000
const defaultJobIDSequenceSize = 100

type JobQueue[T JobType] struct {
db *badger.DB
Expand Down Expand Up @@ -65,17 +65,17 @@ func NewJobQueue[T JobType](dbPath string, name string, workers int, opts ...Opt
logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(),

isJobIDInQueue: xsync.NewMapOf[uint64, bool](),
jobs: make(chan *job[T], DefaultJobBufferSize),
jobs: make(chan *job[T], defaultJobBufferSize),

fetchInterval: DefaultFetchInterval,
fetchInterval: defaultFetchInterval,
}
for _, opt := range opts {
opt(jq)
}

jq.logger.Info().Msg("Starting job queue")

jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), DefaultJobIDSequenceSize)
jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize)
if err != nil {
return nil, fmt.Errorf("failed to start job id sequence: %w", err)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit
it := txn.NewIterator(opts)
defer it.Close()

for it.Seek([]byte(JobDBKeyPrefix)); it.ValidForPrefix([]byte(JobDBKeyPrefix)); it.Next() {
for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() {
item := it.Item()
err := item.Value(func(v []byte) error {
var job job[T]
Expand Down Expand Up @@ -259,7 +259,7 @@ func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit
default:
jq.logger.Warn().Uint64("JobID",
job.ID).Msg("Found pending jobs, but job channel is full")
return ErrJobChannelFull
return errJobChannelFull
}
}

Expand Down
2 changes: 1 addition & 1 deletion jobqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestJobQueue_Recovery(t *testing.T) {
}

func readJob(db *badger.DB, id uint64) ([]byte, error) {
return readKey(db, fmt.Sprintf("%s%d", JobDBKeyPrefix, id))
return readKey(db, fmt.Sprintf("%s%d", jobDBKeyPrefix, id))
}

func readKey(db *badger.DB, key string) ([]byte, error) {
Expand Down

0 comments on commit b0f70b5

Please sign in to comment.