Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make database interface with sqlite and pg implementations #74

Merged
merged 6 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ jobs:
echo MINIO_ROOT_USER=user >> .env
echo MINIO_ROOT_PASSWORD=password >> .env

echo DB_SERVICE='postgres' >> .env
echo POSTGRES_CONN_STRING='postgres://user:password@postgres:5432/db?sslmode=disable' >> .env
echo POSTGRES_PASSWORD='password' >> .env
echo POSTGRES_USER='user' >> .env
echo POSTGRES_DB='db' >> .env
echo PG_LOG_CHECKPOINTS='off' >> .env

echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> .env
echo AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >> .env
echo AWS_REGION=${{ secrets.AWS_DEFAULT_REGION }} >> .env
Expand Down Expand Up @@ -89,8 +96,10 @@ jobs:
docker run --network="host" -v /home/runner/work/process-api/process-api/tests/e2e:/etc/newman/ postman/newman:5.3.1-alpine run tests.postman_collection.json
--env-var "url=localhost:80" --reporters cli --bail --color on

## Uncomment to print logs for debugging
# # Uncomment to print logs for debugging
# - name: Display docker-compose logs
# run: docker-compose logs
# run: |
# docker-compose logs
# cat .data/api/logs/api.jsonl
# if: always()

1 change: 1 addition & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/labstack/echo/v4 v4.10.2
github.com/labstack/gommon v0.4.0
github.com/lib/pq v1.10.9
github.com/sirupsen/logrus v1.7.0
github.com/swaggo/echo-swagger v1.3.5
github.com/swaggo/swag v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO
github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8=
github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
Expand Down
23 changes: 13 additions & 10 deletions api/handlers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"
"text/template"
Expand Down Expand Up @@ -38,7 +37,7 @@ type RESTHandler struct {
ConformsTo []string
T Template
StorageSvc *s3.S3
DB *jobs.DB
DB jobs.Database
MessageQueue *jobs.MessageQueue
ActiveJobs *jobs.ActiveJobs
ProcessList *pr.ProcessList
Expand All @@ -55,7 +54,7 @@ func prettyPrint(v interface{}) string {

// Initializes resources and return a new handler
// errors are fatal
func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler {
func NewRESTHander(pluginsDir string) *RESTHandler {
apiName, exist := os.LookupEnv("API_NAME")
if !exist {
log.Warn("env variable API_NAME not set")
Expand All @@ -77,6 +76,17 @@ func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler {
},
}

dbType, exist := os.LookupEnv("DB_SERVICE")
if !exist {
log.Fatal("env variable DB_SERVICE not set")
}

db, err := jobs.NewDatabase(dbType)
if err != nil {
log.Fatalf(err.Error())
}
config.DB = db

// Read all the html templates
funcMap := template.FuncMap{
"prettyPrint": prettyPrint, // to pretty print JSONs for results and metadata
Expand Down Expand Up @@ -114,9 +124,6 @@ func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler {
ac.Jobs = make(map[string]*jobs.Job)
config.ActiveJobs = &ac

adb := jobs.InitDB(dbPath)
config.DB = adb

config.MessageQueue = &jobs.MessageQueue{
StatusChan: make(chan jobs.StatusMessage, 500),
JobDone: make(chan jobs.Job, 1),
Expand All @@ -131,10 +138,6 @@ func NewRESTHander(pluginsDir string, dbPath string) *RESTHandler {
return &config
}

func (rh *RESTHandler) Ping(c echo.Context) error {
return c.JSON(http.StatusOK, "ok")
}

// This routine sequentially updates status.
// So that order of status updates received is preserved.
func (rh *RESTHandler) StatusUpdateRoutine() {
Expand Down
69 changes: 51 additions & 18 deletions api/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,13 @@ func (rh *RESTHandler) JobDismissHandler(c echo.Context) error {
// @Produce json
// @Success 200 {object} jobResponse
// @Router /jobs/{jobID} [get]
func (rh *RESTHandler) JobStatusHandler(c echo.Context) error {
err := validateFormat(c)
func (rh *RESTHandler) JobStatusHandler(c echo.Context) (err error) {
err = validateFormat(c)
if err != nil {
return err
}

var jRcrd jobs.JobRecord
jobID := c.Param("jobID")
if job, ok := rh.ActiveJobs.Jobs[jobID]; ok {
resp := jobResponse{
Expand All @@ -421,7 +422,7 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error {
Status: (*job).CurrentStatus(),
}
return prepareResponse(c, http.StatusOK, "jobStatus", resp)
} else if jRcrd, ok := rh.DB.GetJob(jobID); ok {
} else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok {
resp := jobResponse{
ProcessID: jRcrd.ProcessID,
JobID: jRcrd.JobID,
Expand All @@ -430,6 +431,11 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error {
}
return prepareResponse(c, http.StatusOK, "jobStatus", resp)
}

if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}
output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)}
return prepareResponse(c, http.StatusNotFound, "error", output)
}
Expand All @@ -443,18 +449,19 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error {
// @Success 200 {object} map[string]interface{}
// @Router /jobs/{jobID}/results [get]
// Does not produce HTML
func (rh *RESTHandler) JobResultsHandler(c echo.Context) error {
err := validateFormat(c)
func (rh *RESTHandler) JobResultsHandler(c echo.Context) (err error) {
err = validateFormat(c)
if err != nil {
return err
}

var jRcrd jobs.JobRecord
jobID := c.Param("jobID")
if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit
output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("results not ready, job %s", (*job).CurrentStatus())}
return prepareResponse(c, http.StatusNotFound, "error", output)

} else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit
} else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit

switch jRcrd.Status {
case jobs.SUCCESSFUL:
Expand All @@ -480,6 +487,12 @@ func (rh *RESTHandler) JobResultsHandler(c echo.Context) error {
}

}

if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}

// miss
output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)}
return prepareResponse(c, http.StatusNotFound, "error", output)
Expand All @@ -494,18 +507,20 @@ func (rh *RESTHandler) JobResultsHandler(c echo.Context) error {
// @Success 200 {object} map[string]interface{}
// @Router /jobs/{jobID}/results [get]
// Does not produce HTML
func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error {
err := validateFormat(c)
func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) (err error) {
err = validateFormat(c)
if err != nil {
return err
}

var jRcrd jobs.JobRecord

jobID := c.Param("jobID")
if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit
output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("metadata not ready, job %s", (*job).CurrentStatus())}
return prepareResponse(c, http.StatusNotFound, "error", output)

} else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit
} else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit
switch jRcrd.Status {
case jobs.SUCCESSFUL:
md, err := jobs.FetchMeta(rh.StorageSvc, jobID)
Expand All @@ -529,6 +544,12 @@ func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error {
}

}

if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}

// miss
output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)}
return prepareResponse(c, http.StatusNotFound, "error", output)
Expand All @@ -542,15 +563,16 @@ func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error {
// @Param jobID path string true "example: 44d9ca0e-2ca7-4013-907f-a8ccc60da3b4"
// @Success 200 {object} jobs.JobLogs
// @Router /jobs/{jobID}/logs [get]
func (rh *RESTHandler) JobLogsHandler(c echo.Context) error {
func (rh *RESTHandler) JobLogsHandler(c echo.Context) (err error) {
jobID := c.Param("jobID")

err := validateFormat(c)
err = validateFormat(c)
if err != nil {
return err
}

var pid string
var jRcrd jobs.JobRecord

if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit
err = (*job).UpdateContainerLogs()
Expand All @@ -559,13 +581,18 @@ func (rh *RESTHandler) JobLogsHandler(c echo.Context) error {
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}
pid = (*job).ProcessID()
} else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit
} else if jRcrd, ok, err = rh.DB.GetJob(jobID); ok { // db hit
pid = jRcrd.ProcessID
} else { // miss
output := errResponse{HTTPStatus: http.StatusNotFound, Message: "jobID not found"}
return prepareResponse(c, http.StatusNotFound, "error", output)
}

if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}

logs, err := jobs.FetchLogs(rh.StorageSvc, jobID, pid, false)
if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: "error while fetching logs: " + err.Error()}
Expand Down Expand Up @@ -709,13 +736,19 @@ func (rh *RESTHandler) JobStatusUpdateHandler(c echo.Context) error {
(*sm.Job).LogMessage(fmt.Sprintf("Status update received: %s.", sm.Status), logrus.InfoLevel)
rh.MessageQueue.StatusChan <- sm
return c.JSON(http.StatusAccepted, "status update received")
} else if ok := rh.DB.CheckJobExist(jobID); ok { // db hit
log.Infof("Status update received for inactive job: %s", jobID)
// returning Accepted here so that callers do not retry
return c.JSON(http.StatusAccepted, "job not an active job")
} else {
return c.JSON(http.StatusBadRequest, "job id not found")
} else if ok, err := rh.DB.CheckJobExist(jobID); ok || err != nil { // db hit or error
if ok {
log.Infof("Status update received for inactive job: %s", jobID)
// returning Accepted here so that callers do not retry
return c.JSON(http.StatusAccepted, "job not an active job")
}

if err != nil {
output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
return prepareResponse(c, http.StatusInternalServerError, "error", output)
}
}
return c.JSON(http.StatusBadRequest, "job id not found")
}

