Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add context to schedule functions #898

Merged
merged 8 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions api/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func CreateSchedule(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
r := repo.Retrieve(c)
ctx := c.Request.Context()
allowlist := c.Value("allowlistschedule").([]string)
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)

Expand Down Expand Up @@ -145,7 +146,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the schedule from the database
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(r, input.GetName())
dbSchedule, err := database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
if err == nil && dbSchedule.GetActive() {
retErr := fmt.Errorf("unable to create schedule: %s is already active", input.GetName())

Expand All @@ -170,7 +171,7 @@ func CreateSchedule(c *gin.Context) {
dbSchedule.SetActive(true)

// send API call to update the schedule
err = database.FromContext(c).UpdateSchedule(dbSchedule, true)
err = database.FromContext(c).UpdateSchedule(ctx, dbSchedule, true)
if err != nil {
retErr := fmt.Errorf("unable to set schedule %s to active: %w", dbSchedule.GetName(), err)

Expand All @@ -180,10 +181,10 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the updated schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, dbSchedule.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, dbSchedule.GetName())
} else {
// send API call to create the schedule
err = database.FromContext(c).CreateSchedule(s)
err = database.FromContext(c).CreateSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to create new schedule %s: %w", r.GetName(), err)

Expand All @@ -193,7 +194,7 @@ func CreateSchedule(c *gin.Context) {
}

// send API call to capture the created schedule
s, _ = database.FromContext(c).GetScheduleForRepo(r, input.GetName())
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, input.GetName())
}

c.JSON(http.StatusCreated, s)
Expand Down
3 changes: 2 additions & 1 deletion api/schedule/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func DeleteSchedule(c *gin.Context) {
r := repo.Retrieve(c)
u := user.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -75,7 +76,7 @@ func DeleteSchedule(c *gin.Context) {
"user": u.GetName(),
}).Infof("deleting schedule %s", s.GetName())

