diff --git a/cmd/manager/main.go b/cmd/manager/main.go index d9b437f..aae1ba5 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -4,17 +4,17 @@ import ( "context" "errors" "fmt" - "github.com/GLCharge/otelzap" "net/http" "os" "os/signal" "syscall" "time" + "github.com/GLCharge/otelzap" "github.com/ardanlabs/conf/v3" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" - "github.com/xBlaz3kx/distributed-scheduler/foundation/logger" - "github.com/xBlaz3kx/distributed-scheduler/handlers" + api "github.com/xBlaz3kx/distributed-scheduler/internal/api/http" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger" "go.uber.org/zap" ) @@ -31,7 +31,6 @@ func main() { if err := run(log); err != nil { log.Error("startup error", zap.Error(err)) - log.Sync() os.Exit(1) } } @@ -123,10 +122,10 @@ func run(log *otelzap.Logger) error { shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) - apiMux := handlers.APIMux(handlers.APIMuxConfig{ + apiMux := api.Api(api.APIMuxConfig{ Log: log, DB: db, - OpenApi: handlers.OpenApiConfig{ + OpenApi: api.OpenApiConfig{ Enabled: cfg.OpenAPI.Enable, Scheme: cfg.OpenAPI.Scheme, Host: cfg.OpenAPI.Host, diff --git a/cmd/manager/root.go b/cmd/manager/root.go new file mode 100644 index 0000000..c9fcdb2 --- /dev/null +++ b/cmd/manager/root.go @@ -0,0 +1,15 @@ +package main + +import "github.com/spf13/cobra" + +var rootCmd = &cobra.Command{ + Use: "scheduler", + Short: "Scheduler manager", +} + +func Execute() { + err := rootCmd.Execute() + if err != nil { + panic(err) + } +} diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 769d647..51ae911 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -11,15 +11,14 @@ import ( "time" "github.com/GLCharge/otelzap" - "github.com/xBlaz3kx/distributed-scheduler/handlers" - "github.com/ardanlabs/conf/v3" - "github.com/xBlaz3kx/distributed-scheduler/executor" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" - "github.com/xBlaz3kx/distributed-scheduler/foundation/logger" - "github.com/xBlaz3kx/distributed-scheduler/runner" - "github.com/xBlaz3kx/distributed-scheduler/service/job" - "github.com/xBlaz3kx/distributed-scheduler/store/postgres" + api "github.com/xBlaz3kx/distributed-scheduler/internal/api/http" + "github.com/xBlaz3kx/distributed-scheduler/internal/executor" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger" + "github.com/xBlaz3kx/distributed-scheduler/internal/runner" + "github.com/xBlaz3kx/distributed-scheduler/internal/service/job" + "github.com/xBlaz3kx/distributed-scheduler/internal/store/postgres" "go.uber.org/zap" ) @@ -43,34 +42,28 @@ func main() { } } +type Configuration struct { + conf.Version + Web struct { + ReadTimeout time.Duration `conf:"default:5s"` + WriteTimeout time.Duration `conf:"default:10s"` + IdleTimeout time.Duration `conf:"default:120s"` + ShutdownTimeout time.Duration `conf:"default:20s"` + APIHost string `conf:"default:0.0.0.0:8000"` + } + DB database.Config + ID string `conf:"default:instance1"` + Interval time.Duration `conf:"default:10s"` + MaxConcurrentJobs int `conf:"default:100"` + MaxJobLockTime time.Duration `conf:"default:1m"` +} + func run(log *otelzap.Logger) error { // ------------------------------------------------------------------------- // Configuration - cfg := struct { - conf.Version - Web struct { - ReadTimeout time.Duration `conf:"default:5s"` - WriteTimeout time.Duration `conf:"default:10s"` - IdleTimeout time.Duration `conf:"default:120s"` - ShutdownTimeout time.Duration `conf:"default:20s"` - APIHost string `conf:"default:0.0.0.0:8000"` - } - DB struct { - User string `conf:"default:scheduler"` - Password string `conf:"default:scheduler,mask"` - Host string `conf:"default:localhost:5436"` - Name string `conf:"default:scheduler"` - MaxIdleConns int `conf:"default:3"` - MaxOpenConns int `conf:"default:2"` - DisableTLS bool `conf:"default:true"` - } - ID string `conf:"default:instance1"` - Interval time.Duration `conf:"default:10s"` - MaxConcurrentJobs int `conf:"default:100"` - MaxJobLockTime time.Duration `conf:"default:1m"` - }{ + cfg := Configuration{ Version: conf.Version{ Build: build, Desc: "copyright information here", @@ -135,7 +128,7 @@ func run(log *otelzap.Logger) error { executorFactory := executor.NewFactory(&http.Client{Timeout: 30 * time.Second}) - runnner := runner.New(runner.Config{ + runner := runner.New(runner.Config{ JobService: jobService, Log: log, ExecutorFactory: executorFactory, @@ -145,11 +138,11 @@ func run(log *otelzap.Logger) error { JobLockDuration: cfg.MaxJobLockTime, }) - runnner.Start() + runner.Start() // // API - apiMux := handlers.RunnerAPI(handlers.APIMuxConfig{ + apiMux := api.RunnerAPI(api.APIMuxConfig{ Log: log, DB: db, }) @@ -183,7 +176,7 @@ func run(log *otelzap.Logger) error { defer cancel() // stop the runner - runnner.Stop(ctx) + runner.Stop(ctx) } return nil diff --git a/cmd/tooling/cmd/migrate.go b/cmd/tooling/cmd/migrate.go index 1e9cefe..9aa9690 100644 --- a/cmd/tooling/cmd/migrate.go +++ b/cmd/tooling/cmd/migrate.go @@ -2,11 +2,12 @@ package cmd import ( "context" - "fmt" - "github.com/spf13/cobra" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database/dbmigrate" "time" + + "github.com/GLCharge/otelzap" + "github.com/spf13/cobra" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database/dbmigrate" ) var migrateCmd = &cobra.Command{ @@ -29,9 +30,10 @@ func init() { } func migrateRun(cmd *cobra.Command, args []string) { + logger := otelzap.L().Sugar() db, err := database.Open(dbConfig) if err != nil { - fmt.Printf("open database: %v", err) + logger.Fatalf("unable to create database connection: %v", err) return } defer db.Close() @@ -40,9 +42,9 @@ func migrateRun(cmd *cobra.Command, args []string) { defer cancel() if err := dbmigrate.Migrate(ctx, db); err != nil { - fmt.Printf("migrate database: %v", err) + logger.Fatalf("unable to migrate the database: %v", err) return } - fmt.Println("migrations complete") + logger.Info("Database migrations complete!") } diff --git a/cmd/tooling/cmd/root.go b/cmd/tooling/cmd/root.go index 9f27d07..bea9629 100644 --- a/cmd/tooling/cmd/root.go +++ b/cmd/tooling/cmd/root.go @@ -1,8 +1,12 @@ package cmd import ( - "github.com/spf13/cobra" "os" + + "github.com/GLCharge/otelzap" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger" ) var rootCmd = &cobra.Command{ @@ -10,7 +14,16 @@ var rootCmd = &cobra.Command{ Short: "CLI tool for managing the scheduler.", } +func setupLogging() { + logLevel := viper.GetString("log.level") + logger, err := logger.New(logLevel) + if err == nil { + otelzap.ReplaceGlobals(logger) + } +} + func Execute() { + cobra.OnInitialize(setupLogging) err := rootCmd.Execute() if err != nil { os.Exit(1) diff --git a/go.mod b/go.mod index b6aea1c..509b218 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/xBlaz3kx/distributed-scheduler -go 1.20 +go 1.22.1 replace github.com/GLCharge/otelzap v0.0.0-20230904131944-57dc7c9994a9 => github.com/xBlaz3kx/otelzap v0.0.0-20230904131944-57dc7c9994a9 @@ -13,45 +13,58 @@ require ( github.com/lib/pq v1.10.9 github.com/samber/lo v1.46.0 github.com/spf13/cobra v1.8.0 + github.com/spf13/viper v1.19.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/vearne/gin-timeout v0.2.0 ) require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/spec v0.20.8 // indirect github.com/go-openapi/swag v0.22.3 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/subosito/gotenv v1.6.0 // indirect github.com/swaggo/swag v1.16.3 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.0 // indirect - go.opentelemetry.io/otel v1.15.1 // indirect - go.opentelemetry.io/otel/trace v1.15.1 // indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + gopkg.in/ini.v1 v1.67.0 // indirect ) require ( github.com/ardanlabs/conf/v3 v3.1.5 github.com/bytedance/sonic v1.11.6 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-gonic/gin v1.10.0 github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/goccy/go-json v0.10.2 // indirect - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.4.0 github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.3.1 @@ -64,7 +77,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rabbitmq/amqp091-go v1.8.1 github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index d87a49a..928188a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ -github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/ardanlabs/conf/v3 v3.1.5 h1:G6df2AxKnGHAK+ur2p50Ys8Vo1HnKcsvqSj9lxVeczk= @@ -19,8 +20,12 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/gin-contrib/gzip v0.0.6 h1:NjcunTcGAj5CO1gn4N8jHOSIeRFHIbn51z6K+xaN4d4= @@ -30,7 +35,7 @@ github.com/gin-contrib/zap v1.1.1 h1:DDyIF9YQorl3gZzAabIowRywHJuohDfiLnhwvWKl6SY github.com/gin-contrib/zap v1.1.1/go.mod h1:YW8KOko2kYLy8g6k9YgVNTj7SIcrUEzYiAd9IjiBPs0= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= @@ -58,8 +63,10 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -74,13 +81,13 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -90,6 +97,8 @@ github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjS github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -99,6 +108,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -107,25 +118,36 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/samber/lo v1.46.0 h1:w8G+oaCPgz1PoCJztqymCFaKwXt+5cCXn51uPxExFfQ= github.com/samber/lo v1.46.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= +github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -139,6 +161,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= github.com/swaggo/files v1.0.1/go.mod h1:0qXmMNH6sXNf+73t65aKeB+ApmgxdnkQzVTAj2uaMUg= github.com/swaggo/gin-swagger v1.6.0 h1:y8sxvQ3E20/RCyrXeFfg60r6H0Z+SwpTjMYsMm+zy8M= @@ -156,11 +180,12 @@ github.com/vearne/gin-timeout v0.2.0/go.mod h1:BKCWwia+RoBi1gv+RS4FtVrcM7bVhNkfU github.com/xBlaz3kx/otelzap v0.0.0-20230904131944-57dc7c9994a9 h1:weeeX2giloC8rp5OSvzSmevSAjiQZqQmLMI3vErqXMQ= github.com/xBlaz3kx/otelzap v0.0.0-20230904131944-57dc7c9994a9/go.mod h1:oDNnuI3k0It4AGtGLqNipQo5TcF8vojylfADQXQW4U8= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.opentelemetry.io/otel v1.15.1 h1:3Iwq3lfRByPaws0f6bU3naAqOR1n5IeDWd9390kWHa8= -go.opentelemetry.io/otel v1.15.1/go.mod h1:mHHGEHVDLal6YrKMmk9LqC4a3sF5g+fHfrttQIB1NTc= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/sdk v1.15.1 h1:5FKR+skgpzvhPQHIEfcwMYjCBr14LWzs3uSqKiQzETI= -go.opentelemetry.io/otel/trace v1.15.1 h1:uXLo6iHJEzDfrNC0L0mNjItIp06SyaBQxu5t3xMlngY= -go.opentelemetry.io/otel/trace v1.15.1/go.mod h1:IWdQG/5N1x7f6YUlmdLeJvH9yxtuJAfc4VW5Agv9r/8= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -174,6 +199,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -217,6 +244,8 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/guregu/null.v4 v4.0.0 h1:1Wm3S1WEA2I26Kq+6vcW+w0gcDo44YKYD7YIEJNHDjg= gopkg.in/guregu/null.v4 v4.0.0/go.mod h1:YoQhUrADuG3i9WqesrCmpNRwm1ypAgSHYqoOcTu/JrI= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/handlers/doc.go b/internal/api/http/doc.go similarity index 92% rename from handlers/doc.go rename to internal/api/http/doc.go index f9c3819..d9aaa22 100644 --- a/handlers/doc.go +++ b/internal/api/http/doc.go @@ -4,4 +4,4 @@ // @host http://localhost:8000 // @BasePath /v1 -package handlers +package http diff --git a/handlers/handlers.go b/internal/api/http/handlers.go similarity index 86% rename from handlers/handlers.go rename to internal/api/http/handlers.go index 02e2e0c..9367fc3 100644 --- a/handlers/handlers.go +++ b/internal/api/http/handlers.go @@ -1,18 +1,19 @@ -package handlers +package http import ( "context" + "net/http" + "time" + "github.com/GLCharge/otelzap" ginzap "github.com/gin-contrib/zap" "github.com/gin-gonic/gin" "github.com/jmoiron/sqlx" timeout "github.com/vearne/gin-timeout" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" - "github.com/xBlaz3kx/distributed-scheduler/service/job" - "github.com/xBlaz3kx/distributed-scheduler/store/postgres" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/service/job" + "github.com/xBlaz3kx/distributed-scheduler/internal/store/postgres" "go.uber.org/zap" - "net/http" - "time" ) // APIMuxConfig contains all the mandatory systems required by handlers. @@ -22,8 +23,8 @@ type APIMuxConfig struct { OpenApi OpenApiConfig } -// APIMux constructs a http.Handler with all application routes defined. -func APIMux(cfg APIMuxConfig) http.Handler { +// Api constructs a http.Handler with all application routes defined. +func Api(cfg APIMuxConfig) http.Handler { // Create a new Gin router router := gin.New() @@ -81,6 +82,7 @@ func RunnerAPI(cfg APIMuxConfig) http.Handler { // Define a route for the health check endpoint router.GET("/health", healthCheck(cfg)) + //_ = gin_healthcheck.New(router, config.Config{}, nil) return router } diff --git a/handlers/jobs.go b/internal/api/http/jobs.go similarity index 93% rename from handlers/jobs.go rename to internal/api/http/jobs.go index a67d63b..6861bf8 100644 --- a/handlers/jobs.go +++ b/internal/api/http/jobs.go @@ -1,13 +1,14 @@ -package handlers +package http import ( - "github.com/google/uuid" "net/http" "strconv" "github.com/gin-gonic/gin" - "github.com/xBlaz3kx/distributed-scheduler/model" - jobService "github.com/xBlaz3kx/distributed-scheduler/service/job" + "github.com/google/uuid" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + errors "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" + jobService "github.com/xBlaz3kx/distributed-scheduler/internal/service/job" ) func JobsRoutesV1(router *gin.Engine, jobsHandler *Jobs) { @@ -58,7 +59,7 @@ func (j *Jobs) CreateJob() gin.HandlerFunc { job, err := j.service.CreateJob(ctx.Request.Context(), create) if err != nil { - jobErr := model.ToCustomJobError(err) + jobErr := errors.ToCustomJobError(err) ctx.JSON(jobErr.Code, ErrorResponse{Error: jobErr.Error()}) return @@ -98,7 +99,7 @@ func (j *Jobs) UpdateJob() gin.HandlerFunc { job, err := j.service.UpdateJob(ctx.Request.Context(), id, update) if err != nil { - jobErr := model.ToCustomJobError(err) + jobErr := errors.ToCustomJobError(err) ctx.JSON(jobErr.Code, ErrorResponse{Error: jobErr.Error()}) return @@ -131,7 +132,7 @@ func (j *Jobs) GetJob() gin.HandlerFunc { job, err := j.service.GetJob(ctx.Request.Context(), id) if err != nil { - jobErr := model.ToCustomJobError(err) + jobErr := errors.ToCustomJobError(err) ctx.JSON(jobErr.Code, ErrorResponse{Error: jobErr.Error()}) return @@ -163,7 +164,7 @@ func (j *Jobs) DeleteJob() gin.HandlerFunc { } if err := j.service.DeleteJob(ctx.Request.Context(), id); err != nil { - jobErr := model.ToCustomJobError(err) + jobErr := errors.ToCustomJobError(err) ctx.JSON(jobErr.Code, ErrorResponse{Error: jobErr.Error()}) return @@ -233,7 +234,7 @@ func (j *Jobs) GetJobExecutions() gin.HandlerFunc { executions, err := j.service.GetJobExecutions(ctx.Request.Context(), jobID, failedOnly, limit, offset) if err != nil { - jobErr := model.ToCustomJobError(err) + jobErr := errors.ToCustomJobError(err) ctx.JSON(jobErr.Code, ErrorResponse{Error: jobErr.Error()}) return diff --git a/handlers/openapi.go b/internal/api/http/openapi.go similarity index 96% rename from handlers/openapi.go rename to internal/api/http/openapi.go index 7daa72f..b1856df 100644 --- a/handlers/openapi.go +++ b/internal/api/http/openapi.go @@ -1,4 +1,4 @@ -package handlers +package http import ( "github.com/gin-gonic/gin" diff --git a/internal/executor/amqp_executor.go b/internal/executor/amqp_executor.go new file mode 100644 index 0000000..d6a430a --- /dev/null +++ b/internal/executor/amqp_executor.go @@ -0,0 +1,65 @@ +package executor + +import ( + "context" + "encoding/base64" + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + error2 "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" +) + +type amqpExecutor struct{} + +func (ae *amqpExecutor) Execute(ctx context.Context, j *model.Job) error { + // Create a new AMQP connection + conn, err := amqp.Dial(j.AMQPJob.Connection) + if err != nil { + return fmt.Errorf("failed to connect to AMQP: %w", err) + } + defer conn.Close() + + // Create a new AMQP channel + ch, err := conn.Channel() + if err != nil { + return fmt.Errorf("failed to open a channel: %w", err) + } + defer ch.Close() + + var body []byte + + if j.AMQPJob.BodyEncoding != nil { + switch *j.AMQPJob.BodyEncoding { + case model.BodyEncodingBase64: + body, err = base64.StdEncoding.DecodeString(j.AMQPJob.Body) + if err != nil { + return fmt.Errorf("failed to decode body: %w", err) + } + default: + return error2.ErrInvalidBodyEncoding + } + + } else { + body = []byte(j.AMQPJob.Body) + } + + // Publish a message to the exchange + err = ch.PublishWithContext( + ctx, + j.AMQPJob.Exchange, // exchange + j.AMQPJob.RoutingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: j.AMQPJob.ContentType, + Headers: j.AMQPJob.Headers, + Body: body, + }, + ) + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} diff --git a/internal/executor/executor.go b/internal/executor/executor.go new file mode 100644 index 0000000..b75d4ba --- /dev/null +++ b/internal/executor/executor.go @@ -0,0 +1,11 @@ +package executor + +import ( + "context" + + "github.com/xBlaz3kx/distributed-scheduler/internal/model" +) + +type Executor interface { + Execute(ctx context.Context, job *model.Job) error +} diff --git a/executor/factory.go b/internal/executor/factory.go similarity index 65% rename from executor/factory.go rename to internal/executor/factory.go index b25ca08..99765a5 100644 --- a/executor/factory.go +++ b/internal/executor/factory.go @@ -2,11 +2,12 @@ package executor import ( "fmt" - "github.com/xBlaz3kx/distributed-scheduler/model" + + "github.com/xBlaz3kx/distributed-scheduler/internal/model" ) type Factory interface { - NewExecutor(job *model.Job, options ...Option) (model.Executor, error) + NewExecutor(job *model.Job, options ...Option) (Executor, error) } type factory struct { @@ -20,16 +21,16 @@ func NewFactory(client HttpClient) Factory { } // Option is a function that modifies an executor before it is returned (e.g. WithRetry) -type Option func(executor model.Executor) model.Executor +type Option func(executor Executor) Executor -func (f *factory) NewExecutor(job *model.Job, options ...Option) (model.Executor, error) { +func (f *factory) NewExecutor(job *model.Job, options ...Option) (Executor, error) { - var executor model.Executor + var executor Executor switch job.Type { case model.JobTypeHTTP: - executor = &hTTPExecutor{Client: f.client} + executor = &httpExecutor{Client: f.client} case model.JobTypeAMQP: - executor = &aMQPExecutor{} + executor = &amqpExecutor{} default: return nil, fmt.Errorf("unknown job type: %v", job.Type) } diff --git a/executor/factory_test.go b/internal/executor/factory_test.go similarity index 82% rename from executor/factory_test.go rename to internal/executor/factory_test.go index 954254a..1fa6e82 100644 --- a/executor/factory_test.go +++ b/internal/executor/factory_test.go @@ -1,10 +1,11 @@ package executor import ( - "github.com/stretchr/testify/assert" - "github.com/xBlaz3kx/distributed-scheduler/model" "net/http" "testing" + + "github.com/stretchr/testify/assert" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" ) func TestNewExecutor(t *testing.T) { @@ -20,12 +21,12 @@ func TestNewExecutor(t *testing.T) { executor, err := factory.NewExecutor(j) assert.Nil(t, err) - assert.IsType(t, &hTTPExecutor{}, executor) + assert.IsType(t, &httpExecutor{}, executor) j.Type = model.JobTypeAMQP executor, err = factory.NewExecutor(j) assert.Nil(t, err) - assert.IsType(t, &aMQPExecutor{}, executor) + assert.IsType(t, &amqpExecutor{}, executor) j.Type = "unknown" executor, err = factory.NewExecutor(j) diff --git a/executor/executor.go b/internal/executor/http_executor.go similarity index 52% rename from executor/executor.go rename to internal/executor/http_executor.go index bf9f859..1291c04 100644 --- a/executor/executor.go +++ b/internal/executor/http_executor.go @@ -2,34 +2,31 @@ package executor import ( "context" - "encoding/base64" "fmt" "io" "net/http" "strings" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + errors "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" ) -type hTTPExecutor struct { +// HTTPSPrefix and HTTPPrefix are prefixes for HTTP and HTTPS protocols +const ( + HTTPSPrefix = "https://" + HTTPPrefix = "http://" +) + +type httpExecutor struct { Client HttpClient } -type aMQPExecutor struct{} - // HttpClient interface type HttpClient interface { Do(req *http.Request) (*http.Response, error) } -// HTTPSPrefix and HTTPPrefix are prefixes for HTTP and HTTPS protocols -const ( - HTTPSPrefix = "https://" - HTTPPrefix = "http://" -) - -func (he *hTTPExecutor) Execute(ctx context.Context, j *model.Job) error { +func (he *httpExecutor) Execute(ctx context.Context, j *model.Job) error { // Create the HTTP request req, err := he.createHTTPRequest(ctx, j) if err != nil { @@ -45,13 +42,13 @@ func (he *hTTPExecutor) Execute(ctx context.Context, j *model.Job) error { // Check if status code is one of the valid response codes if !he.validResponseCode(resp.StatusCode, j.HTTPJob.ValidResponseCodes) { - return model.ErrInvalidResponseCode + return errors.ErrInvalidResponseCode } return nil } -func (he *hTTPExecutor) validResponseCode(code int, validCodes []int) bool { +func (he *httpExecutor) validResponseCode(code int, validCodes []int) bool { // If no valid response codes are defined, 200 is the default if len(validCodes) == 0 { return code == http.StatusOK @@ -67,7 +64,7 @@ func (he *hTTPExecutor) validResponseCode(code int, validCodes []int) bool { return false } -func (he *hTTPExecutor) createHTTPRequest(ctx context.Context, j *model.Job) (*http.Request, error) { +func (he *httpExecutor) createHTTPRequest(ctx context.Context, j *model.Job) (*http.Request, error) { // Create the request body body := he.createHTTPRequestBody(j.HTTPJob.Body.String) @@ -89,7 +86,7 @@ func (he *hTTPExecutor) createHTTPRequest(ctx context.Context, j *model.Job) (*h return req, nil } -func (he *hTTPExecutor) createHTTPRequestBody(body string) io.Reader { +func (he *httpExecutor) createHTTPRequestBody(body string) io.Reader { if body == "" { return nil } @@ -97,7 +94,7 @@ func (he *hTTPExecutor) createHTTPRequestBody(body string) io.Reader { return strings.NewReader(body) } -func (he *hTTPExecutor) createHTTPRequestURL(url string) string { +func (he *httpExecutor) createHTTPRequestURL(url string) string { if strings.HasPrefix(url, HTTPPrefix) || strings.HasPrefix(url, HTTPSPrefix) { return url } @@ -105,13 +102,13 @@ func (he *hTTPExecutor) createHTTPRequestURL(url string) string { return HTTPSPrefix + url } -func (he *hTTPExecutor) setHTTPRequestHeaders(req *http.Request, headers map[string]string) { +func (he *httpExecutor) setHTTPRequestHeaders(req *http.Request, headers map[string]string) { for key, value := range headers { req.Header.Set(key, value) } } -func (he *hTTPExecutor) setHTTPRequestAuth(req *http.Request, auth model.Auth) { +func (he *httpExecutor) setHTTPRequestAuth(req *http.Request, auth model.Auth) { switch auth.Type { case model.AuthTypeBasic: req.SetBasicAuth(auth.Username.String, auth.Password.String) @@ -119,55 +116,3 @@ func (he *hTTPExecutor) setHTTPRequestAuth(req *http.Request, auth model.Auth) { req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", auth.BearerToken.String)) } } - -func (ae *aMQPExecutor) Execute(ctx context.Context, j *model.Job) error { - // Create a new AMQP connection - conn, err := amqp.Dial(j.AMQPJob.Connection) - if err != nil { - return fmt.Errorf("failed to connect to AMQP: %w", err) - } - defer conn.Close() - - // Create a new AMQP channel - ch, err := conn.Channel() - if err != nil { - return fmt.Errorf("failed to open a channel: %w", err) - } - defer ch.Close() - - var body []byte - - if j.AMQPJob.BodyEncoding != nil { - switch *j.AMQPJob.BodyEncoding { - case model.BodyEncodingBase64: - body, err = base64.StdEncoding.DecodeString(j.AMQPJob.Body) - if err != nil { - return fmt.Errorf("failed to decode body: %w", err) - } - default: - return model.ErrInvalidBodyEncoding - } - - } else { - body = []byte(j.AMQPJob.Body) - } - - // Publish a message to the exchange - err = ch.PublishWithContext( - ctx, - j.AMQPJob.Exchange, // exchange - j.AMQPJob.RoutingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: j.AMQPJob.ContentType, - Headers: j.AMQPJob.Headers, - Body: body, - }, - ) - if err != nil { - return fmt.Errorf("failed to publish message: %w", err) - } - - return nil -} diff --git a/executor/executor_test.go b/internal/executor/http_executor_test.go similarity index 90% rename from executor/executor_test.go rename to internal/executor/http_executor_test.go index de04e3b..56f9137 100644 --- a/executor/executor_test.go +++ b/internal/executor/http_executor_test.go @@ -4,12 +4,13 @@ import ( "context" "encoding/base64" "errors" - "github.com/xBlaz3kx/distributed-scheduler/model" - "gopkg.in/guregu/null.v4" "net/http" "net/http/httptest" "testing" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "gopkg.in/guregu/null.v4" + "github.com/stretchr/testify/assert" ) @@ -49,7 +50,7 @@ func TestHTTPExecutor_Execute(t *testing.T) { }, } - httpExecutor := &hTTPExecutor{Client: mockHttpClient} + httpExecutor := &httpExecutor{Client: mockHttpClient} err := httpExecutor.Execute(ctx, j) assert.Nil(t, err) @@ -62,7 +63,7 @@ func TestHTTPExecutor_Execute(t *testing.T) { }, } - httpExecutor := &hTTPExecutor{Client: mockHttpClient} + httpExecutor := &httpExecutor{Client: mockHttpClient} err := httpExecutor.Execute(ctx, j) assert.NotNil(t, err) @@ -78,7 +79,7 @@ func TestHTTPExecutor_Execute(t *testing.T) { }, } - httpExecutor := &hTTPExecutor{Client: mockHttpClient} + httpExecutor := &httpExecutor{Client: mockHttpClient} err := httpExecutor.Execute(ctx, j) assert.NotNil(t, err) @@ -94,7 +95,7 @@ func TestHTTPExecutor_Execute(t *testing.T) { }, } - httpExecutor := &hTTPExecutor{Client: mockHttpClient} + httpExecutor := &httpExecutor{Client: mockHttpClient} err := httpExecutor.Execute(ctx, j) assert.Nil(t, err) @@ -118,7 +119,7 @@ func TestHTTPExecutor_createHTTPRequest(t *testing.T) { }, } - httpExecutor := &hTTPExecutor{} + httpExecutor := &httpExecutor{} req, err := httpExecutor.createHTTPRequest(ctx, j) assert.Nil(t, err) @@ -128,7 +129,7 @@ func TestHTTPExecutor_createHTTPRequest(t *testing.T) { } func TestHTTPExecutor_validResponseCode(t *testing.T) { - httpExecutor := &hTTPExecutor{} + httpExecutor := &httpExecutor{} validResponseCodes := []int{200, 201} diff --git a/executor/retry.go b/internal/executor/opts.go similarity index 84% rename from executor/retry.go rename to internal/executor/opts.go index 25dd0db..c4a40a2 100644 --- a/executor/retry.go +++ b/internal/executor/opts.go @@ -2,17 +2,18 @@ package executor import ( "context" + "github.com/cenkalti/backoff/v4" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" ) // RetryExecutor struct encapsulates an executor and adds retry functionality type retryExecutor struct { - executor model.Executor + executor Executor } // WithRetry wraps an executor with a retry mechanism -func WithRetry(executor model.Executor) model.Executor { +func WithRetry(executor Executor) Executor { return &retryExecutor{executor: executor} } diff --git a/executor/retry_test.go b/internal/executor/opts_test.go similarity index 95% rename from executor/retry_test.go rename to internal/executor/opts_test.go index 0e9ad66..d749c14 100644 --- a/executor/retry_test.go +++ b/internal/executor/opts_test.go @@ -3,9 +3,10 @@ package executor import ( "context" "errors" - "github.com/stretchr/testify/assert" - "github.com/xBlaz3kx/distributed-scheduler/model" "testing" + + "github.com/stretchr/testify/assert" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" ) // MockExecutor for testing diff --git a/model/job.go b/internal/model/job.go similarity index 81% rename from model/job.go rename to internal/model/job.go index 36b6472..ea780a1 100644 --- a/model/job.go +++ b/internal/model/job.go @@ -1,9 +1,9 @@ package model import ( - "context" "time" + error2 "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" "gopkg.in/guregu/null.v4" "github.com/google/uuid" @@ -30,8 +30,13 @@ func (jt JobType) Valid() bool { type JobStatus string const ( - JobStatusRunning JobStatus = "RUNNING" - JobStatusStopped JobStatus = "STOPPED" + JobStatusRunning JobStatus = "RUNNING" + JobStatusScheduled JobStatus = "SCHEDULED" + JobStatusCancelled JobStatus = "CANCELLED" + JobStatusExecuted JobStatus = "EXECUTED" + JobStatusCompleted JobStatus = "COMPLETED" + JobStatusAwaitingNextExecution JobStatus = "AWAITING_NEXT_EXECUTION" + JobStatusStopped JobStatus = "STOPPED" ) func (js JobStatus) Valid() bool { @@ -95,8 +100,11 @@ type Job struct { UpdatedAt time.Time `json:"updated_at"` // when the job is scheduled to run next (can be null if the job is not scheduled to run again) - NextRun null.Time `json:"next_run"` + NextRun null.Time `json:"next_run"` + NumberOfRuns *int `json:"num_runs"` + AllowedFailedRuns *int `json:"allowed_failed_runs"` + // Custom user tags that can be used to filter jobs Tags []string `json:"tags"` } @@ -113,7 +121,6 @@ type JobUpdate struct { } func (j *Job) ApplyUpdate(update JobUpdate) { - if update.Type != nil { j.Type = *update.Type } @@ -151,10 +158,12 @@ type HTTPJob struct { Headers map[string]string `json:"headers"` // e.g., {"Content-Type": "application/json"} Body null.String `json:"body" swaggertype:"string"` // e.g., "{\"hello\": \"world\"}" ValidResponseCodes []int `json:"valid_response_codes"` // e.g., [200, 201, 202] - Auth Auth `json:"auth"` // e.g., {"type": "basic", "username": "foo", "password": "bar"} + // Todo encode the auth! + Auth Auth `json:"auth"` // e.g., {"type": "basic", "username": "foo", "password": "bar"} } type AMQPJob struct { + // Todo encode the connection string! Connection string `json:"connection"` // e.g., "amqp://guest:guest@localhost:5672/" Exchange string `json:"exchange"` // e.g., "my_exchange" RoutingKey string `json:"routing_key"` // e.g., "my_routing_key" @@ -174,15 +183,15 @@ type Auth struct { // Validate validates a Job struct. func (j *Job) Validate() error { if j.ID == uuid.Nil { - return ErrInvalidJobID + return error2.ErrInvalidJobID } if !j.Type.Valid() { - return ErrInvalidJobType + return error2.ErrInvalidJobType } if !j.Status.Valid() { - return ErrInvalidJobStatus + return error2.ErrInvalidJobStatus } if j.Type == JobTypeHTTP { @@ -191,7 +200,7 @@ func (j *Job) Validate() error { } if j.AMQPJob != nil { - return ErrInvalidJobFields + return error2.ErrInvalidJobFields } } @@ -201,25 +210,25 @@ func (j *Job) Validate() error { } if j.HTTPJob != nil { - return ErrInvalidJobFields + return error2.ErrInvalidJobFields } } // only one of execute_at or cron_schedule can be defined if j.ExecuteAt.Valid == j.CronSchedule.Valid { - return ErrInvalidJobSchedule + return error2.ErrInvalidJobSchedule } if j.CronSchedule.Valid { if _, err := cron.ParseStandard(j.CronSchedule.String); err != nil { - return ErrInvalidCronSchedule + return error2.ErrInvalidCronSchedule } cron.NewChain() } if j.ExecuteAt.Valid { if j.ExecuteAt.Time.Before(time.Now()) { - return ErrInvalidExecuteAt + return error2.ErrInvalidExecuteAt } } @@ -229,15 +238,15 @@ func (j *Job) Validate() error { // Validate validates an HTTPJob struct. func (httpJob *HTTPJob) Validate() error { if httpJob == nil { - return ErrHTTPJobNotDefined + return error2.ErrHTTPJobNotDefined } if httpJob.URL == "" { - return ErrEmptyHTTPJobURL + return error2.ErrEmptyHTTPJobURL } if httpJob.Method == "" { - return ErrEmptyHTTPJobMethod + return error2.ErrEmptyHTTPJobMethod } if err := httpJob.Auth.Validate(); err != nil { @@ -284,19 +293,24 @@ func (j *Job) SetInitialRunTime() { // Validate validates an AMQPJob struct. func (amqpJob *AMQPJob) Validate() error { if amqpJob == nil { - return ErrAMQPJobNotDefined + return error2.ErrAMQPJobNotDefined + } + + // Todo validate URL + if amqpJob.Connection == "" { + return error2.ErrEmptyHTTPJobURL } if amqpJob.Exchange == "" { - return ErrEmptyExchange + return error2.ErrEmptyExchange } if amqpJob.RoutingKey == "" { - return ErrEmptyRoutingKey + return error2.ErrEmptyRoutingKey } if !amqpJob.BodyEncoding.Valid() { - return ErrInvalidBodyEncoding + return error2.ErrInvalidBodyEncoding } return nil @@ -304,25 +318,25 @@ func (amqpJob *AMQPJob) Validate() error { func (auth *Auth) Validate() error { if auth == nil { - return ErrAuthMethodNotDefined + return error2.ErrAuthMethodNotDefined } if !auth.Type.Valid() { - return ErrInvalidAuthType + return error2.ErrInvalidAuthType } if auth.Type == AuthTypeBasic { if !auth.Username.Valid || auth.Username.String == "" { - return ErrEmptyUsername + return error2.ErrEmptyUsername } if !auth.Password.Valid || auth.Password.String == "" { - return ErrEmptyPassword + return error2.ErrEmptyPassword } } if auth.Type == AuthTypeBearer && (!auth.BearerToken.Valid || auth.BearerToken.String == "") { - return ErrEmptyBearerToken + return error2.ErrEmptyBearerToken } return nil @@ -362,11 +376,3 @@ func (j *JobCreate) ToJob() *Job { return job } - -func (j *Job) Execute(ctx context.Context, executor Executor) error { - return executor.Execute(ctx, j) -} - -type Executor interface { - Execute(ctx context.Context, job *Job) error -} diff --git a/internal/model/job_execution.go b/internal/model/job_execution.go new file mode 100644 index 0000000..3049187 --- /dev/null +++ b/internal/model/job_execution.go @@ -0,0 +1,26 @@ +package model + +import ( + "time" + + "github.com/google/uuid" + "gopkg.in/guregu/null.v4" +) + +type JobExecution struct { + ID int `json:"id"` + JobID uuid.UUID `json:"job_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Success bool `json:"success"` + NumberOfExecutions int `json:"number_of_executions"` + NumberOfRetries int `json:"number_of_retries"` + ErrorMessage null.String `json:"error_message,omitempty" swaggertype:"string"` +} + +type JobExecutionStatus string + +const ( + JobExecutionStatusSuccessful JobExecutionStatus = "SUCCESSFUL" + JobExecutionStatusFailed JobExecutionStatus = "FAILED" +) diff --git a/model/job_test.go b/internal/model/job_test.go similarity index 91% rename from model/job_test.go rename to internal/model/job_test.go index bc78823..cec409b 100644 --- a/model/job_test.go +++ b/internal/model/job_test.go @@ -1,11 +1,13 @@ package model import ( + "testing" + "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" + error2 "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" "gopkg.in/guregu/null.v4" - "testing" - "time" ) func TestJobTypeValid(t *testing.T) { @@ -76,7 +78,7 @@ func TestHTTPJobValidate(t *testing.T) { Type: AuthTypeNone, }, }, - want: ErrEmptyHTTPJobURL, + want: error2.ErrEmptyHTTPJobURL, }, { name: "invalid job: empty Method", @@ -87,7 +89,7 @@ func TestHTTPJobValidate(t *testing.T) { Type: AuthTypeNone, }, }, - want: ErrEmptyHTTPJobMethod, + want: error2.ErrEmptyHTTPJobMethod, }, } @@ -121,7 +123,7 @@ func TestAMQPJobValidate(t *testing.T) { Exchange: "", RoutingKey: "my_routing_key", }, - want: ErrEmptyExchange, + want: error2.ErrEmptyExchange, }, { name: "invalid job: empty RoutingKey", @@ -130,7 +132,7 @@ func TestAMQPJobValidate(t *testing.T) { Exchange: "my_exchange", RoutingKey: "", }, - want: ErrEmptyRoutingKey, + want: error2.ErrEmptyRoutingKey, }, } @@ -170,7 +172,7 @@ func TestAuthValidate(t *testing.T) { Type: AuthTypeBasic, Password: null.StringFrom("testpassword"), }, - want: ErrEmptyUsername, + want: error2.ErrEmptyUsername, }, { name: "invalid auth: missing password", @@ -178,14 +180,14 @@ func TestAuthValidate(t *testing.T) { Type: AuthTypeBasic, Username: null.StringFrom("testuser"), }, - want: ErrEmptyPassword, + want: error2.ErrEmptyPassword, }, { name: "invalid auth: unsupported auth type", auth: Auth{ Type: "unsupported_type", }, - want: ErrInvalidAuthType, + want: error2.ErrInvalidAuthType, }, { name: "valid auth: bearer token", @@ -200,7 +202,7 @@ func TestAuthValidate(t *testing.T) { auth: Auth{ Type: AuthTypeBearer, }, - want: ErrEmptyBearerToken, + want: error2.ErrEmptyBearerToken, }, } @@ -251,7 +253,7 @@ func TestJobValidate(t *testing.T) { }, CreatedAt: time.Now(), }, - want: ErrInvalidJobID, + want: error2.ErrInvalidJobID, }, { name: "invalid job: http type with nil HTTPJob", @@ -262,7 +264,7 @@ func TestJobValidate(t *testing.T) { ExecuteAt: null.TimeFrom(time.Now().Add(time.Minute)), CreatedAt: time.Now(), }, - want: ErrHTTPJobNotDefined, + want: error2.ErrHTTPJobNotDefined, }, { name: "invalid job: unsupported Type", @@ -280,7 +282,7 @@ func TestJobValidate(t *testing.T) { }, CreatedAt: time.Now(), }, - want: ErrInvalidJobType, + want: error2.ErrInvalidJobType, }, { name: "invalid job: invalid cron expression", @@ -298,7 +300,7 @@ func TestJobValidate(t *testing.T) { }, CreatedAt: time.Now(), }, - want: ErrInvalidCronSchedule, + want: error2.ErrInvalidCronSchedule, }, { name: "invalid job: schedule and execute at both defined", @@ -317,7 +319,7 @@ func TestJobValidate(t *testing.T) { }, CreatedAt: time.Now(), }, - want: ErrInvalidJobSchedule, + want: error2.ErrInvalidJobSchedule, }, } diff --git a/foundation/database/dbmigrate/dbmigrate.go b/internal/pkg/database/dbmigrate/dbmigrate.go similarity index 92% rename from foundation/database/dbmigrate/dbmigrate.go rename to internal/pkg/database/dbmigrate/dbmigrate.go index f13435f..4073e5a 100644 --- a/foundation/database/dbmigrate/dbmigrate.go +++ b/internal/pkg/database/dbmigrate/dbmigrate.go @@ -10,7 +10,7 @@ import ( "github.com/ardanlabs/darwin/v3/dialects/postgres" "github.com/ardanlabs/darwin/v3/drivers/generic" "github.com/jmoiron/sqlx" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" ) var ( diff --git a/foundation/database/dbmigrate/sql/migrate.sql b/internal/pkg/database/dbmigrate/sql/migrate.sql similarity index 100% rename from foundation/database/dbmigrate/sql/migrate.sql rename to internal/pkg/database/dbmigrate/sql/migrate.sql diff --git a/foundation/database/dbtest/dbtest.go b/internal/pkg/database/dbtest/dbtest.go similarity index 94% rename from foundation/database/dbtest/dbtest.go rename to internal/pkg/database/dbtest/dbtest.go index 1056ba7..91863e8 100644 --- a/foundation/database/dbtest/dbtest.go +++ b/internal/pkg/database/dbtest/dbtest.go @@ -11,9 +11,9 @@ import ( "github.com/GLCharge/otelzap" "github.com/jmoiron/sqlx" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database/dbmigrate" - "github.com/xBlaz3kx/distributed-scheduler/foundation/docker" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database/dbmigrate" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/tests/docker" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) diff --git a/foundation/database/postgres.go b/internal/pkg/database/postgres.go similarity index 97% rename from foundation/database/postgres.go rename to internal/pkg/database/postgres.go index c3da05a..ad9bf78 100644 --- a/foundation/database/postgres.go +++ b/internal/pkg/database/postgres.go @@ -17,7 +17,7 @@ type Config struct { Name string MaxIdleConns int MaxOpenConns int - DisableTLS bool + DisableTLS bool `conf:"default:false"` } // Open knows how to open a database connection based on the configuration. diff --git a/model/error.go b/internal/pkg/error/error.go similarity index 69% rename from model/error.go rename to internal/pkg/error/error.go index 0f4895d..a5a8cf0 100644 --- a/model/error.go +++ b/internal/pkg/error/error.go @@ -1,6 +1,8 @@ -package model +package error -import "errors" +import ( + "errors" +) var ( ErrInvalidJobType = errors.New("job type must be either HTTP or AMQP") @@ -36,13 +38,28 @@ func (e *CustomError) Error() string { } func ToCustomJobError(err error) *CustomError { - switch err { - case ErrInvalidJobType, ErrInvalidJobID, ErrInvalidJobStatus, ErrInvalidJobFields, ErrInvalidJobSchedule, ErrInvalidCronSchedule, ErrInvalidExecuteAt, - ErrEmptyHTTPJobURL, ErrHTTPJobNotDefined, ErrEmptyHTTPJobMethod, ErrAMQPJobNotDefined, ErrEmptyExchange, ErrEmptyRoutingKey, - ErrInvalidAuthType, ErrEmptyUsername, ErrEmptyPassword, ErrEmptyBearerToken, ErrAuthMethodNotDefined, - ErrJobNotFound: + switch { + case errors.Is(err, ErrInvalidJobType), + errors.Is(err, ErrInvalidJobID), + errors.Is(err, ErrInvalidJobStatus), + errors.Is(err, ErrInvalidJobFields), + errors.Is(err, ErrInvalidJobSchedule), + errors.Is(err, ErrInvalidCronSchedule), + errors.Is(err, ErrInvalidExecuteAt), + errors.Is(err, ErrEmptyHTTPJobURL), + errors.Is(err, ErrHTTPJobNotDefined), + errors.Is(err, ErrEmptyHTTPJobMethod), + errors.Is(err, ErrAMQPJobNotDefined), + errors.Is(err, ErrEmptyExchange), + errors.Is(err, ErrEmptyRoutingKey), + errors.Is(err, ErrInvalidAuthType), + errors.Is(err, ErrEmptyUsername), + errors.Is(err, ErrEmptyPassword), + errors.Is(err, ErrEmptyBearerToken), + errors.Is(err, ErrAuthMethodNotDefined): return &CustomError{err, 400} - + case errors.Is(err, ErrJobNotFound): + return &CustomError{err, 404} default: return &CustomError{err, 500} } diff --git a/model/error_test.go b/internal/pkg/error/error_test.go similarity index 98% rename from model/error_test.go rename to internal/pkg/error/error_test.go index 56107e5..d223018 100644 --- a/model/error_test.go +++ b/internal/pkg/error/error_test.go @@ -1,4 +1,4 @@ -package model +package error import ( "errors" diff --git a/foundation/logger/logger.go b/internal/pkg/logger/logger.go similarity index 100% rename from foundation/logger/logger.go rename to internal/pkg/logger/logger.go diff --git a/foundation/docker/docker.go b/internal/pkg/tests/docker/docker.go similarity index 100% rename from foundation/docker/docker.go rename to internal/pkg/tests/docker/docker.go diff --git a/runner/mocks.go b/internal/runner/mocks.go similarity index 93% rename from runner/mocks.go rename to internal/runner/mocks.go index d3862de..0928ef5 100644 --- a/runner/mocks.go +++ b/internal/runner/mocks.go @@ -2,14 +2,15 @@ package runner import ( "context" - "github.com/GLCharge/otelzap" - "github.com/google/uuid" - "github.com/xBlaz3kx/distributed-scheduler/executor" - "github.com/xBlaz3kx/distributed-scheduler/model" - "go.uber.org/zap" "sync" "testing" "time" + + "github.com/GLCharge/otelzap" + "github.com/google/uuid" + "github.com/xBlaz3kx/distributed-scheduler/internal/executor" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "go.uber.org/zap" ) type mockJobService struct { @@ -68,7 +69,7 @@ type mockExecutorFactory struct { factoryErr error } -func (m *mockExecutorFactory) NewExecutor(_ *model.Job, _ ...executor.Option) (model.Executor, error) { +func (m *mockExecutorFactory) NewExecutor(_ *model.Job, _ ...executor.Option) (executor.Executor, error) { if m.factoryErr != nil { return nil, m.factoryErr } diff --git a/runner/runner.go b/internal/runner/runner.go similarity index 96% rename from runner/runner.go rename to internal/runner/runner.go index c98c7fa..ece79a6 100644 --- a/runner/runner.go +++ b/internal/runner/runner.go @@ -2,13 +2,13 @@ package runner import ( "context" - "github.com/GLCharge/otelzap" - "go.uber.org/zap" "sync" "time" - "github.com/xBlaz3kx/distributed-scheduler/executor" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/GLCharge/otelzap" + "github.com/xBlaz3kx/distributed-scheduler/internal/executor" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "go.uber.org/zap" ) type Runner struct { @@ -188,7 +188,7 @@ func (s *Runner) executeJob(job *model.Job) { startTime := time.Now() // Execute the job - err = job.Execute(s.ctx, jobExecutor) + err = jobExecutor.Execute(s.ctx, job) stopTime := time.Now() diff --git a/runner/runner_test.go b/internal/runner/runner_test.go similarity index 100% rename from runner/runner_test.go rename to internal/runner/runner_test.go diff --git a/service/job/job.go b/internal/service/job/job.go similarity index 96% rename from service/job/job.go rename to internal/service/job/job.go index 4f95362..a7b3195 100644 --- a/service/job/job.go +++ b/internal/service/job/job.go @@ -2,13 +2,13 @@ package job import ( "context" + "time" + "github.com/GLCharge/otelzap" "github.com/google/uuid" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/store" "gopkg.in/guregu/null.v4" - "time" - - "github.com/xBlaz3kx/distributed-scheduler/model" - "github.com/xBlaz3kx/distributed-scheduler/store" ) // Service is a struct that contains a store and a logger. diff --git a/service/job/job_test.go b/internal/service/job/job_test.go similarity index 96% rename from service/job/job_test.go rename to internal/service/job/job_test.go index 87c4e7e..fca1cf9 100644 --- a/service/job/job_test.go +++ b/internal/service/job/job_test.go @@ -10,10 +10,10 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/samber/lo" - "github.com/xBlaz3kx/distributed-scheduler/foundation/database/dbtest" - "github.com/xBlaz3kx/distributed-scheduler/foundation/docker" - "github.com/xBlaz3kx/distributed-scheduler/model" - "github.com/xBlaz3kx/distributed-scheduler/store/postgres" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database/dbtest" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/tests/docker" + "github.com/xBlaz3kx/distributed-scheduler/internal/store/postgres" "gopkg.in/guregu/null.v4" ) diff --git a/store/postgres/helpers.go b/internal/store/postgres/helpers.go similarity index 81% rename from store/postgres/helpers.go rename to internal/store/postgres/helpers.go index 587e04a..68bcf99 100644 --- a/store/postgres/helpers.go +++ b/internal/store/postgres/helpers.go @@ -2,6 +2,7 @@ package postgres import ( "database/sql" + "errors" "github.com/GLCharge/otelzap" "github.com/jmoiron/sqlx" @@ -10,7 +11,7 @@ import ( func rollback(tx *sqlx.Tx, log *otelzap.Logger) { err := tx.Rollback() - if err != sql.ErrTxDone && err != nil { + if err != nil && !errors.Is(err, sql.ErrTxDone) { log.Error("Failed to rollback transaction", zap.Error(err)) } } diff --git a/store/postgres/models.go b/internal/store/postgres/models.go similarity index 97% rename from store/postgres/models.go rename to internal/store/postgres/models.go index 3d81f3d..e08787d 100644 --- a/store/postgres/models.go +++ b/internal/store/postgres/models.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" "github.com/lib/pq" "github.com/pkg/errors" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" "gopkg.in/guregu/null.v4" ) diff --git a/store/postgres/models_test.go b/internal/store/postgres/models_test.go similarity index 95% rename from store/postgres/models_test.go rename to internal/store/postgres/models_test.go index afa9ab4..eee67b8 100644 --- a/store/postgres/models_test.go +++ b/internal/store/postgres/models_test.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" "gopkg.in/guregu/null.v4" ) diff --git a/store/postgres/postgres.go b/internal/store/postgres/postgres.go similarity index 96% rename from store/postgres/postgres.go rename to internal/store/postgres/postgres.go index c697aee..158f21c 100644 --- a/store/postgres/postgres.go +++ b/internal/store/postgres/postgres.go @@ -5,14 +5,15 @@ import ( "database/sql" "errors" "fmt" - "github.com/GLCharge/otelzap" - "gopkg.in/guregu/null.v4" "time" + "github.com/GLCharge/otelzap" "github.com/google/uuid" "github.com/jmoiron/sqlx" - "github.com/xBlaz3kx/distributed-scheduler/model" - "github.com/xBlaz3kx/distributed-scheduler/store" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" + errs "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/error" + "github.com/xBlaz3kx/distributed-scheduler/internal/store" + "gopkg.in/guregu/null.v4" ) type pgStore struct { @@ -145,7 +146,7 @@ func (s *pgStore) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) err := s.db.GetContext(ctx, &dbJob, query, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return nil, model.ErrJobNotFound + return nil, errs.ErrJobNotFound } return nil, fmt.Errorf("failed to get job from database: %w", err) } diff --git a/store/store.go b/internal/store/store.go similarity index 94% rename from store/store.go rename to internal/store/store.go index 774040f..5e8bed9 100644 --- a/store/store.go +++ b/internal/store/store.go @@ -5,7 +5,7 @@ import ( "time" "github.com/google/uuid" - "github.com/xBlaz3kx/distributed-scheduler/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/model" "gopkg.in/guregu/null.v4" ) diff --git a/model/job_execution.go b/model/job_execution.go deleted file mode 100644 index a6b2dd1..0000000 --- a/model/job_execution.go +++ /dev/null @@ -1,23 +0,0 @@ -package model - -import ( - "github.com/google/uuid" - "gopkg.in/guregu/null.v4" - "time" -) - -type JobExecution struct { - ID int `json:"id"` - JobID uuid.UUID `json:"job_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Success bool `json:"success"` - ErrorMessage null.String `json:"error_message,omitempty" swaggertype:"string"` -} - -type JobExecutionStatus string - -const ( - JobExecutionStatusSuccessful JobExecutionStatus = "SUCCESSFUL" - JobExecutionStatusFailed JobExecutionStatus = "FAILED" -)