// func (rh *RESTHandler) JobResultsUpdateHandler(c echo.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions api/jobs/aws_batch_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type AWSBatchJob struct {
cloudWatchForwardToken string
// MetaData

DB *DB
DB Database
StorageSvc *s3.S3
DoneChan chan Job
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (j *AWSBatchJob) initLogger() error {

lvl, err := log.ParseLevel(os.Getenv("LOG_LEVEL"))
if err != nil {
j.logger.Warnf("Invalid LOG_LEVEL set: %s; defaulting to INFO", os.Getenv("LOG_LEVEL"))
j.logger.Warnf("Invalid LOG_LEVEL set: %s, defaulting to INFO", os.Getenv("LOG_LEVEL"))
lvl = log.InfoLevel
}
j.logger.SetLevel(lvl)
Expand Down Expand Up @@ -239,7 +239,7 @@ func (j *AWSBatchJob) Create() error {
j.batchContext = batchContext

// At this point job is ready to be added to database
err = j.DB.addJob(j.UUID, "accepted", time.Now(), "", "aws-batch", j.ProcessName, j.Submitter)
err = j.DB.addJob(j.UUID, "accepted", "", "aws-batch", j.ProcessName, j.Submitter, time.Now())
if err != nil {
j.ctxCancel()
return err
Expand Down
Loading