Skip to content

Commit

Permalink
feat: in-memory db mode for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
smsunarto committed Jun 30, 2024
1 parent 615e53f commit 7436a36
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
38 changes: 26 additions & 12 deletions jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ const defaultJobBufferSize = 1000
const defaultJobIDSequenceSize = 100

type JobQueue[T any] struct {
db *badger.DB
db *badger.DB
dbPath string
dbInMemory bool

wg sync.WaitGroup
logger zerolog.Logger
cancel context.CancelFunc
Expand All @@ -57,17 +60,17 @@ func New[T any](
log.Warn().Msg("Number of workers is 0, jobs will not be automatically processed")
}

db, err := openDB(dbPath)
if err != nil {
return nil, err
}

jq := &JobQueue[T]{
db: db,
db: nil,
dbPath: dbPath,
dbInMemory: false,

wg: sync.WaitGroup{},
logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(),
cancel: nil,
handler: handler,

jobID: nil,
isJobIDInQueue: xsync.NewMapOf[uint64, bool](),
jobs: make(chan *job[T], defaultJobBufferSize),

Expand All @@ -77,16 +80,22 @@ func New[T any](
opt(jq)
}

db, err := jq.openDB()
if err != nil {
return nil, err
}
jq.db = db

jq.logger.Info().Msg("Starting job queue")

ctx, cancel := context.WithCancel(context.Background())
jq.cancel = cancel

jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize)
if err != nil {
return nil, fmt.Errorf("failed to start job id sequence: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
jq.cancel = cancel

// Load jobs from BadgerDB
go jq.pollJobs(ctx)

Expand Down Expand Up @@ -282,8 +291,13 @@ func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit
return nil
}

func openDB(dbPath string) (*badger.DB, error) {
opts := badger.DefaultOptions(dbPath)
func (jq *JobQueue[T]) openDB() (*badger.DB, error) {
var opts badger.Options
if jq.dbInMemory {
opts = badger.DefaultOptions("").WithInMemory(true)
} else {
opts = badger.DefaultOptions(jq.dbPath)
}
opts.Logger = nil

db, err := badger.Open(opts)
Expand Down
29 changes: 7 additions & 22 deletions jobqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type testJob struct {

func testJobHandler() func(JobContext, testJob) error {
return func(ctx JobContext, job testJob) error {
fmt.Println("Test job processed:", job.Msg, ctx.JobID(),
ctx.JobCreatedAt().Unix()) //nolint:forbidigo // for testing
fmt.Println("Test job processed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing
ctx.JobCreatedAt().Unix())
return nil
}
}
Expand All @@ -51,27 +51,24 @@ func TestNewJobQueue(t *testing.T) {
dbPath: "/tmp/test_jobqueue_1",
queueName: "test-queue-1",
workers: 2,
options: nil,
options: []Option[testJob]{WithInmemDB[testJob]()},
expectedError: false,
cleanupNeeded: true,
},
{
name: "Invalid workers count",
dbPath: "/tmp/test_jobqueue_2",
queueName: "test-queue-2",
workers: -1,
options: nil,
options: []Option[testJob]{WithInmemDB[testJob]()},
expectedError: true,
cleanupNeeded: false,
},
{
name: "Zero workers",
dbPath: "/tmp/test_jobqueue_3",
queueName: "test-queue-3",
workers: 0,
options: nil,
options: []Option[testJob]{WithInmemDB[testJob]()},
expectedError: false,
cleanupNeeded: true,
},
}

Expand All @@ -80,16 +77,6 @@ func TestNewJobQueue(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

// Arrange
if tc.cleanupNeeded {
t.Cleanup(func() {
err := os.RemoveAll(tc.dbPath)
if err != nil {
return
}
})
}

// Act
jq, err := New[testJob](tc.dbPath, tc.queueName, tc.workers, testJobHandler(), tc.options...)

Expand Down Expand Up @@ -117,12 +104,11 @@ func TestNewJobQueue(t *testing.T) {
func TestJobQueue_Enqueue(t *testing.T) {
cleanupBadgerDB(t)

jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler())
jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]())
assert.NoError(t, err)

t.Cleanup(func() {
assert.NoError(t, jq.Stop())
cleanupBadgerDB(t)
})

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -150,12 +136,11 @@ func TestJobQueue_Enqueue(t *testing.T) {
func TestJobQueue_ProcessJob(t *testing.T) {
cleanupBadgerDB(t)

jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler())
jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]())
assert.NoError(t, err)

t.Cleanup(func() {
assert.NoError(t, jq.Stop())
cleanupBadgerDB(t)
})

// Queue a bunch of jobs
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@ func WithJobBufferSize[T any](size int) Option[T] {
jq.jobs = make(chan *job[T], size)
}
}

// WithInmemDB uses an in-memory BadgerDB instead of a persistent one.
// Useful for testing, but provides no durability guarantees.
func WithInmemDB[T any]() Option[T] {
return func(jq *JobQueue[T]) {
jq.dbInMemory = true
}
}

0 comments on commit 7436a36

Please sign in to comment.