err := database.FromContext(c).DeleteSchedule(s)
err := database.FromContext(c).DeleteSchedule(ctx, s)
if err != nil {
retErr := fmt.Errorf("unable to delete schedule %s: %w", s.GetName(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/schedule/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
func ListSchedules(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand Down Expand Up @@ -109,7 +110,7 @@ func ListSchedules(c *gin.Context) {
perPage = util.MaxInt(1, util.MinInt(100, perPage))

// send API call to capture the list of schedules for the repo
s, t, err := database.FromContext(c).ListSchedulesForRepo(r, page, perPage)
s, t, err := database.FromContext(c).ListSchedulesForRepo(ctx, r, page, perPage)
if err != nil {
retErr := fmt.Errorf("unable to get schedules for repo %s: %w", r.GetFullName(), err)

Expand Down
5 changes: 3 additions & 2 deletions api/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func UpdateSchedule(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
s := schedule.Retrieve(c)
ctx := c.Request.Context()
scheduleName := util.PathParameter(c, "schedule")
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)

Expand Down Expand Up @@ -123,7 +124,7 @@ func UpdateSchedule(c *gin.Context) {
}

// update the schedule within the database
err = database.FromContext(c).UpdateSchedule(s, true)
err = database.FromContext(c).UpdateSchedule(ctx, s, true)
if err != nil {
retErr := fmt.Errorf("unable to update scheduled %s: %w", scheduleName, err)

Expand All @@ -133,7 +134,7 @@ func UpdateSchedule(c *gin.Context) {
}

// capture the updated scheduled
s, _ = database.FromContext(c).GetScheduleForRepo(r, scheduleName)
s, _ = database.FromContext(c).GetScheduleForRepo(ctx, r, scheduleName)

c.JSON(http.StatusOK, s)
}
13 changes: 7 additions & 6 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -26,11 +27,11 @@ import (

const baseErr = "unable to schedule build"

func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedules(ctx context.Context, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
schedules, err := database.ListActiveSchedules()
schedules, err := database.ListActiveSchedules(ctx)
if err != nil {
return err
}
Expand All @@ -42,7 +43,7 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
schedule, err := database.GetSchedule(ctx, s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())

Expand Down Expand Up @@ -84,7 +85,7 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met
}

if trigger && schedule.GetActive() {
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
err = processSchedule(ctx, schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())

Expand All @@ -97,7 +98,7 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met
}

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
func processSchedule(ctx context.Context, s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// sleep for 1s - 3s before processing the schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
Expand Down Expand Up @@ -350,7 +351,7 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat
}

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(s, false)
err = database.UpdateSchedule(ctx, s, false)
if err != nil {
return fmt.Errorf("unable to update schedule %s/%s: %w", r.GetFullName(), s.GetName(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func server(c *cli.Context) error {
// along with a scale factor of 0.1.
time.Sleep(wait.Jitter(base, 0.1))

err = processSchedules(compiler, database, metadata, queue, scm)
err = processSchedules(ctx, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warn("unable to process schedules")
} else {
Expand Down
6 changes: 5 additions & 1 deletion database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -54,6 +55,8 @@ type (
client *gorm.DB
// engine configuration settings used in database functions
config *config
// engine context used in database functions
ctx context.Context
// sirupsen/logrus logger used in database functions
logger *logrus.Entry

Expand Down Expand Up @@ -85,6 +88,7 @@ func New(opts ...EngineOpt) (Interface, error) {
e.client = new(gorm.DB)
e.config = new(config)
e.logger = new(logrus.Entry)
e.ctx = context.TODO()

// apply all provided configuration options
for _, opt := range opts {
Expand Down Expand Up @@ -143,7 +147,7 @@ func New(opts ...EngineOpt) (Interface, error) {
}

// create database agnostic engines for resources
err = e.NewResources()
err = e.NewResources(e.ctx)
if err != nil {
return nil, err
}
Expand Down
14 changes: 13 additions & 1 deletion database/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package database

import "time"
import (
"context"
"time"
)

// EngineOpt represents a configuration option to initialize the database engine.
type EngineOpt func(*engine) error
Expand Down Expand Up @@ -88,3 +91,12 @@ func WithSkipCreation(skipCreation bool) EngineOpt {
return nil
}
}

// WithContext sets the context in the database engine.
func WithContext(ctx context.Context) EngineOpt {
return func(e *engine) error {
e.ctx = ctx

return nil
}
}
4 changes: 3 additions & 1 deletion database/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"github.com/go-vela/server/database/build"
"github.com/go-vela/server/database/hook"
"github.com/go-vela/server/database/log"
Expand All @@ -19,7 +20,7 @@ import (
)

// NewResources creates and returns the database agnostic engines for resources.
func (e *engine) NewResources() error {
func (e *engine) NewResources(ctx context.Context) error {
var err error

// create the database agnostic engine for builds
Expand Down Expand Up @@ -77,6 +78,7 @@ func (e *engine) NewResources() error {

// create the database agnostic engine for schedules
e.ScheduleInterface, err = schedule.New(
schedule.WithContext(e.ctx),
schedule.WithClient(e.client),
schedule.WithLogger(e.logger),
schedule.WithSkipCreation(e.config.SkipCreation),
Expand Down
3 changes: 2 additions & 1 deletion database/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package database

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDatabase_Engine_NewResources(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.database.NewResources()
err := test.database.NewResources(context.TODO())

if test.failure {
if err == nil {
Expand Down
3 changes: 2 additions & 1 deletion database/schedule/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package schedule

import (
"context"
"github.com/go-vela/types/constants"
)

// CountSchedules gets the count of all schedules from the database.
func (e *engine) CountSchedules() (int64, error) {
func (e *engine) CountSchedules(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all schedules from the database")

// variable to store query results
Expand Down
3 changes: 2 additions & 1 deletion database/schedule/count_active.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package schedule

import (
"context"
"github.com/go-vela/types/constants"
)

// CountActiveSchedules gets the count of all active schedules from the database.
func (e *engine) CountActiveSchedules() (int64, error) {
func (e *engine) CountActiveSchedules(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all active schedules from the database")

// variable to store query results
Expand Down
7 changes: 4 additions & 3 deletions database/schedule/count_active_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package schedule

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -46,12 +47,12 @@ func TestSchedule_Engine_CountActiveSchedules(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

err := _sqlite.CreateSchedule(_scheduleOne)
err := _sqlite.CreateSchedule(context.TODO(), _scheduleOne)
if err != nil {
t.Errorf("unable to create test schedule for sqlite: %v", err)
}

err = _sqlite.CreateSchedule(_scheduleTwo)
err = _sqlite.CreateSchedule(context.TODO(), _scheduleTwo)
if err != nil {
t.Errorf("unable to create test schedule for sqlite: %v", err)
}
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestSchedule_Engine_CountActiveSchedules(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CountActiveSchedules()
got, err := test.database.CountActiveSchedules(context.TODO())

if test.failure {
if err == nil {
Expand Down
3 changes: 2 additions & 1 deletion database/schedule/count_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package schedule

import (
"context"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// CountSchedulesForRepo gets the count of schedules by repo ID from the database.
func (e *engine) CountSchedulesForRepo(r *library.Repo) (int64, error) {
func (e *engine) CountSchedulesForRepo(ctx context.Context, r *library.Repo) (int64, error) {
e.logger.WithFields(logrus.Fields{
"org": r.GetOrg(),
"repo": r.GetName(),
Expand Down
7 changes: 4 additions & 3 deletions database/schedule/count_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package schedule

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -50,12 +51,12 @@ func TestSchedule_Engine_CountSchedulesForRepo(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

err := _sqlite.CreateSchedule(_scheduleOne)
err := _sqlite.CreateSchedule(context.TODO(), _scheduleOne)
if err != nil {
t.Errorf("unable to create test schedule for sqlite: %v", err)
}

err = _sqlite.CreateSchedule(_scheduleTwo)
err = _sqlite.CreateSchedule(context.TODO(), _scheduleTwo)
if err != nil {
t.Errorf("unable to create test schedule for sqlite: %v", err)
}
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestSchedule_Engine_CountSchedulesForRepo(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CountSchedulesForRepo(_repo)
got, err := test.database.CountSchedulesForRepo(context.TODO(), _repo)

if test.failure {
if err == nil {
Expand Down
Loading