From 115375be8f352227d0f4bf250893b875383c3615 Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 14:50:18 -0500 Subject: [PATCH 1/6] Make db interface and sqlite and pg implementation --- .github/workflows/e2e-tests.yml | 7 ++ api/go.mod | 1 + api/go.sum | 2 + api/handlers/config.go | 18 ++- api/handlers/handlers.go | 69 ++++++++--- api/jobs/aws_batch_jobs.go | 6 +- api/jobs/database.go | 202 ++++--------------------------- api/jobs/database_postgres.go | 179 ++++++++++++++++++++++++++++ api/jobs/database_sqlite.go | 204 ++++++++++++++++++++++++++++++++ api/jobs/docker_jobs.go | 6 +- api/main.go | 7 +- docker-compose.yml | 12 ++ example.env | 18 ++- 13 files changed, 516 insertions(+), 215 deletions(-) create mode 100644 api/jobs/database_postgres.go create mode 100644 api/jobs/database_sqlite.go diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index e91dec8..e5d72c8 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -34,6 +34,13 @@ jobs: echo MINIO_ROOT_USER=user >> .env echo MINIO_ROOT_PASSWORD=password >> .env + echo DB_SERVICE='postgres' >> .env + echo POSTGRES_CONN_STRING='postgres://user:password@postgres:5432/db?sslmode=disable' >> .env + echo POSTGRES_PASSWORD=password >> .env + echo POSTGRES_USER=user + echo POSTGRES_DB=db >> .env + echo PG_LOG_CHECKPOINTS='off' >> .env + echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> .env echo AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >> .env echo AWS_REGION=${{ secrets.AWS_DEFAULT_REGION }} >> .env diff --git a/api/go.mod b/api/go.mod index a23bc81..f2aa2e3 100644 --- a/api/go.mod +++ b/api/go.mod @@ -9,6 +9,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.10.2 github.com/labstack/gommon v0.4.0 + github.com/lib/pq v1.10.9 github.com/sirupsen/logrus v1.7.0 github.com/swaggo/echo-swagger v1.3.5 github.com/swaggo/swag v1.8.1 diff --git a/api/go.sum b/api/go.sum index 89104c9..6c1ecc8 100644 --- a/api/go.sum +++ b/api/go.sum @@ -69,6 +69,8 @@ github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8= github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= diff --git a/api/handlers/config.go b/api/handlers/config.go index d1c975c..8d636b9 100644 --- a/api/handlers/config.go +++ b/api/handlers/config.go @@ -38,7 +38,7 @@ type RESTHandler struct { ConformsTo []string T Template StorageSvc *s3.S3 - DB *jobs.DB + DB jobs.Database MessageQueue *jobs.MessageQueue ActiveJobs *jobs.ActiveJobs ProcessList *pr.ProcessList @@ -55,7 +55,7 @@ func prettyPrint(v interface{}) string { // Initializes resources and return a new handler // errors are fatal -func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler { +func NewRESTHander(pluginsDir string) *RESTHandler { apiName, exist := os.LookupEnv("API_NAME") if !exist { 
log.Warn("env variable API_NAME not set") @@ -77,6 +77,17 @@ func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler { }, } + dbType, exist := os.LookupEnv("DB_SERVICE") + if !exist { + log.Fatal("env variable DB_SERVICE not set") + } + + db, err := jobs.NewDatabase(dbType) + if err != nil { + log.Fatalf(err.Error()) + } + config.DB = db + // Read all the html templates funcMap := template.FuncMap{ "prettyPrint": prettyPrint, // to pretty print JSONs for results and metadata @@ -114,9 +125,6 @@ func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler { ac.Jobs = make(map[string]*jobs.Job) config.ActiveJobs = &ac - adb := jobs.InitDB(dbPath) - config.DB = adb - config.MessageQueue = &jobs.MessageQueue{ StatusChan: make(chan jobs.StatusMessage, 500), JobDone: make(chan jobs.Job, 1), diff --git a/api/handlers/handlers.go b/api/handlers/handlers.go index 9d04fac..842e338 100644 --- a/api/handlers/handlers.go +++ b/api/handlers/handlers.go @@ -406,12 +406,13 @@ func (rh *RESTHandler) JobDismissHandler(c echo.Context) error { // @Produce json // @Success 200 {object} jobResponse // @Router /jobs/{jobID} [get] -func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { - err := validateFormat(c) +func (rh *RESTHandler) JobStatusHandler(c echo.Context) (err error) { + err = validateFormat(c) if err != nil { return err } + var jRcrd jobs.JobRecord jobID := c.Param("jobID") if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { resp := jobResponse{ @@ -421,7 +422,7 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { Status: (*job).CurrentStatus(), } return prepareResponse(c, http.StatusOK, "jobStatus", resp) - } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { + } else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { resp := jobResponse{ ProcessID: jRcrd.ProcessID, JobID: jRcrd.JobID, @@ -430,6 +431,11 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { } return prepareResponse(c, http.StatusOK, "jobStatus", resp) } + + if err != nil { + output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()} + return prepareResponse(c, http.StatusInternalServerError, "error", output) + } output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)} return prepareResponse(c, http.StatusNotFound, "error", output) } @@ -443,18 +449,19 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { // @Success 200 {object} map[string]interface{} // @Router /jobs/{jobID}/results [get] // Does not produce HTML -func (rh *RESTHandler) JobResultsHandler(c echo.Context) error { - err := validateFormat(c) +func (rh *RESTHandler) JobResultsHandler(c echo.Context) (err error) { + err = validateFormat(c) if err != nil { return err } + var jRcrd jobs.JobRecord jobID := c.Param("jobID") if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("results not ready, job %s", (*job).CurrentStatus())} return prepareResponse(c, http.StatusNotFound, "error", output) - } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit + } else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit switch jRcrd.Status { case jobs.SUCCESSFUL: @@ -480,6 +487,12 @@ func (rh *RESTHandler) JobResultsHandler(c echo.Context) error { } } + + if err != nil { + output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()} + return prepareResponse(c, http.StatusInternalServerError, "error", output) + } + // miss output := 
errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)} return prepareResponse(c, http.StatusNotFound, "error", output) @@ -494,18 +507,20 @@ func (rh *RESTHandler) JobResultsHandler(c echo.Context) error { // @Success 200 {object} map[string]interface{} // @Router /jobs/{jobID}/results [get] // Does not produce HTML -func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error { - err := validateFormat(c) +func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) (err error) { + err = validateFormat(c) if err != nil { return err } + var jRcrd jobs.JobRecord + jobID := c.Param("jobID") if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("metadata not ready, job %s", (*job).CurrentStatus())} return prepareResponse(c, http.StatusNotFound, "error", output) - } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit + } else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit switch jRcrd.Status { case jobs.SUCCESSFUL: md, err := jobs.FetchMeta(rh.StorageSvc, jobID) @@ -529,6 +544,12 @@ func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error { } } + + if err != nil { + output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()} + return prepareResponse(c, http.StatusInternalServerError, "error", output) + } + // miss output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)} return prepareResponse(c, http.StatusNotFound, "error", output) @@ -542,15 +563,16 @@ func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error { // @Param jobID path string true "example: 44d9ca0e-2ca7-4013-907f-a8ccc60da3b4" // @Success 200 {object} jobs.JobLogs // @Router /jobs/{jobID}/logs [get] -func (rh *RESTHandler) JobLogsHandler(c echo.Context) error { +func (rh *RESTHandler) JobLogsHandler(c echo.Context) (err error) { jobID := c.Param("jobID") - err := validateFormat(c) + err = validateFormat(c) if err != nil { return err } var pid string + var jRcrd jobs.JobRecord if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit err = (*job).UpdateContainerLogs() @@ -559,13 +581,18 @@ func (rh *RESTHandler) JobLogsHandler(c echo.Context) error { return prepareResponse(c, http.StatusInternalServerError, "error", output) } pid = (*job).ProcessID() - } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit + } else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit pid = jRcrd.ProcessID } else { // miss output := errResponse{HTTPStatus: http.StatusNotFound, Message: "jobID not found"} return prepareResponse(c, http.StatusNotFound, "error", output) } + if err != nil { + output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()} + return prepareResponse(c, http.StatusInternalServerError, "error", output) + } + logs, err := jobs.FetchLogs(rh.StorageSvc, jobID, pid, false) if err != nil { output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: "error while fetching logs: " + err.Error()} @@ -709,13 +736,19 @@ func (rh *RESTHandler) JobStatusUpdateHandler(c echo.Context) error { (*sm.Job).LogMessage(fmt.Sprintf("Status update received: %s.", sm.Status), logrus.InfoLevel) rh.MessageQueue.StatusChan <- sm return c.JSON(http.StatusAccepted, "status update received") - } else if ok := rh.DB.CheckJobExist(jobID); ok { // db hit - log.Infof("Status update received for inactive job: %s", jobID) - // returning Accepted here so that callers do not retry 
- return c.JSON(http.StatusAccepted, "job not an active job") - } else { - return c.JSON(http.StatusBadRequest, "job id not found") + } else if ok, err := rh.DB.CheckJobExist(jobID); ok || err != nil { // db hit or error + if ok { + log.Infof("Status update received for inactive job: %s", jobID) + // returning Accepted here so that callers do not retry + return c.JSON(http.StatusAccepted, "job not an active job") + } + + if err != nil { + output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()} + return prepareResponse(c, http.StatusInternalServerError, "error", output) + } } + return c.JSON(http.StatusBadRequest, "job id not found") } // func (rh *RESTHandler) JobResultsUpdateHandler(c echo.Context) error { diff --git a/api/jobs/aws_batch_jobs.go b/api/jobs/aws_batch_jobs.go index 108fbaa..28d9c6c 100644 --- a/api/jobs/aws_batch_jobs.go +++ b/api/jobs/aws_batch_jobs.go @@ -53,7 +53,7 @@ type AWSBatchJob struct { cloudWatchForwardToken string // MetaData - DB *DB + DB Database StorageSvc *s3.S3 DoneChan chan Job } @@ -202,7 +202,7 @@ func (j *AWSBatchJob) initLogger() error { lvl, err := log.ParseLevel(os.Getenv("LOG_LEVEL")) if err != nil { - j.logger.Warnf("Invalid LOG_LEVEL set: %s; defaulting to INFO", os.Getenv("LOG_LEVEL")) + j.logger.Warnf("Invalid LOG_LEVEL set: %s, defaulting to INFO", os.Getenv("LOG_LEVEL")) lvl = log.InfoLevel } j.logger.SetLevel(lvl) @@ -239,7 +239,7 @@ func (j *AWSBatchJob) Create() error { j.batchContext = batchContext // At this point job is ready to be added to database - err = j.DB.addJob(j.UUID, "accepted", time.Now(), "", "aws-batch", j.ProcessName, j.Submitter) + err = j.DB.addJob(j.UUID, "accepted", "", "aws-batch", j.ProcessName, j.Submitter, time.Now()) if err != nil { j.ctxCancel() return err diff --git a/api/jobs/database.go b/api/jobs/database.go index 8414ae4..ae29a73 100644 --- a/api/jobs/database.go +++ b/api/jobs/database.go @@ -1,199 +1,43 @@ package jobs import ( - "database/sql" "fmt" "os" - "path/filepath" - "strings" "time" - - _ "github.com/mattn/go-sqlite3" - log "github.com/sirupsen/logrus" ) -type DB struct { - Handle *sql.DB -} - -// Create tables in the database if they do not exist already -func (db *DB) createTables() { - - // SQLite does not have a built-in ENUM type or array type. - // SQLite doesn't enforce the length of the VARCHAR datatype, therefore not using something like VARCHAR(30). - // SQLite's concurrency control is based on transactions, not connections. A connection to a SQLite database does not inherently acquire a lock. - // Locks are acquired when a transaction is started and released when the transaction is committed or rolled back. - - // indices needed to speedup - // fetching jobs for a particular process id - // providing job-lists ordered by time - - queryJobs := ` - CREATE TABLE IF NOT EXISTS jobs ( - id TEXT PRIMARY KEY, - status TEXT NOT NULL, - updated TIMESTAMP NOT NULL, - mode TEXT NOT NULL, - host TEXT NOT NULL, - process_id TEXT NOT NULL, - submitter TEXT NOT NULL DEFAULT '' - ); - - CREATE INDEX IF NOT EXISTS idx_jobs_updated ON jobs(updated); - CREATE INDEX IF NOT EXISTS idx_jobs_process_id ON jobs(process_id); - CREATE INDEX IF NOT EXISTS idx_jobs_submitter ON jobs(submitter); - ` - - _, err := db.Handle.Exec(queryJobs) - if err != nil { - log.Fatal(err) - } -} - -// Initialize the database. -// Creates intermediate directories if not exist. 
-func InitDB(dbPath string) *DB { - - // Create directory structure if it doesn't exist - dir := filepath.Dir(dbPath) - err := os.MkdirAll(dir, 0755) - if err != nil { - log.Fatalf(err.Error()) - } - - h, err := sql.Open("sqlite3", dbPath+"?mode=rwc") - // it maybe a good idea to check here if the connections has write privilege https://stackoverflow.com/a/44707371/11428410 https://www.sqlite.org/c3ref/db_readonly.html - // also maybe we should make db such that only go can write to it - - if err != nil { - log.Fatalf("Could not open %s Delete the existing database to start with a new database. Error: %s", dbPath, err.Error()) - } - - if h == nil { - log.Fatal("db nil") - } - - db := DB{Handle: h} - db.createTables() - return &db -} - -// Add job to the database. Will return error if job exist. -func (db *DB) addJob(jid string, status string, updated time.Time, mode string, host string, process_id string, submitter string) error { - query := `INSERT INTO jobs (id, status, updated, mode, host, process_id, submitter) VALUES (?, ?, ?, ?, ?, ?, ?)` - - _, err := db.Handle.Exec(query, jid, status, updated, mode, host, process_id, submitter) - if err != nil { - return err - } - return nil -} - -// Update status and time of a job. -func (db *DB) updateJobRecord(jid string, status string, now time.Time) { - query := `UPDATE jobs SET status = ?, updated = ? WHERE id = ?` - _, err := db.Handle.Exec(query, status, now, jid) - if err != nil { - log.Error(err) - } -} - -// Get Job Record from database given a job id. -// If job do not exists, or error encountered bool would be false. -// Similar behavior as key exist in hashmap. -func (db *DB) GetJob(jid string) (JobRecord, bool) { - query := `SELECT * FROM jobs WHERE id = ?` - - jr := JobRecord{} - - row := db.Handle.QueryRow(query, jid) - err := row.Scan(&jr.JobID, &jr.Status, &jr.LastUpdate, &jr.Mode, &jr.Host, &jr.ProcessID, &jr.Submitter) - if err != nil { - if err == sql.ErrNoRows { - return JobRecord{}, false - } else { - log.Error(err) - return JobRecord{}, false - } - } - return jr, true -} - -// Check if a job exists in database. -func (db *DB) CheckJobExist(jid string) bool { - query := `SELECT id FROM jobs WHERE id = ?` - - js := JobRecord{} - - row := db.Handle.QueryRow(query, jid) - err := row.Scan(&js.JobID) - if err != nil { - if err == sql.ErrNoRows { - return false - } else { - log.Error(err) - return false - } - } - return true +// Database interface abstracts database operations +type Database interface { + addJob(jid, status, mode, host, processID, submitter string, updated time.Time) error + updateJobRecord(jid, status string, now time.Time) error + GetJob(jid string) (JobRecord, bool, error) + CheckJobExist(jid string) (bool, error) + GetJobs(limit, offset int, processIDs, statuses, submitters []string) ([]JobRecord, error) + Close() error } -// Assumes query parameters are valid -func (db *DB) GetJobs(limit, offset int, processIDs, statuses []string, submitters []string) ([]JobRecord, error) { - baseQuery := `SELECT id, status, updated, process_id, submitter FROM jobs` - whereClauses := []string{} - args := []interface{}{} +func NewDatabase(dbType string) (db Database, err error) { - if len(processIDs) > 0 { - placeholders := strings.Repeat("?,", len(processIDs)-1) + "?" 
- whereClauses = append(whereClauses, fmt.Sprintf("process_id IN (%s)", placeholders)) - for _, pid := range processIDs { - args = append(args, pid) + switch dbType { + case "sqlite": + dbPath, exist := os.LookupEnv("SQLITE_DB_PATH") + if !exist { + return nil, fmt.Errorf("env variable SQLITE_DB_PATH not set") } - } - - if len(statuses) > 0 { - placeholders := strings.Repeat("?,", len(statuses)-1) + "?" - whereClauses = append(whereClauses, fmt.Sprintf("status IN (%s)", placeholders)) - for _, st := range statuses { - args = append(args, st) - } - } - - if len(submitters) > 0 { - placeholders := strings.Repeat("?,", len(submitters)-1) + "?" - whereClauses = append(whereClauses, fmt.Sprintf("submitter IN (%s)", placeholders)) - for _, sb := range submitters { - args = append(args, sb) + db, err = NewSQLiteDB(dbPath) + case "postgres": + connString, exist := os.LookupEnv("POSTGRES_CONN_STRING") + if !exist { + return nil, fmt.Errorf("env variable POSTGRES_CONN_STRING not set") } + db, err = NewPostgresDB(connString) + default: + return nil, fmt.Errorf("unsupported database type: %s", dbType) } - if len(whereClauses) > 0 { - baseQuery += " WHERE " + strings.Join(whereClauses, " AND ") - } - - query := baseQuery + ` ORDER BY updated DESC LIMIT ? OFFSET ?` - args = append(args, limit, offset) - - res := []JobRecord{} - - rows, err := db.Handle.Query(query, args...) if err != nil { return nil, err } - defer rows.Close() - - for rows.Next() { - var r JobRecord - if err := rows.Scan(&r.JobID, &r.Status, &r.LastUpdate, &r.ProcessID, &r.Submitter); err != nil { - return nil, err - } - res = append(res, r) - } - err = rows.Err() - if err != nil { - return nil, err - } - return res, nil + return db, nil } diff --git a/api/jobs/database_postgres.go b/api/jobs/database_postgres.go new file mode 100644 index 0000000..bc12ab9 --- /dev/null +++ b/api/jobs/database_postgres.go @@ -0,0 +1,179 @@ +package jobs + +import ( + "database/sql" + "fmt" + "strings" + "time" + + _ "github.com/lib/pq" +) + +type PostgresDB struct { + Handle *sql.DB +} + +// Initialize the database. +// Creates intermediate directories if not exist. +func NewPostgresDB(dbConnString string) (*PostgresDB, error) { + h, err := sql.Open("postgres", dbConnString) + + if err != nil { + return nil, fmt.Errorf("could not connect to database. 
Error: %s", err.Error()) + } + + if h == nil { + return nil, fmt.Errorf("db nil") + } + + db := PostgresDB{Handle: h} + db.createTables() + return &db, nil +} + +// createTables in the database if they do not exist already for PostgreSQL +func (postgresDB *PostgresDB) createTables() error { + + queryJobs := ` + CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + status TEXT NOT NULL, + updated TIMESTAMP WITHOUT TIME ZONE NOT NULL, + mode TEXT NOT NULL, + host TEXT NOT NULL, + process_id TEXT NOT NULL, + submitter TEXT NOT NULL DEFAULT '' + ); + + CREATE INDEX IF NOT EXISTS idx_jobs_updated ON jobs(updated); + CREATE INDEX IF NOT EXISTS idx_jobs_process_id ON jobs(process_id); + CREATE INDEX IF NOT EXISTS idx_jobs_submitter ON jobs(submitter); + ` + + _, err := postgresDB.Handle.Exec(queryJobs) + if err != nil { + return fmt.Errorf("error creating tables: %s", err) + } + return nil +} + +// AddJob adds a new job to the database +func (db *PostgresDB) addJob(jid, status, mode, host, processID, submitter string, updated time.Time) error { + query := `INSERT INTO jobs (id, status, updated, mode, host, process_id, submitter) VALUES ($1, $2, $3, $4, $5, $6, $7)` + _, err := db.Handle.Exec(query, jid, status, updated, mode, host, processID, submitter) + return err +} + +// UpdateJobRecord updates a job record +func (db *PostgresDB) updateJobRecord(jid, status string, now time.Time) error { + query := `UPDATE jobs SET status = $2, updated = $3 WHERE id = $1` + _, err := db.Handle.Exec(query, jid, status, now) + return err +} + +// GetJob retrieves a job record by id +func (db *PostgresDB) GetJob(jid string) (JobRecord, bool, error) { + query := `SELECT * FROM jobs WHERE id = $1` + var jr JobRecord + err := db.Handle.QueryRow(query, jid).Scan(&jr.JobID, &jr.Status, &jr.LastUpdate, &jr.Mode, &jr.Host, &jr.ProcessID, &jr.Submitter) + if err != nil { + if err == sql.ErrNoRows { + return JobRecord{}, false, nil + } + return JobRecord{}, false, err + } + return jr, true, nil +} + +// CheckJobExist checks if a job exists in the database +func (db *PostgresDB) CheckJobExist(jid string) (bool, error) { + query := `SELECT 1 FROM jobs WHERE id = $1` + var exists int + err := db.Handle.QueryRow(query, jid).Scan(&exists) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + return true, nil +} + +// Assumes query parameters are valid +func (pgDB *PostgresDB) GetJobs(limit, offset int, processIDs, statuses, submitters []string) ([]JobRecord, error) { + baseQuery := `SELECT id, status, updated, process_id, submitter FROM jobs` + whereClauses := []string{} + args := []interface{}{} + + argIndex := 1 // Start from 1 for PostgreSQL placeholders + + if len(processIDs) > 0 { + placeholders := make([]string, len(processIDs)) + for i := range processIDs { + placeholders[i] = fmt.Sprintf("$%d", argIndex) + argIndex++ + } + whereClauses = append(whereClauses, "process_id IN ("+strings.Join(placeholders, ", ")+")") + for _, pid := range processIDs { + args = append(args, pid) + } + } + + if len(statuses) > 0 { + placeholders := make([]string, len(statuses)) + for i := range statuses { + placeholders[i] = fmt.Sprintf("$%d", argIndex) + argIndex++ + } + whereClauses = append(whereClauses, "status IN ("+strings.Join(placeholders, ", ")+")") + for _, st := range statuses { + args = append(args, st) + } + } + + if len(submitters) > 0 { + placeholders := make([]string, len(submitters)) + for i := range submitters { + placeholders[i] = fmt.Sprintf("$%d", argIndex) + argIndex++ + } 
+ whereClauses = append(whereClauses, "submitter IN ("+strings.Join(placeholders, ", ")+")") + for _, sb := range submitters { + args = append(args, sb) + } + } + + if len(whereClauses) > 0 { + baseQuery += " WHERE " + strings.Join(whereClauses, " AND ") + } + + // Add limit and offset to the query and args + query := baseQuery + fmt.Sprintf(" ORDER BY updated DESC LIMIT $%d OFFSET $%d", argIndex, argIndex+1) + args = append(args, limit, offset) + + res := []JobRecord{} + + rows, err := pgDB.Handle.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var r JobRecord + if err := rows.Scan(&r.JobID, &r.Status, &r.LastUpdate, &r.ProcessID, &r.Submitter); err != nil { + return nil, err + } + res = append(res, r) + } + + err = rows.Err() + if err != nil { + return nil, err + } + return res, nil +} + +func (pgDB *PostgresDB) Close() error { + return pgDB.Handle.Close() +} diff --git a/api/jobs/database_sqlite.go b/api/jobs/database_sqlite.go new file mode 100644 index 0000000..a03fb76 --- /dev/null +++ b/api/jobs/database_sqlite.go @@ -0,0 +1,204 @@ +package jobs + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + _ "github.com/mattn/go-sqlite3" + log "github.com/sirupsen/logrus" +) + +type SQLiteDB struct { + Handle *sql.DB +} + +// Initialize the database. +// Creates intermediate directories if not exist. +func NewSQLiteDB(dbPath string) (*SQLiteDB, error) { + + // Create directory structure if it doesn't exist + dir := filepath.Dir(dbPath) + err := os.MkdirAll(dir, 0755) + if err != nil { + return nil, err + } + + h, err := sql.Open("sqlite3", dbPath+"?mode=rwc") + // it maybe a good idea to check here if the connections has write privilege https://stackoverflow.com/a/44707371/11428410 https://www.sqlite.org/c3ref/db_readonly.html + // also maybe we should make db such that only go can write to it + + if err != nil { + return nil, fmt.Errorf("could not open %s Delete the existing database to start with a new database. Error: %s", dbPath, err.Error()) + } + + if h == nil { + return nil, fmt.Errorf("db nil") + } + + db := SQLiteDB{Handle: h} + db.createTables() + return &db, nil +} + +// Create tables in the database if they do not exist already +func (sqliteDB *SQLiteDB) createTables() error { + + // SQLite does not have a built-in ENUM type or array type. + // SQLite doesn't enforce the length of the VARCHAR datatype, therefore not using something like VARCHAR(30). + // SQLite's concurrency control is based on transactions, not connections. A connection to a SQLite database does not inherently acquire a lock. + // Locks are acquired when a transaction is started and released when the transaction is committed or rolled back. + + // indices needed to speedup + // fetching jobs for a particular process id + // providing job-lists ordered by time + + queryJobs := ` + CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + status TEXT NOT NULL, + updated TIMESTAMP NOT NULL, + mode TEXT NOT NULL, + host TEXT NOT NULL, + process_id TEXT NOT NULL, + submitter TEXT NOT NULL DEFAULT '' + ); + + CREATE INDEX IF NOT EXISTS idx_jobs_updated ON jobs(updated); + CREATE INDEX IF NOT EXISTS idx_jobs_process_id ON jobs(process_id); + CREATE INDEX IF NOT EXISTS idx_jobs_submitter ON jobs(submitter); + ` + + _, err := sqliteDB.Handle.Exec(queryJobs) + if err != nil { + return fmt.Errorf("error creating tables: %s", err) + } + return nil +} + +// Add job to the database. Will return error if job exist. 
+func (sqliteDB *SQLiteDB) addJob(jid, status, mode, host, processID, submitter string, updated time.Time) error { + query := `INSERT INTO jobs (id, status, updated, mode, host, process_id, submitter) VALUES (?, ?, ?, ?, ?, ?, ?)` + + _, err := sqliteDB.Handle.Exec(query, jid, status, updated, mode, host, processID, submitter) + if err != nil { + return err + } + return nil +} + +// Update status and time of a job. +func (sqliteDB *SQLiteDB) updateJobRecord(jid, status string, now time.Time) error { + query := `UPDATE jobs SET status = ?, updated = ? WHERE id = ?` + _, err := sqliteDB.Handle.Exec(query, status, now, jid) + if err != nil { + return err + } + return nil +} + +// Get Job Record from database given a job id. +// If job do not exists, or error encountered bool would be false. +// Similar behavior as key exist in hashmap. +func (sqliteDB *SQLiteDB) GetJob(jid string) (JobRecord, bool, error) { + query := `SELECT * FROM jobs WHERE id = ?` + + jr := JobRecord{} + + row := sqliteDB.Handle.QueryRow(query, jid) + err := row.Scan(&jr.JobID, &jr.Status, &jr.LastUpdate, &jr.Mode, &jr.Host, &jr.ProcessID, &jr.Submitter) + if err != nil { + if err == sql.ErrNoRows { + return JobRecord{}, false, nil + } else { + log.Error(err) + return JobRecord{}, false, err + } + } + return jr, true, nil +} + +// Check if a job exists in database. +func (sqliteDB *SQLiteDB) CheckJobExist(jid string) (bool, error) { + query := `SELECT id FROM jobs WHERE id = ?` + + js := JobRecord{} + + row := sqliteDB.Handle.QueryRow(query, jid) + err := row.Scan(&js.JobID) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } else { + return false, err + } + } + return true, nil +} + +// Assumes query parameters are valid +func (sqliteDB *SQLiteDB) GetJobs(limit, offset int, processIDs, statuses []string, submitters []string) ([]JobRecord, error) { + baseQuery := `SELECT id, status, updated, process_id, submitter FROM jobs` + whereClauses := []string{} + args := []interface{}{} + + if len(processIDs) > 0 { + placeholders := strings.Repeat("?,", len(processIDs)-1) + "?" + whereClauses = append(whereClauses, fmt.Sprintf("process_id IN (%s)", placeholders)) + for _, pid := range processIDs { + args = append(args, pid) + } + } + + if len(statuses) > 0 { + placeholders := strings.Repeat("?,", len(statuses)-1) + "?" + whereClauses = append(whereClauses, fmt.Sprintf("status IN (%s)", placeholders)) + for _, st := range statuses { + args = append(args, st) + } + } + + if len(submitters) > 0 { + placeholders := strings.Repeat("?,", len(submitters)-1) + "?" + whereClauses = append(whereClauses, fmt.Sprintf("submitter IN (%s)", placeholders)) + for _, sb := range submitters { + args = append(args, sb) + } + } + + if len(whereClauses) > 0 { + baseQuery += " WHERE " + strings.Join(whereClauses, " AND ") + } + + query := baseQuery + ` ORDER BY updated DESC LIMIT ? OFFSET ?` + args = append(args, limit, offset) + + res := []JobRecord{} + + rows, err := sqliteDB.Handle.Query(query, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var r JobRecord + if err := rows.Scan(&r.JobID, &r.Status, &r.LastUpdate, &r.ProcessID, &r.Submitter); err != nil { + return nil, err + } + res = append(res, r) + } + + err = rows.Err() + if err != nil { + return nil, err + } + return res, nil +} + +func (sqliteDB *SQLiteDB) Close() error { + return sqliteDB.Handle.Close() +} diff --git a/api/jobs/docker_jobs.go b/api/jobs/docker_jobs.go index ebfbb57..2811d5e 100644 --- a/api/jobs/docker_jobs.go +++ b/api/jobs/docker_jobs.go @@ -38,7 +38,7 @@ type DockerJob struct { logFile *os.File Resources - DB *DB + DB Database StorageSvc *s3.S3 DoneChan chan Job } @@ -188,7 +188,7 @@ func (j *DockerJob) initLogger() error { lvl, err := log.ParseLevel(os.Getenv("LOG_LEVEL")) if err != nil { - j.logger.Warnf("Invalid LOG_LEVEL set: %s; defaulting to INFO", os.Getenv("LOG_LEVEL")) + j.logger.Warnf("Invalid LOG_LEVEL set, %s; defaulting to INFO", os.Getenv("LOG_LEVEL")) lvl = log.InfoLevel } j.logger.SetLevel(lvl) @@ -208,7 +208,7 @@ func (j *DockerJob) Create() error { j.ctxCancel = cancelFunc // At this point job is ready to be added to database - err = j.DB.addJob(j.UUID, "accepted", time.Now(), "", "local", j.ProcessName, j.Submitter) + err = j.DB.addJob(j.UUID, "accepted", "", "local", j.ProcessName, j.Submitter, time.Now()) if err != nil { j.ctxCancel() return err diff --git a/api/main.go b/api/main.go index c4b1cfd..607d926 100644 --- a/api/main.go +++ b/api/main.go @@ -57,7 +57,6 @@ func init() { flag.StringVar(&envFP, "e", "", "specify the path of the dot env file to load") flag.StringVar(&pluginsDir, "d", resolveValue("PLUGINS_DIR", "plugins"), "specify the relative path of the processes dir") flag.StringVar(&port, "p", resolveValue("API_PORT", "5050"), "specify the port to run the api on") - flag.StringVar(&dbPath, "db", resolveValue("DB_PATH", "/.data/db.sqlite"), "specify the path of the sqlite database") flag.StringVar(&logFile, "lf", resolveValue("LOG_FILE", "/.data/logs/api.jsonl"), "specify the log file") flag.StringVar(&authSvc, "au", resolveValue("AUTH_SERVICE", ""), "specify the auth service") flag.StringVar(&authLvl, "al", resolveValue("AUTH_LEVEL", "0"), "specify the authorization striction level") @@ -106,7 +105,7 @@ func initLogger() (log.Level, *lumberjack.Logger) { lvl, err := log.ParseLevel(os.Getenv("LOG_LEVEL")) if err != nil { - log.Warnf("Invalid LOG_LEVEL set: %s; defaulting to INFO", os.Getenv("LOG_LEVEL")) + log.Warnf("Invalid LOG_LEVEL set: %s, defaulting to INFO", os.Getenv("LOG_LEVEL")) lvl = log.InfoLevel } log.SetLevel(lvl) @@ -180,7 +179,7 @@ func main() { fmt.Println("Logging to", logFile) // Initialize resources - rh := handlers.NewRESTHander(pluginsDir, dbPath) + rh := handlers.NewRESTHander(pluginsDir) // todo: handle this error: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running // todo: all non terminated job statuses should be updated to unknown // todo: all logs in the logs directory should be moved to storage @@ -270,7 +269,7 @@ func main() { // aws batch jobs close() methods take minimum of 5 seconds time.Sleep(5 * time.Second) - if err := rh.DB.Handle.Close(); err != nil { + if err := rh.DB.Close(); err != nil { log.Error(err) } else { log.Info("closed connection to database") diff --git a/docker-compose.yml b/docker-compose.yml index 987eb47..242e585 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,18 @@ services: networks: - process_api_net + postgres: + container_name: postgres + image: postgres:16.1-alpine3.18 + env_file: + - .env + ports: + - '5432:5432' + volumes: + - ./.data/postgres:/var/lib/postgresql/data + networks: + - process_api_net + networks: process_api_net: external: true diff --git a/example.env b/example.env index 875c302..9a6e62e 100644 --- a/example.env +++ b/example.env @@ -11,8 +11,10 @@ LOG_LEVEL='INFO' # Log verbosity level (Optional). LOG_FILE='/.data/logs/api.jsonl' # Location for the main API logs (Optional). TMP_JOB_LOGS_DIR='/.data/tmp/job_logs' # Directory for temporary job logs. -# --- Data & Persistence -DB_PATH='/.data/db.sqlite' # Path to the database file (Optional). +# --- Database Configuration +DB_SERVICE='sqlite' # Options: ['sqlite', 'postgres'] + +# Policies EXPIRY_DAYS='7' # Duration after which certain data might expire. # --- Storage Configuration @@ -24,13 +26,23 @@ STORAGE_LOGS_PREFIX='logs' # --- Miscellaneous AUTH_SERVICE='' # Options: ['', 'keycloak'] (Optional) -AUTH_LEVEL='0' # Options: [0, 1, 2] (Optional) +AUTH_LEVEL='0' # Options: [0, 1, 2] (Optional) PLUGINS_DIR='plugins' # Directory for plugins (Optional). # ============================================== # Providers Settings # ============================================== +# --- SQLITE +SQLITE_DB_PATH='/.data/db.sqlite' # Path to the database file for sqlite or Connection string for postgres. 
+ +# --- PostgreSQL +POSTGRES_CONN_STRING='postgres://user:password@postgres:5432/db?sslmode=disable' +POSTGRES_PASSWORD=password +POSTGRES_USER=user +POSTGRES_DB=db +PG_LOG_CHECKPOINTS='off' + # --- AWS (Used for both S3 and Batch) AWS_ACCESS_KEY_ID=user AWS_SECRET_ACCESS_KEY=password From 9867afb819618ca3c782a9c92a6d55cd63dce858 Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 15:08:45 -0500 Subject: [PATCH 2/6] Add postgres service to docker-compose.prod --- docker-compose.prod.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index ecf2b6d..17f065c 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -32,6 +32,18 @@ services: networks: - process_api_net + postgres: + container_name: postgres + image: postgres:16.1-alpine3.18 + env_file: + - .env + ports: + - '5432:5432' + volumes: + - ./.data/postgres:/var/lib/postgresql/data + networks: + - process_api_net + networks: process_api_net: external: true From 1257317611aee228dd9462243162e1e1a6b74d5c Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 15:19:35 -0500 Subject: [PATCH 3/6] Add depends on postgres to api container --- docker-compose.prod.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 17f065c..7e48f4f 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -17,6 +17,7 @@ services: - process_api_net depends_on: - minio + - postgres minio: container_name: minio From fe4432feff383fb451d3b3da790284f986cfae52 Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 15:33:02 -0500 Subject: [PATCH 4/6] Update workflow for postgres --- .github/workflows/e2e-tests.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index e5d72c8..52d9e11 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -36,9 +36,9 @@ jobs: echo DB_SERVICE='postgres' >> .env echo POSTGRES_CONN_STRING='postgres://user:password@postgres:5432/db?sslmode=disable' >> .env - echo POSTGRES_PASSWORD=password >> .env - echo POSTGRES_USER=user - echo POSTGRES_DB=db >> .env + echo POSTGRES_PASSWORD='password' >> .env + echo POSTGRES_USER='user' + echo POSTGRES_DB='db' >> .env echo PG_LOG_CHECKPOINTS='off' >> .env echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> .env From 938f04b5b4fdd86cdf5191695070a0c5f7281b2f Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 16:24:33 -0500 Subject: [PATCH 5/6] Fix github workflow --- .github/workflows/e2e-tests.yml | 6 ++++-- api/jobs/database_postgres.go | 5 ++++- api/jobs/database_sqlite.go | 5 ++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 52d9e11..ec3df0b 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -37,7 +37,7 @@ jobs: echo DB_SERVICE='postgres' >> .env echo POSTGRES_CONN_STRING='postgres://user:password@postgres:5432/db?sslmode=disable' >> .env echo POSTGRES_PASSWORD='password' >> .env - echo POSTGRES_USER='user' + echo POSTGRES_USER='user' >> .env echo POSTGRES_DB='db' >> .env echo PG_LOG_CHECKPOINTS='off' >> .env @@ -98,6 +98,8 @@ jobs: ## Uncomment to print logs for debugging # - name: Display docker-compose logs - # run: docker-compose logs + # run: | + # docker-compose logs + # cat .data/api/logs/api.jsonl # if: 
always() diff --git a/api/jobs/database_postgres.go b/api/jobs/database_postgres.go index bc12ab9..9fd4c2f 100644 --- a/api/jobs/database_postgres.go +++ b/api/jobs/database_postgres.go @@ -27,7 +27,10 @@ func NewPostgresDB(dbConnString string) (*PostgresDB, error) { } db := PostgresDB{Handle: h} - db.createTables() + err = db.createTables() + if err != nil { + return nil, err + } return &db, nil } diff --git a/api/jobs/database_sqlite.go b/api/jobs/database_sqlite.go index a03fb76..3cf9db8 100644 --- a/api/jobs/database_sqlite.go +++ b/api/jobs/database_sqlite.go @@ -40,7 +40,10 @@ func NewSQLiteDB(dbPath string) (*SQLiteDB, error) { } db := SQLiteDB{Handle: h} - db.createTables() + err = db.createTables() + if err != nil { + return nil, err + } return &db, nil } From f266b36c7682c0b2618912dd50fda674576017bc Mon Sep 17 00:00:00 2001 From: Abdul Raheem Siddiqui Date: Wed, 22 Nov 2023 19:04:22 -0500 Subject: [PATCH 6/6] Add healthcheck to postgres container --- .github/workflows/e2e-tests.yml | 6 +++--- api/handlers/config.go | 5 ----- docker-compose.prod.yml | 11 +++++++++-- docker-compose.yml | 10 ++++++++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index ec3df0b..074351b 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -96,10 +96,10 @@ jobs: docker run --network="host" -v /home/runner/work/process-api/process-api/tests/e2e:/etc/newman/ postman/newman:5.3.1-alpine run tests.postman_collection.json --env-var "url=localhost:80" --reporters cli --bail --color on - ## Uncomment to print logs for debugging + # # Uncomment to print logs for debugging # - name: Display docker-compose logs # run: | - # docker-compose logs - # cat .data/api/logs/api.jsonl + # docker-compose logs + # cat .data/api/logs/api.jsonl # if: always() diff --git a/api/handlers/config.go b/api/handlers/config.go index 8d636b9..656d0a8 100644 --- a/api/handlers/config.go +++ b/api/handlers/config.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "net/http" "os" "strings" "text/template" @@ -139,10 +138,6 @@ func NewRESTHander(pluginsDir string) *RESTHandler { return &config } -func (rh *RESTHandler) Ping(c echo.Context) error { - return c.JSON(http.StatusOK, "ok") -} - // This routine sequentially updates status. // So that order of status updates received is preserved. 
func (rh *RESTHandler) StatusUpdateRoutine() { diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 7e48f4f..2fc1d74 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -16,8 +16,10 @@ services: networks: - process_api_net depends_on: - - minio - - postgres + minio: + condition: service_started + postgres: + condition: service_healthy minio: container_name: minio @@ -36,6 +38,11 @@ services: postgres: container_name: postgres image: postgres:16.1-alpine3.18 + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB'] + interval: 10s + timeout: 5s + retries: 5 env_file: - .env ports: diff --git a/docker-compose.yml b/docker-compose.yml index 242e585..1f95866 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,11 @@ services: - /var/run/docker.sock:/var/run/docker.sock networks: - process_api_net + depends_on: + minio: + condition: service_started + postgres: + condition: service_healthy minio: container_name: minio @@ -38,6 +43,11 @@ services: - .env ports: - '5432:5432' + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB'] + interval: 10s + timeout: 5s + retries: 5 volumes: - ./.data/postgres:/var/lib/postgresql/data networks:
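
---

Usage sketch (informational, not part of the patch series): the snippet below shows how the jobs.Database interface introduced in PATCH 1/6 is selected via DB_SERVICE and consumed, mirroring handlers/config.go and the updated handlers. The environment variables are the ones documented in example.env; the import path and the hard-coded job ID are placeholders only, assumed for illustration.

package main

import (
	"log"
	"os"

	"github.com/dewberry/process-api/api/jobs" // import path assumed; adjust to the real module path
)

func main() {
	// DB_SERVICE selects the backend ("sqlite" or "postgres"), as in handlers/config.go.
	dbType, ok := os.LookupEnv("DB_SERVICE")
	if !ok {
		log.Fatal("env variable DB_SERVICE not set")
	}

	// NewDatabase reads SQLITE_DB_PATH or POSTGRES_CONN_STRING itself and returns
	// the jobs.Database interface added in PATCH 1/6.
	db, err := jobs.NewDatabase(dbType)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// GetJob now returns (record, found, err); a miss and a query failure are distinct,
	// which is why the handlers check ok and err separately.
	jobID := "44d9ca0e-2ca7-4013-907f-a8ccc60da3b4" // placeholder id taken from the swagger annotations
	jRcrd, found, err := db.GetJob(jobID)
	switch {
	case err != nil:
		log.Printf("db error: %v", err)
	case !found:
		log.Printf("%s job id not found", jobID)
	default:
		log.Printf("job %s status: %s", jRcrd.JobID, jRcrd.Status)
	}
}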