diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml
index 1741912..6385bd1 100644
--- a/.github/workflows/e2e-tests.yml
+++ b/.github/workflows/e2e-tests.yml
@@ -23,7 +23,6 @@ jobs:
          echo S3_BUCKET=${{ secrets.S3_BUCKET }} >> .env
          echo S3_RESULTS_DIR='tests/results' >> .env
          echo S3_META_DIR='tests/metadata' >> .env
-         echo S3_LOGS_DIR='tests/logs' >> .env
          echo BATCH_LOG_STREAM_GROUP='/aws/batch/job' >> .env
          echo CPL_VSIL_USE_TEMP_FILE_FOR_RANDOM_WRITE='YES' >> .env
          echo EXPIRY_DAYS='1' >> .env
diff --git a/README.md b/README.md
index 621dac4..ed2c7c8 100644
--- a/README.md
+++ b/README.md
@@ -21,8 +21,6 @@ https://developer.ogc.org/api/processes/index.html
 3. Update swagger documents and compile the server: `swag init && go build main.go`.
 4. Run the server: `./main`, with the following available flags:
 ```
-  `-c [type int] specify the path of the max cache size for storing jobs (default 11073741824 (1GB))`
-  `-o [type bool] specify if cache should be overwritten`
   `-d [type string] specify the path of the processes directory to load (default "plugins" assuming program called from inside repo)`
   `-e [type string] specify the path of the dot env file to load (default ".env")`
   `-p [type string] specify the port to run the api on (default "5050")`
@@ -68,8 +66,6 @@ When a job is submitted, a local container is fired up immediately for sync jobs
 The API responds to all GET requests (except `/jobs//results`) as HTML or JSON depending upon if the request is being originated from Browser or not or if it specifies the format using query parameter ‘f’.
 
-The app maintains a cache of all jobs submitted, a snapshot of this cache is saved to disk at `./.data/snapshot.gob` every 60 minutes and at the graceful shutdown of the server. If during the restart the snapshot is found at the above location, the cache is repopulated with the snapshot data.
-
 ## Example .env file
 
 For AWS services, an env file should be located at the root of this repository (`./.env`) and be formatted like so:
@@ -83,7 +79,7 @@ AWS_DEFAULT_REGION='us-east-1'
 # S3
 S3_BUCKET='********'
 S3_RESULTS_DIR='results'
-S3_LOGS_DIR='logs'
+S3_META_DIR='metadata'
 # BATCH
 BATCH_LOG_STREAM_GROUP='/aws/batch/job'
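Taken together, the Go changes below replace the in-memory jobs cache (and its gob snapshots) with two smaller pieces: an ActiveJobs map holding only in-flight jobs, and a SQLite-backed DB that keeps a permanent record of every job and its logs. A rough sketch of the resulting handler wiring, using names that appear in this diff; the exact struct layout and import paths are assumptions, not code from the PR:

```go
package handlers

import (
	"app/jobs"
	"app/processes"

	"github.com/aws/aws-sdk-go/service/s3"
)

// Sketch only: assumed shape of the handler after this PR.
// Lookups check ActiveJobs first, then fall back to the SQLite store.
type RESTHandler struct {
	ActiveJobs  *jobs.ActiveJobs       // in-flight jobs; added before Run(), removed after
	DB          *jobs.DB               // persistent job records and logs (replaces the gob snapshot)
	ProcessList *processes.ProcessList // processes' config files, read once at startup
	S3Svc       *s3.S3                 // results and metadata storage
}
```

Status, results, and log reads check ActiveJobs first and fall back to the database, so completed jobs survive restarts without the snapshot machinery.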
diff --git a/design.svg b/design.svg
index 68707a6..67f4d2d 100644
--- a/design.svg
+++ b/design.svg
[~150 lines of generated draw.io SVG/HTML markup omitted. The diagram is redrawn for the new architecture: the "Disk", "regular snapshots", and "Jobs Cache" elements are removed; "SQLite DB", "Active Jobs", and "Plugins Directory" elements are added; connector labels now read "launch / kill / monitor jobs", "fetch images", "read results", "write results", and "update job status".]
[The next file's diff (the REST handlers) has a garbled opening in the source; its first hunk resumes inside the process-list handler, which now paginates its output. The missing lines presumably mirror ListJobsHandler below:]
+	limitStr := c.QueryParam("limit")
+	offsetStr := c.QueryParam("offset")
+
+	limit, err := strconv.Atoi(limitStr)
+	if err != nil || limit > 100 || limit < 1 {
+		limit = 50
+	}
+
+	offset, err := strconv.Atoi(offsetStr)
+	if err != nil || offset < 0 {
+		offset = 0
+	}
+
+	// instantiate result variable without importing processes pkg
+	result := rh.ProcessList.InfoList[0:0]
+
+	if offset < len(rh.ProcessList.InfoList) {
+		upperBound := offset + limit
+		if upperBound > len(rh.ProcessList.InfoList) {
+			upperBound = len(rh.ProcessList.InfoList)
+		}
+		result = rh.ProcessList.InfoList[offset:upperBound]
+	}
+
+	// required by /req/core/process-list-success
+	links := make([]link, 0)
+
+	// if offset is not 0
+	if offset != 0 {
+		lnk := link{
+			Href:  fmt.Sprintf("/processes?offset=%v&limit=%v", offset-limit, limit),
+			Title: "prev",
+		}
+		links = append(links, lnk)
+	}
+
+	// if limit is not exhausted
+	if limit == len(result) {
+		lnk := link{
+			Href:  fmt.Sprintf("/processes?offset=%v&limit=%v", offset+limit, limit),
+			Title: "next",
+		}
+		links = append(links, lnk)
 	}
 
-	return prepareResponse(c, http.StatusOK, "processes", processList)
+	output := make(map[string]interface{}, 0)
+	output["processes"] = result
+	output["links"] = links
+
+	return prepareResponse(c, http.StatusOK, "processes", output)
 }
 
 // ProcessDescribeHandler godoc
@@ -183,7 +235,6 @@ func (rh *RESTHandler) ProcessDescribeHandler(c echo.Context) error {
 
 func (rh *RESTHandler) Execution(c echo.Context) error {
 	processID := c.Param("processID")
-	log.Debug("processID", processID)
 	if processID == "" {
 		return c.JSON(http.StatusBadRequest, errResponse{Message: "'processID' parameter is required"})
 	}
@@ -199,7 +250,6 @@ func (rh *RESTHandler) Execution(c echo.Context) error {
 		return c.JSON(http.StatusBadRequest, errResponse{Message: err.Error()})
 	}
 
-	// Review this section
 	if params.Inputs == nil {
 		return c.JSON(http.StatusBadRequest, errResponse{Message: "'inputs' is required in the body of the request"})
 	}
@@ -236,6 +286,7 @@ func (rh *RESTHandler) Execution(c echo.Context) error {
 			EnvVars:   p.Container.EnvVars,
 			Resources: jobs.Resources(p.Container.Resources),
 			Cmd:       cmd,
+			DB:        rh.DB,
 		}
 
 	} else {
@@ -254,23 +305,26 @@ func (rh *RESTHandler) Execution(c echo.Context) error {
 				JobName:          "ogc-api-id-" + jobID,
 				MetaDataLocation: md,
 				ProcessVersion:   p.Info.Version,
+				DB:               rh.DB,
 			}
 		default:
 			return c.JSON(http.StatusBadRequest, errResponse{Message: fmt.Sprintf("unsupported type %s", jobType)})
 		}
 	}
 
-	// Add to cache
-	rh.JobsCache.Add(&j)
-
 	// Create job
 	err = j.Create()
 	if err != nil {
 		return c.JSON(http.StatusInternalServerError, errResponse{Message: fmt.Sprintf("submission error: %s", err.Error())})
 	}
 
+	// Add to active jobs
+	rh.ActiveJobs.Add(&j)
+
 	switch p.Info.JobControlOptions[0] {
 	case "sync-execute":
+		defer rh.ActiveJobs.Remove(&j)
+
 		j.Run()
 
 		if j.CurrentStatus() == "successful" {
@@ -289,7 +343,10 @@ func (rh *RESTHandler) Execution(c echo.Context) error {
 			return c.JSON(http.StatusInternalServerError, resp)
 		}
 	case "async-execute":
-		go j.Run()
+		go func() {
+			defer rh.ActiveJobs.Remove(&j)
+			j.Run()
+		}()
 		resp := jobResponse{ProcessID: j.ProcessID(), Type: "process", JobID: jobID, Status: "accepted"}
 		return c.JSON(http.StatusCreated, resp)
 	default:
@@ -309,7 +366,7 @@ func (rh *RESTHandler) Execution(c echo.Context) error {
 
 func (rh *RESTHandler) JobDismissHandler(c
echo.Context) error { // @Info [Format YAML](http://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi/schemas/statusInfo.yaml) // @Accept */* // @Produce json -// @Success 200 {object} JobStatus +// @Success 200 {object} JobRecord // @Router /jobs/{jobID} [get] func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { err := validateFormat(c) @@ -334,14 +391,16 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { } jobID := c.Param("jobID") - if job, ok := rh.JobsCache.Jobs[jobID]; ok { - resp := jobs.JobStatus{ + if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { + resp := jobs.JobRecord{ ProcessID: (*job).ProcessID(), JobID: (*job).JobID(), LastUpdate: (*job).LastUpdate(), Status: (*job).CurrentStatus(), } return prepareResponse(c, http.StatusOK, "js", resp) + } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { + return prepareResponse(c, http.StatusOK, "js", jRcrd) } output := errResponse{HTTPStatus: http.StatusNotFound, Message: fmt.Sprintf("%s job id not found", jobID)} return prepareResponse(c, http.StatusNotFound, "error", output) @@ -357,13 +416,18 @@ func (rh *RESTHandler) JobStatusHandler(c echo.Context) error { // Does not produce HTML func (rh *RESTHandler) JobResultsHandler(c echo.Context) error { jobID := c.Param("jobID") - if job, ok := rh.JobsCache.Jobs[jobID]; ok { - switch (*job).CurrentStatus() { + if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit + output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "results not ready", LastUpdate: (*job).LastUpdate()} + return c.JSON(http.StatusNotFound, output) + + } else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit + + switch jRcrd.Status { case jobs.SUCCESSFUL: - outputs, err := jobs.FetchResults(rh.S3Svc, (*job).JobID()) + outputs, err := jobs.FetchResults(rh.S3Svc, jRcrd.JobID) if err != nil { if err.Error() == "not found" { - output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "results not available, resource might have expired."} + output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "results not available, resource might have expired."} return c.JSON(http.StatusNotFound, output) } return c.JSON(http.StatusInternalServerError, err.Error()) @@ -371,41 +435,43 @@ func (rh *RESTHandler) JobResultsHandler(c echo.Context) error { output := jobResponse{ Type: "process", JobID: jobID, - Status: (*job).CurrentStatus(), - LastUpdate: (*job).LastUpdate(), + Status: jRcrd.Status, + LastUpdate: jRcrd.LastUpdate, Outputs: outputs, } return c.JSON(http.StatusOK, output) case jobs.FAILED, jobs.DISMISSED: - output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "job Failed or Dismissed. Call logs route for details", LastUpdate: (*job).LastUpdate()} + output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "job Failed or Dismissed. 
Call logs route for details", LastUpdate: jRcrd.LastUpdate}
 			return c.JSON(http.StatusOK, output)
 		default:
-			output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "results not ready", LastUpdate: (*job).LastUpdate()}
+			output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "results not ready", LastUpdate: jRcrd.LastUpdate}
 			return c.JSON(http.StatusNotFound, output)
 		}
 	}
 
+	// miss
 	output := errResponse{Message: "jobID not found"}
 	return c.JSON(http.StatusNotFound, output)
 }
 
 func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error {
 	jobID := c.Param("jobID")
-	if job, ok := rh.JobsCache.Jobs[jobID]; ok {
-		switch (*job).(type) {
-		case *jobs.DockerJob:
-			output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "metadata not exist for sync jobs."}
-			return c.JSON(http.StatusBadRequest, output)
-		}
-
-		switch (*job).CurrentStatus() {
+	if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit
+		// for sync jobs the jobID is not returned to the user until the job is no longer in activeJobs, so a job found here must be async
+		output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "metadata not ready", LastUpdate: (*job).LastUpdate()}
+		return c.JSON(http.StatusNotFound, output)
+	} else if jRcrd, ok := rh.DB.GetJob(jobID); ok { // db hit
+		// todo
+		// if jRcrd.Mode == "sync"
+
+		switch jRcrd.Status {
 		case jobs.SUCCESSFUL:
-			md, err := jobs.FetchMeta(rh.S3Svc, (*job).JobID())
+			md, err := jobs.FetchMeta(rh.S3Svc, jobID)
 			if err != nil {
 				if err.Error() == "not found" {
-					output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "metadata not found."}
+					output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "metadata not found."}
 					return c.JSON(http.StatusInternalServerError, output)
 				}
 				return c.JSON(http.StatusInternalServerError, err.Error())
@@ -413,15 +479,16 @@ func (rh *RESTHandler) JobMetaDataHandler(c echo.Context) error {
 			return c.JSON(http.StatusOK, md)
 
 		case jobs.FAILED, jobs.DISMISSED:
-			output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "job Failed or Dismissed. Metadata only available for successful jobs.", LastUpdate: (*job).LastUpdate()}
+			output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "job Failed or Dismissed. Metadata only available for successful jobs.", LastUpdate: jRcrd.LastUpdate}
 			return c.JSON(http.StatusOK, output)
 
 		default:
-			output := jobResponse{Type: "process", JobID: jobID, Status: (*job).CurrentStatus(), Message: "metadata not ready", LastUpdate: (*job).LastUpdate()}
+			output := jobResponse{Type: "process", JobID: jobID, Status: jRcrd.Status, Message: "metadata not ready", LastUpdate: jRcrd.LastUpdate}
 			return c.JSON(http.StatusNotFound, output)
 		}
 	}
 
+	// miss
 	output := errResponse{Message: "jobID not found"}
 	return c.JSON(http.StatusNotFound, output)
 }
@@ -441,37 +508,83 @@ func (rh *RESTHandler) JobLogsHandler(c echo.Context) error {
 		return err
 	}
 
-	if job, ok := rh.JobsCache.Jobs[jobID]; ok {
-		logs, err := (*job).Logs()
-		if err != nil {
-			if err.Error() == "not found" {
-				output := errResponse{HTTPStatus: http.StatusNotFound, Message: err.Error()}
-				return prepareResponse(c, http.StatusNotFound, "error", output)
-			}
-			output := errResponse{HTTPStatus: http.StatusNotFound, Message: "Error while fetching logs: " + err.Error()}
-			return prepareResponse(c, http.StatusInternalServerError, "error", output)
-		}
+	var logs jobs.JobLogs
+	var errLogs error
 
-		return prepareResponse(c, http.StatusOK, "logs", logs)
+	if job, ok := rh.ActiveJobs.Jobs[jobID]; ok { // ActiveJobs hit
+		logs, errLogs = (*job).Logs()
+	} else if ok := rh.DB.CheckJobExist(jobID); ok { // db hit
+		logs, errLogs = rh.DB.GetLogs(jobID)
+	} else { // miss
+		output := errResponse{HTTPStatus: http.StatusNotFound, Message: "jobID not found"}
+		return prepareResponse(c, http.StatusNotFound, "error", output)
 	}
 
-	output := errResponse{HTTPStatus: http.StatusNotFound, Message: "jobID not found"}
-	return prepareResponse(c, http.StatusNotFound, "error", output)
+	if errLogs != nil {
+		output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: "error while fetching logs: " + errLogs.Error()}
+		return prepareResponse(c, http.StatusInternalServerError, "error", output)
+	}
+
+	logs.Prettify()
+	return prepareResponse(c, http.StatusOK, "logs", logs)
+
 }
 
-// @Summary Summary of all (cached) Jobs
+// @Summary Summary of all (active) Jobs
 // @Description [Job List Specification](https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_retrieve_job_results)
 // @Tags jobs
 // @Accept */*
 // @Produce json
-// @Success 200 {object} []JobStatus
+// @Success 200 {object} []JobRecord
 // @Router /jobs [get]
-func (rh *RESTHandler) JobsCacheHandler(c echo.Context) error {
+func (rh *RESTHandler) ListJobsHandler(c echo.Context) error {
 	err := validateFormat(c)
 	if err != nil {
 		return err
 	}
 
-	jobsList := rh.JobsCache.ListJobs()
-	return prepareResponse(c, http.StatusOK, "jobs", jobsList)
+	limitStr := c.QueryParam("limit")
+	offsetStr := c.QueryParam("offset")
+
+	limit, err := strconv.Atoi(limitStr)
+	if err != nil || limit > 100 || limit < 1 {
+		limit = 50
+	}
+
+	offset, err := strconv.Atoi(offsetStr)
+	if err != nil || offset < 0 {
+		offset = 0
+	}
+
+	result, err := rh.DB.GetJobs(limit, offset)
+	if err != nil {
+		output := errResponse{HTTPStatus: http.StatusInternalServerError, Message: err.Error()}
+		return prepareResponse(c, http.StatusInternalServerError, "error", output)
+	}
+
+	// required by /req/job-list/job-list-success
+	links := make([]link, 0)
+
+	// if offset is not 0
+	if offset != 0 {
+		lnk := link{
+			Href:  fmt.Sprintf("/jobs?offset=%v&limit=%v", offset-limit, limit),
+			Title: "prev",
+		}
+		links = append(links, lnk)
+	}
+
+	// if limit is not exhausted
+	if limit == len(result) {
+		lnk := link{
+			Href:  fmt.Sprintf("/jobs?offset=%v&limit=%v", offset+limit, limit),
+			Title: "next",
+		}
+		links = append(links, lnk)
+	}
+
+	output := make(map[string]interface{}, 0)
+	output["jobs"] = result
+	output["links"] = links
+	return prepareResponse(c, http.StatusOK, "jobs", output)
 }
diff --git a/jobs/active_jobs.go b/jobs/active_jobs.go
new file mode 100644
index 0000000..d1bb201
--- /dev/null
+++ b/jobs/active_jobs.go
@@ -0,0 +1,40 @@
+package jobs
+
+import (
+	"sync"
+)
+
+// It is the responsibility of the originator to add and remove a job from ActiveJobs
+type ActiveJobs struct {
+	Jobs map[string]*Job `json:"jobs"`
+	mu   sync.Mutex
+}
+
+func (ac *ActiveJobs) Add(j *Job) {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+
+	ac.Jobs[(*j).JobID()] = j
+}
+
+func (ac *ActiveJobs) Remove(j *Job) {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+
+	delete(ac.Jobs, (*j).JobID())
+}
+
+// Revised to kill only currently active jobs
+func (ac *ActiveJobs) KillAll() error {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+
+	for _, j := range ac.Jobs {
+		if (*j).CurrentStatus() == ACCEPTED || (*j).CurrentStatus() == RUNNING {
+			if err := (*j).Kill(); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/jobs/aws_batch_jobs.go b/jobs/aws_batch_jobs.go
index 6829d9a..1d4c774 100644
--- a/jobs/aws_batch_jobs.go
+++ b/jobs/aws_batch_jobs.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"os"
 	"time"
-	"unsafe"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -15,16 +14,17 @@ import (
 
 // Fields are exported so that gob can access it
 type AWSBatchJob struct {
-	ctx         context.Context // not exported because unsupported by gob
-	ctxCancel   context.CancelFunc
-	UUID        string `json:"jobID"`
-	AWSBatchID  string
-	ProcessName string `json:"processID"`
-	Image       string `json:"image"`
-	Cmd         []string `json:"commandOverride"`
-	UpdateTime  time.Time
-	Status      string `json:"status"`
-	APILogs     []string
+	ctx           context.Context // not exported because unsupported by gob
+	ctxCancel     context.CancelFunc
+	UUID          string `json:"jobID"`
+	AWSBatchID    string
+	ProcessName   string `json:"processID"`
+	Image         string `json:"image"`
+	Cmd           []string `json:"commandOverride"`
+	UpdateTime    time.Time
+	Status        string `json:"status"`
+	apiLogs       []string
+	containerLogs []string
 
 	JobDef   string `json:"jobDefinition"`
 	JobQueue string `json:"jobQueue"`
@@ -33,10 +33,11 @@ type AWSBatchJob struct {
 	JobName      string `json:"jobName"`
 	EnvVars      map[string]string
 	batchContext *controllers.AWSBatchController
-	LogStreamName string
+	logStreamName string
 	// MetaData
 	MetaDataLocation string
 	ProcessVersion   string
+	DB               *DB
 }
 
 func (j *AWSBatchJob) JobID() string {
@@ -55,15 +56,20 @@ func (j *AWSBatchJob) IMAGE() string {
 	return j.Image
 }
 
-// Fetches Container logs from CloudWatch and API logs from cache
+// Return current logs of the job.
+// Fetches Container logs from CloudWatch.
 func (j *AWSBatchJob) Logs() (JobLogs, error) {
 	var logs JobLogs
 
-	cl, err := j.FetchLogs()
-	if err != nil {
-		return JobLogs{}, err
+	// we are fetching logs here and not in the run function because we only want to fetch logs when needed
+	if j.logStreamName != "" {
+		err := j.fetchCloudWatchLogs()
+		if err != nil {
+			return logs, fmt.Errorf("error while fetching cloud watch logs for: %s: %s", j.logStreamName, err.Error())
+		}
 	}
-	logs.ContainerLog = cl
-	logs.APILog = j.APILogs
+
+	logs.ContainerLogs = j.containerLogs
+	logs.APILogs = j.apiLogs
 	return logs, nil
 }
 
@@ -72,17 +78,22 @@ func (j *AWSBatchJob) ClearOutputs() {
 }
 
 func (j *AWSBatchJob) Messages(includeErrors bool) []string {
-	return j.APILogs
+	return j.apiLogs
 }
 
 func (j *AWSBatchJob) NewMessage(m string) {
-	j.APILogs = append(j.APILogs, m)
+	j.apiLogs = append(j.apiLogs, m)
 }
 
+// Append the error to apiLogs, cancel the context, update status and time, and write logs to the database
 func (j *AWSBatchJob) HandleError(m string) {
-	j.APILogs = append(j.APILogs, m)
-	j.NewStatusUpdate(FAILED)
+	j.apiLogs = append(j.apiLogs, m)
 	j.ctxCancel()
+	if j.Status != DISMISSED { // if the job was dismissed then the error is because of dismissing the job
+		j.NewStatusUpdate(FAILED)
+		j.fetchCloudWatchLogs()
+		go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs)
+	}
 }
 
 func (j *AWSBatchJob) LastUpdate() time.Time {
@@ -91,7 +102,9 @@ func (j *AWSBatchJob) LastUpdate() time.Time {
 
 func (j *AWSBatchJob) NewStatusUpdate(s string) {
 	j.Status = s
-	j.UpdateTime = time.Now()
+	now := time.Now()
+	j.UpdateTime = now
+	j.DB.updateJobRecord(j.UUID, s, now)
 }
 
 func (j *AWSBatchJob) CurrentStatus() string {
@@ -118,13 +131,13 @@ func (j *AWSBatchJob) Create() error {
 
 	batchContext, err := controllers.NewAWSBatchController(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), os.Getenv("AWS_DEFAULT_REGION"))
 	if err != nil {
-		j.HandleError(err.Error())
+		j.ctxCancel()
 		return err
 	}
 
 	aWSBatchID, err := batchContext.JobCreate(j.ctx, j.JobDef, j.JobName, j.JobQueue, j.Cmd, j.EnvVars)
 	if err != nil {
-		j.HandleError(err.Error())
+		j.ctxCancel()
 		return err
 	}
 
@@ -133,7 +146,14 @@
 	// verify command in body
 	if j.Cmd == nil {
-		j.HandleError(err.Error())
+		j.ctxCancel()
+		return err
+	}
+
+	// At this point the job is ready to be added to the database
+	err = j.DB.addJob(j.UUID, "accepted", time.Now(), "", "aws-batch", j.ProcessName)
+	if err != nil {
+		j.ctxCancel()
 		return err
 	}
 
@@ -163,7 +183,7 @@ func (j *AWSBatchJob) Run() {
 		}
 
 		if status != oldStatus {
-			j.LogStreamName = logStreamName
+			j.logStreamName = logStreamName
 			switch status {
 			case "ACCEPTED":
 				j.NewStatusUpdate(ACCEPTED)
@@ -173,15 +193,18 @@
 				// fetch results here // todo
 				j.NewStatusUpdate(SUCCESSFUL)
 				j.ctxCancel()
+				j.fetchCloudWatchLogs()
+				go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs)
 				go j.WriteMeta(c)
 				return
 			case "DISMISSED":
 				j.NewStatusUpdate(DISMISSED)
 				j.ctxCancel()
+				j.fetchCloudWatchLogs()
+				go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs)
 				return
 			case "FAILED":
-				j.NewStatusUpdate(FAILED)
-				j.ctxCancel()
+				j.HandleError("Batch API returned failed status")
 				return
 			}
 		}
@@ -205,77 +228,58 @@ func (j *AWSBatchJob) Kill() error {
 
 	_, err = c.JobKill(j.AWSBatchID)
 	if err != nil {
+		j.HandleError(err.Error())
 		return err
 	}
 
 	j.NewStatusUpdate(DISMISSED)
+	// this would be redundant in most cases because the run function will also update status and add logs
+	// but leaving it here in case the run function fails
+	j.fetchCloudWatchLogs()
+	go
j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs) j.ctxCancel() return nil } -// Placeholder -func (j *AWSBatchJob) GetSizeinCache() int { - cmdData := int(unsafe.Sizeof(j.Cmd)) - for _, item := range j.Cmd { - cmdData += len(item) - } - - messageData := int(unsafe.Sizeof(j.APILogs)) - for _, item := range j.APILogs { - messageData += len(item) - } - - totalMemory := cmdData + messageData + - int(unsafe.Sizeof(j.ctx)) + - int(unsafe.Sizeof(j.ctxCancel)) + - int(unsafe.Sizeof(j.UUID)) + len(j.UUID) + - int(unsafe.Sizeof(j.AWSBatchID)) + len(j.AWSBatchID) + - int(unsafe.Sizeof(j.Image)) + len(j.Image) + - int(unsafe.Sizeof(j.UpdateTime)) + - int(unsafe.Sizeof(j.Status)) + - int(unsafe.Sizeof(j.LogStreamName)) + len(j.LogStreamName) + - int(unsafe.Sizeof(j.JobDef)) + len(j.JobDef) + - int(unsafe.Sizeof(j.JobQueue)) + len(j.JobQueue) + - int(unsafe.Sizeof(j.JobName)) + len(j.JobName) + - int(unsafe.Sizeof(j.EnvVars)) + len(j.EnvVars) - - return totalMemory -} - // Fetches logs from CloudWatch using the AWS Go SDK -func (j *AWSBatchJob) FetchLogs() (logs []string, err error) { +func (j *AWSBatchJob) fetchCloudWatchLogs() error { // Create a new session in the desired region sess, err := session.NewSession(&aws.Config{ Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")), }) if err != nil { - return logs, fmt.Errorf("Error creating session: " + err.Error()) + return fmt.Errorf("Error creating session: " + err.Error()) } // Create a CloudWatchLogs client svc := cloudwatchlogs.New(sess) - if j.LogStreamName == "" { - return nil, fmt.Errorf("LogStreamName is empty. If you just ran your job, retry in few seconds") + if j.logStreamName == "" { + return fmt.Errorf("logStreamName is empty. If you just ran your job, retry in few seconds") } // Define the parameters for the log stream you want to read params := &cloudwatchlogs.GetLogEventsInput{ LogGroupName: aws.String(os.Getenv("BATCH_LOG_STREAM_GROUP")), - LogStreamName: aws.String(j.LogStreamName), + LogStreamName: aws.String(j.logStreamName), StartFromHead: aws.Bool(true), } // Call the GetLogEvents API to read the log events resp, err := svc.GetLogEvents(params) if err != nil { - return logs, fmt.Errorf("Error reading log events: " + err.Error()) + if err.Error() == "ResourceNotFoundException: The specified log stream does not exist." { + return nil + } else { + return err + } } // Print the log events - logs = make([]string, len(resp.Events)) + logs := make([]string, len(resp.Events)) for i, event := range resp.Events { logs[i] = *event.Message } - return logs, nil + j.containerLogs = logs + return nil } diff --git a/jobs/database.go b/jobs/database.go new file mode 100644 index 0000000..f60e1ed --- /dev/null +++ b/jobs/database.go @@ -0,0 +1,240 @@ +package jobs + +import ( + "database/sql" + "encoding/json" + "errors" + "os" + "path/filepath" + "time" + + "github.com/labstack/gommon/log" + _ "github.com/mattn/go-sqlite3" +) + +type DB struct { + Handle *sql.DB +} + +// Create tables in the database if they do not exist already +func (db *DB) createTables() { + + // SQLite does not have a built-in ENUM type or array type. + // SQLite doesn't enforce the length of the VARCHAR datatype, therefore not using something like VARCHAR(30). + // SQLite's concurrency control is based on transactions, not connections. A connection to a SQLite database does not inherently acquire a lock. + // Locks are acquired when a transaction is started and released when the transaction is committed or rolled back. 
+
+	// indices needed to speed up
+	// fetching jobs for a particular process id
+	// providing job-lists ordered by time
+
+	queryJobs := `
+	CREATE TABLE IF NOT EXISTS jobs (
+		id TEXT PRIMARY KEY,
+		status TEXT NOT NULL,
+		updated TIMESTAMP NOT NULL,
+		mode TEXT NOT NULL,
+		host TEXT NOT NULL,
+		process_id TEXT NOT NULL
+	);
+
+	CREATE INDEX IF NOT EXISTS idx_jobs_updated ON jobs(updated);
+	CREATE INDEX IF NOT EXISTS idx_jobs_process_id ON jobs(process_id);
+	`
+
+	_, err := db.Handle.Exec(queryJobs)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Array of VARCHAR is represented as TEXT in SQLite. The client application has to handle the conversion
+
+	queryLogs := `
+	CREATE TABLE IF NOT EXISTS logs (
+		job_id TEXT PRIMARY KEY,
+		api_logs TEXT,
+		container_logs TEXT,
+		FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
+	);
+	`
+
+	_, err = db.Handle.Exec(queryLogs)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+// Initialize the database.
+// Creates intermediate directories if they do not exist.
+func InitDB(dbPath string) *DB {
+
+	// Create directory structure if it doesn't exist
+	dir := filepath.Dir(dbPath)
+	err := os.MkdirAll(dir, 0755)
+	if err != nil {
+		log.Fatalf(err.Error())
+	}
+
+	h, err := sql.Open("sqlite3", dbPath+"?mode=rwc")
+	// it may be a good idea to check here if the connection has write privilege https://stackoverflow.com/a/44707371/11428410 https://www.sqlite.org/c3ref/db_readonly.html
+	// also maybe we should make the db such that only go can write to it
+
+	if err != nil {
+		log.Fatalf("could not open %s. Delete the existing database to start with a new database. Error: %s", dbPath, err.Error())
+	}
+
+	if h == nil {
+		log.Fatal("db nil")
+	}
+
+	db := DB{Handle: h}
+	db.createTables()
+	return &db
+}
+
+// Add a job to the database. Returns an error if the job already exists.
+func (db *DB) addJob(jid string, status string, updated time.Time, mode string, host string, process_id string) error {
+	query := `INSERT INTO jobs (id, status, updated, mode, host, process_id) VALUES (?, ?, ?, ?, ?, ?)`
+
+	_, err := db.Handle.Exec(query, jid, status, updated, mode, host, process_id)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Update status and time of a job.
+func (db *DB) updateJobRecord(jid string, status string, now time.Time) {
+	query := `UPDATE jobs SET status = ?, updated = ? WHERE id = ?`
+	_, err := db.Handle.Exec(query, status, now, jid)
+	if err != nil {
+		log.Error(err)
+	}
+}
+
+// Upsert logs, on conflict replace the existing logs.
+func (db *DB) upsertLogs(jid string, apiLogs []string, containerLogs []string) {
+	query := `
+	INSERT INTO logs (job_id, api_logs, container_logs) VALUES (?, ?, ?)
+	ON CONFLICT(job_id) DO UPDATE SET
+		api_logs = excluded.api_logs,
+		container_logs = excluded.container_logs;
+	`
+
+	// Convert APILogs and ContainerLogs from []string to JSON string
+	apiLogsJSON, err := json.Marshal(apiLogs)
+	if err != nil {
+		log.Error(err)
+	}
+	containerLogsJSON, err := json.Marshal(containerLogs)
+	if err != nil {
+		log.Error(err)
+	}
+
+	_, err = db.Handle.Exec(query, jid, string(apiLogsJSON), string(containerLogsJSON))
+	if err != nil {
+		log.Error(err)
+	}
+}
+
+// Get a Job Record from the database given a job id.
+// If the job does not exist, or an error is encountered, the bool is false,
+// similar to a key-exists check on a hashmap.
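For orientation, here is a hedged usage sketch of the write path these functions define. It assumes it sits inside the `jobs` package (addJob, updateJobRecord, and upsertLogs are unexported) and uses a made-up job id; the real callers are the job implementations themselves (Create, NewStatusUpdate, Run, Kill):

```go
package jobs

import (
	"fmt"
	"time"
)

// Sketch: typical lifecycle of one job record against the SQLite store.
func exampleJobLifecycle() {
	db := InitDB(".data/db.sqlite")
	defer db.Handle.Close()

	jid := "0000-demo" // hypothetical id; real ids are UUIDs
	if err := db.addJob(jid, "accepted", time.Now(), "async", "local", "echo"); err != nil {
		fmt.Println("insert failed:", err) // e.g. duplicate job id
		return
	}

	db.updateJobRecord(jid, SUCCESSFUL, time.Now())
	db.upsertLogs(jid, []string{"created", "finished"}, []string{"hello from the container"})

	if rec, ok := db.GetJob(jid); ok {
		fmt.Println(rec.JobID, rec.Status) // 0000-demo successful
	}
}
```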
+func (db *DB) GetJob(jid string) (JobRecord, bool) { + query := `SELECT * FROM jobs WHERE id = ?` + + js := JobRecord{} + + row := db.Handle.QueryRow(query, jid) + err := row.Scan(&js.JobID, &js.Status, &js.LastUpdate, &js.Mode, &js.Host, &js.ProcessID) + if err != nil { + if err == sql.ErrNoRows { + return JobRecord{}, false + } else { + log.Error(err) + return JobRecord{}, false + } + } + return js, true +} + +// Check if a job exists in database. +func (db *DB) CheckJobExist(jid string) bool { + query := `SELECT id FROM jobs WHERE id = ?` + + js := JobRecord{} + + row := db.Handle.QueryRow(query, jid) + err := row.Scan(&js.JobID) + if err != nil { + if err == sql.ErrNoRows { + return false + } else { + log.Error(err) + return false + } + } + return true +} + +func (db *DB) GetJobs(limit int, offset int) ([]JobRecord, error) { + query := `SELECT id, status, updated, process_id FROM jobs ORDER BY updated DESC LIMIT ? OFFSET ?` + + res := []JobRecord{} + + rows, err := db.Handle.Query(query, limit, offset) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var r JobRecord + var updated string + if err := rows.Scan(&r.JobID, &r.Status, &updated, &r.ProcessID); err != nil { + return nil, err + } + r.LastUpdate, err = time.Parse(time.RFC3339, updated) + if err != nil { + return nil, err + } + res = append(res, r) + } + + err = rows.Err() + if err != nil { + return nil, err + } + return res, nil +} + +// Get logs from database. If job id is not found then error will be returned. +func (db *DB) GetLogs(jid string) (JobLogs, error) { + query := `SELECT api_logs, container_logs FROM logs WHERE job_id = ?` + + logs := JobLogs{} + // These will hold the JSON strings from the database + var apiLogsJSON, containerLogsJSON string + + row := db.Handle.QueryRow(query, jid) + err := row.Scan(&apiLogsJSON, &containerLogsJSON) + if err != nil { + if err == sql.ErrNoRows { + return JobLogs{}, errors.New("not found") + } else { + return JobLogs{}, err + } + } + + // Convert JSON strings back into arrays of strings + err = json.Unmarshal([]byte(apiLogsJSON), &logs.APILogs) + if err != nil { + return JobLogs{}, errors.New("error decoding api logs") + } + err = json.Unmarshal([]byte(containerLogsJSON), &logs.ContainerLogs) + if err != nil { + return JobLogs{}, errors.New("error decoding container logs") + } + + return logs, nil +} diff --git a/jobs/docker_jobs.go b/jobs/docker_jobs.go index 9b4eb10..16088f0 100644 --- a/jobs/docker_jobs.go +++ b/jobs/docker_jobs.go @@ -2,35 +2,27 @@ package jobs import ( "app/controllers" - "app/utils" - "bufio" "context" "fmt" "os" - "strconv" - "strings" "time" - "unsafe" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" ) -// Fields are exported so that gob can access it type DockerJob struct { - ctx context.Context - ctxCancel context.CancelFunc - UUID string `json:"jobID"` - ContainerID string - Image string `json:"image"` - ProcessName string `json:"processID"` - EnvVars []string - Cmd []string `json:"commandOverride"` - UpdateTime time.Time - Status string `json:"status"` - APILogs []string + ctx context.Context + ctxCancel context.CancelFunc + UUID string `json:"jobID"` + ContainerID string + Image string `json:"image"` + ProcessName string `json:"processID"` + EnvVars []string + Cmd []string `json:"commandOverride"` + UpdateTime time.Time + Status string `json:"status"` + apiLogs []string + containerLogs []string Resources + DB *DB } func (j *DockerJob) JobID() 
string {
@@ -49,30 +41,31 @@ func (j *DockerJob) IMAGE() string {
 	return j.Image
 }
 
-// Fetches Container logs from S3 and API logs from cache
+// Return current logs of the job
 func (j *DockerJob) Logs() (JobLogs, error) {
 	var logs JobLogs
 
-	cl, err := j.FetchLogs()
-	if err != nil {
-		return JobLogs{}, err
-	}
-	logs.ContainerLog = cl
-	logs.APILog = j.APILogs
+
+	logs.ContainerLogs = j.containerLogs
+	logs.APILogs = j.apiLogs
 	return logs, nil
 }
 
 func (j *DockerJob) Messages(includeErrors bool) []string {
-	return j.APILogs
+	return j.apiLogs
 }
 
 func (j *DockerJob) NewMessage(m string) {
-	j.APILogs = append(j.APILogs, m)
+	j.apiLogs = append(j.apiLogs, m)
 }
 
+// Append the error to apiLogs, cancel the context, update status and time, and write logs to the database
 func (j *DockerJob) HandleError(m string) {
-	j.APILogs = append(j.APILogs, m)
-	j.NewStatusUpdate(FAILED)
+	j.apiLogs = append(j.apiLogs, m)
 	j.ctxCancel()
+	if j.Status != DISMISSED { // if the job was dismissed then the error is because of dismissing the job
+		j.NewStatusUpdate(FAILED)
+		go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs)
+	}
 }
 
 func (j *DockerJob) LastUpdate() time.Time {
@@ -81,7 +74,9 @@ func (j *DockerJob) LastUpdate() time.Time {
 
 func (j *DockerJob) NewStatusUpdate(s string) {
 	j.Status = s
-	j.UpdateTime = time.Now()
+	now := time.Now()
+	j.UpdateTime = now
+	j.DB.updateJobRecord(j.UUID, s, now)
 }
 
 func (j *DockerJob) CurrentStatus() string {
@@ -108,13 +103,13 @@ func (j *DockerJob) Create() error {
 
 	c, err := controllers.NewDockerController()
 	if err != nil {
-		j.HandleError(err.Error())
+		j.ctxCancel()
 		return err
 	}
 
 	// verify command in body
 	if j.Cmd == nil {
-		j.HandleError(err.Error())
+		j.ctxCancel()
 		return err
 	}
 
@@ -122,11 +117,18 @@
 	if j.Image != "" {
 		err = c.EnsureImage(ctx, j.Image, false)
 		if err != nil {
-			j.HandleError(err.Error())
+			j.ctxCancel()
 			return err
 		}
 	}
 
+	// At this point the job is ready to be added to the database
+	err = j.DB.addJob(j.UUID, "accepted", time.Now(), "", "local", j.ProcessName)
+	if err != nil {
+		j.ctxCancel()
+		return err
+	}
+
 	j.NewStatusUpdate(ACCEPTED)
 	return nil
 }
@@ -160,13 +162,9 @@ func (j *DockerJob) Run() {
 	// wait for process to finish
 	statusCode, errWait := c.ContainerWait(j.ctx, j.ContainerID)
-	logs, errLog := c.ContainerLog(j.ctx, j.ContainerID)
-
-	// Creating new routine so that failure of writing logs does not mean failure of job
-	// This function does not panic
-	expDays, _ := strconv.Atoi(os.Getenv("EXPIRY_DAYS"))
-	textBytes := []byte(strings.Join(logs, "\n"))
-	go utils.WriteToS3(textBytes, fmt.Sprintf("%s/%s.txt", os.Getenv("S3_LOGS_DIR"), j.UUID), &j.APILogs, "text/plain", expDays)
+	// todo: get logs while the container is running so that logs of running containers are visible to users; this is only needed once docker jobs can also be async
+	containerLogs, errLog := c.ContainerLog(j.ctx, j.ContainerID)
+	j.containerLogs = containerLogs
 
 	// If there are error messages remove container before cancelling context inside Handle Error
 	for _, err := range []error{errWait, errLog} {
@@ -189,8 +187,6 @@
 		}
 		j.HandleError(fmt.Sprintf("container exit code: %d", statusCode))
 		return
-	} else if statusCode == 0 {
-		j.NewStatusUpdate(SUCCESSFUL)
 	}
 
 	// clean up the finished job
@@ -200,6 +196,9 @@
 		return
 	}
 
+	j.NewStatusUpdate(SUCCESSFUL)
+
+	go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs)
 	j.ctxCancel()
 }
 
@@ -211,9 +210,11 @@ func (j *DockerJob) Kill() error {
 		return fmt.Errorf("can't call delete on an
already completed, failed, or dismissed job") } + j.NewMessage("`received dismiss signal`") + c, err := controllers.NewDockerController() if err != nil { - j.NewMessage(err.Error()) + j.HandleError(err.Error()) } err = c.ContainerKillAndRemove(j.ctx, j.ContainerID, "KILL") @@ -222,75 +223,7 @@ func (j *DockerJob) Kill() error { } j.NewStatusUpdate(DISMISSED) + go j.DB.upsertLogs(j.UUID, j.apiLogs, j.containerLogs) j.ctxCancel() return nil } - -func (j *DockerJob) GetSizeinCache() int { - cmdData := int(unsafe.Sizeof(j.Cmd)) - for _, item := range j.Cmd { - cmdData += len(item) - } - - messageData := int(unsafe.Sizeof(j.APILogs)) - for _, item := range j.APILogs { - messageData += len(item) - } - - totalMemory := cmdData + messageData + - int(unsafe.Sizeof(j.ctx)) + - int(unsafe.Sizeof(j.ctxCancel)) + - int(unsafe.Sizeof(j.UUID)) + len(j.UUID) + - int(unsafe.Sizeof(j.ContainerID)) + len(j.ContainerID) + - int(unsafe.Sizeof(j.Image)) + len(j.Image) + - int(unsafe.Sizeof(j.UpdateTime)) + - int(unsafe.Sizeof(j.Status)) - return totalMemory -} - -// If JobID exists but log file doesn't then it raises an error -// Assumes jobID is valid and the process is sync -func (j *DockerJob) FetchLogs() ([]string, error) { - // Set up a session with AWS credentials and region - sess := session.Must(session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - })) - svc := s3.New(sess) - - bucket := os.Getenv("S3_BUCKET") - key := fmt.Sprintf("%s/%s.txt", os.Getenv("S3_LOGS_DIR"), j.UUID) - - exist, err := utils.KeyExists(key, svc) - if err != nil { - return nil, err - } - - if !exist { - return nil, fmt.Errorf("not found") - } - - // Create a new S3GetObjectInput object to specify the file to read - params := &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - } - - // Use the S3 service object to download the file into a byte slice - resp, err := svc.GetObject(params) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - scanner := bufio.NewScanner(resp.Body) - var logs []string - - for scanner.Scan() { - logs = append(logs, scanner.Text()) - } - if err := scanner.Err(); err != nil { - return nil, err - } - - return logs, nil -} diff --git a/jobs/jobs.go b/jobs/jobs.go index 0be6d87..b70aa81 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -32,23 +32,33 @@ type Job interface { NewStatusUpdate(string) Run() Create() error - GetSizeinCache() int } -// JobStatus describes status of a job -type JobStatus struct { +// JobRecord contains details about a job +type JobRecord struct { JobID string `json:"jobID"` LastUpdate time.Time `json:"updated"` Status string `json:"status"` ProcessID string `json:"processID"` - CMD []string `json:"commands,omitempty"` Type string `default:"process" json:"type"` + Host string `json:"host,omitempty"` + Mode string `json:"mode,omitempty"` } // JobLogs describes logs for the job type JobLogs struct { - ContainerLog []string `json:"container_log"` - APILog []string `json:"api_log"` + ContainerLogs []string `json:"container_logs"` + APILogs []string `json:"api_logs"` +} + +// Prettify JobLogs by replacing nil with empty []string{} +func (jl *JobLogs) Prettify() { + if jl.ContainerLogs == nil { + jl.ContainerLogs = []string{} + } + if jl.APILogs == nil { + jl.APILogs = []string{} + } } // OGCStatusCodes @@ -62,20 +72,19 @@ const ( // Returns an array of all Job statuses in memory // Most recently updated job first -func (jc *JobsCache) ListJobs() []JobStatus { - jc.mu.Lock() - defer jc.mu.Unlock() +func (ac 
*ActiveJobs) ListJobs() []JobRecord { + ac.mu.Lock() + defer ac.mu.Unlock() - jobs := make([]JobStatus, len(jc.Jobs)) + jobs := make([]JobRecord, len(ac.Jobs)) var i int - for _, j := range jc.Jobs { - js := JobStatus{ + for _, j := range ac.Jobs { + js := JobRecord{ ProcessID: (*j).ProcessID(), JobID: (*j).JobID(), LastUpdate: (*j).LastUpdate(), Status: (*j).CurrentStatus(), - CMD: (*j).CMD(), } jobs[i] = js i++ diff --git a/jobs/jobs_cache.go b/jobs/jobs_cache.go deleted file mode 100644 index f812d7e..0000000 --- a/jobs/jobs_cache.go +++ /dev/null @@ -1,156 +0,0 @@ -package jobs - -import ( - "encoding/gob" - "errors" - "fmt" - "io" - "os" - "sort" - "sync" - - "github.com/labstack/gommon/log" -) - -type JobsCache struct { - Jobs map[string]*Job `json:"jobs"` - MaxSizeBytes uint64 `json:"maxCacheBytes"` - TrimThreshold float64 `json:"cacheTrimThreshold"` - CurrentSizeBytes uint64 `json:"currentCacheBytes"` - mu sync.Mutex -} - -func (jc *JobsCache) Add(j *Job) { - jc.mu.Lock() - defer jc.mu.Unlock() - jc.Jobs[(*j).JobID()] = j -} - -func (jc *JobsCache) Remove(j *Job) { - jc.mu.Lock() - defer jc.mu.Unlock() - - delete(jc.Jobs, (*j).JobID()) -} - -func (jc *JobsCache) DumpCacheToFile() error { - // create a file to write the serialized data to - err := os.MkdirAll(".data", os.ModePerm) - if err != nil { - return err - } - file, err := os.Create(".data/snapshot.gob.tmp") - if err != nil { - return err - } - defer file.Close() - - gob.Register(&DockerJob{}) - gob.Register(&AWSBatchJob{}) - - // create an encoder and use it to serialize the map to the file - encoder := gob.NewEncoder(file) - err = encoder.Encode(jc.Jobs) - if err != nil { - return err - } - file.Close() - // saving it to tmp is better because - // if the gob panics then the existing snapshot is still untouched - err = os.Rename(".data/snapshot.gob.tmp", ".data/snapshot.gob") - if err != nil { - return fmt.Errorf("error moving file: %v", err.Error()) - } - - return nil -} - -// LoadCacheFromFile loads snapshot if it exists. -// Returns error if file could not be desearilized or not found. -// Only modifies the JobsCache if file is read and parsed correctly. 
-func (jc *JobsCache) LoadCacheFromFile() error { - - // create a file to read the serialized data from - file, err := os.Open(".data/snapshot.gob") - if errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("not found") - } - - js := make(map[string]*Job) // - - defer file.Close() - - gob.Register(&DockerJob{}) - gob.Register(&AWSBatchJob{}) - - // create a decoder and use it to deserialize the people map from the file - decoder := gob.NewDecoder(file) - err = decoder.Decode(&js) - if err != nil && !errors.Is(err, io.EOF) { - return err - } - jc.Jobs = *&js - return nil -} - -func (jc *JobsCache) TrimCache(desiredLength int64) { - jc.mu.Lock() - defer jc.mu.Unlock() - - jobIDs := make([]string, len(jc.Jobs)) - - var i int - for k := range jc.Jobs { - jobIDs[i] = k - i++ - } - - // sort the jobIDs in reverse order with most recent time first - sort.Slice(jobIDs, func(i, j int) bool { - return (*jc.Jobs[jobIDs[i]]).LastUpdate().After((*jc.Jobs[jobIDs[j]]).LastUpdate()) - }) - - // delete these records from the map - for _, jid := range jobIDs[0:desiredLength] { - delete(jc.Jobs, jid) - } -} - -// Revised to kill only currently active jobs -func (jc *JobsCache) KillAll() error { - jc.mu.Lock() - defer jc.mu.Unlock() - - for _, j := range jc.Jobs { - if (*j).CurrentStatus() == ACCEPTED || (*j).CurrentStatus() == RUNNING { - if err := (*j).Kill(); err != nil { - return err - } - } - } - return nil -} - -func (jc *JobsCache) CheckCache() uint64 { - // jc.mu.Lock() - // defer jc.mu.Unlock() - - // calculate total size of cache as of now - var currentSizeBytes uint64 - for _, j := range jc.Jobs { - currentSizeBytes += uint64((*j).GetSizeinCache()) - } - jc.CurrentSizeBytes = currentSizeBytes - - pctCacheFull := float64(jc.CurrentSizeBytes) / float64(jc.MaxSizeBytes) - log.Info("cache_pct_full=", pctCacheFull, " current_size=", float64(jc.CurrentSizeBytes), " jobs=", len(jc.Jobs), " (max cache=", float64(jc.MaxSizeBytes), ")") - // set default auto-trim to 95%.... 
- if pctCacheFull > 0.95 { - currentLenth := len(jc.Jobs) - desiredLength := int64(jc.TrimThreshold * float64(currentLenth)) - message := fmt.Sprintf("trimming cache from %d jobs to %d jobs", currentLenth, desiredLength) - log.Info(message) - jc.TrimCache(desiredLength) - } - return currentSizeBytes -} diff --git a/jobs/metadata.go b/jobs/metadata.go index e00f980..6c299bb 100644 --- a/jobs/metadata.go +++ b/jobs/metadata.go @@ -5,7 +5,7 @@ import ( "app/utils" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strings" "time" @@ -74,6 +74,10 @@ func (j *AWSBatchJob) WriteMeta(c *controllers.AWSBatchController) { i := image{imgURI, imgDgst} g, s, e, err := c.GetJobTimes(j.AWSBatchID) + if err != nil { + j.NewMessage(fmt.Sprintf("error writing metadata: %s", err.Error())) + return + } md := metaData{ Context: "https://github.com/Dewberry/process-api/blob/main/context.jsonld", @@ -92,9 +96,7 @@ func (j *AWSBatchJob) WriteMeta(c *controllers.AWSBatchController) { return } - utils.WriteToS3(jsonBytes, j.MetaDataLocation, &j.APILogs, "application/json", 0) - - return + utils.WriteToS3(jsonBytes, j.MetaDataLocation, &j.apiLogs, "application/json", 0) } // Get image digest from ecr @@ -181,26 +183,26 @@ func getDkrHubImageDigest(imgURI string, arch string) (string, error) { response, err := client.Get(url) if err != nil { - return "", fmt.Errorf("Error sending request: %s\n", err) + return "", fmt.Errorf("error sending request: %s", err) } defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) + body, err := io.ReadAll(response.Body) if err != nil { - return "", fmt.Errorf("Error reading response: %s\n", err) + return "", fmt.Errorf("error reading response: %s", err) } var result []interface{} err = json.Unmarshal(body, &result) if err != nil { - return "", fmt.Errorf("Error parsing JSON: %s\n", err) + return "", fmt.Errorf("error parsing JSON: %s", err) } // Currently it gets just the first image, while there can be more than 1. This is incorrect digest, ok := result[0].(map[string]interface{})["digest"].(string) if !ok { - return "", fmt.Errorf("Error retrieving image digest") + return "", fmt.Errorf("error retrieving image digest") } return digest, nil diff --git a/main.go b/main.go index 0ac4988..e77ec48 100644 --- a/main.go +++ b/main.go @@ -22,8 +22,6 @@ import ( var ( pluginsDir string - cacheSize int // default 1028*1028*1028 = 11073741824 (1GB) ~500K jobs - overwrite bool port string envFP string ) @@ -32,8 +30,6 @@ func init() { flag.StringVar(&pluginsDir, "d", "plugins", "specify the relative path of the processes dir") flag.StringVar(&port, "p", "5050", "specify the port to run the api on") - flag.IntVar(&cacheSize, "c", 11073741824, "specify the max cache size in bytes (default= 1GB)") - flag.BoolVar(&overwrite, "o", false, "overwrite cache snapshot if exist") flag.StringVar(&envFP, "e", ".env", "specify the path of the dot env file to load") flag.Parse() @@ -63,10 +59,7 @@ func init() { func main() { // Initialize resources - rh, err := handlers.NewRESTHander(pluginsDir, uint64(cacheSize), overwrite) - if err != nil { - log.Fatal(err) - } + rh := handlers.NewRESTHander(pluginsDir) // todo: handle this error: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
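One way to address the todo above (the unhandled "Cannot connect to the Docker daemon" error) would be to probe the daemon at startup and fail fast. A sketch using the Docker SDK directly, offered as an assumption about a possible fix rather than as part of this PR:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/docker/docker/client"
)

// Sketch: fail fast at startup instead of at the first job submission.
func checkDockerDaemon() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Fatalf("could not create docker client: %v", err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Ping hits the daemon's /_ping endpoint; it errors if the daemon is unreachable
	if _, err := cli.Ping(ctx); err != nil {
		log.Fatalf("cannot connect to the Docker daemon, is it running? %v", err)
	}
}
```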
diff --git a/main.go b/main.go
index 0ac4988..e77ec48 100644
--- a/main.go
+++ b/main.go
@@ -22,8 +22,6 @@ import (
 
 var (
 	pluginsDir string
-	cacheSize  int // default 1028*1028*1028 = 11073741824 (1GB) ~500K jobs
-	overwrite  bool
 	port       string
 	envFP      string
 )
@@ -32,8 +30,6 @@ func init() {
 	flag.StringVar(&pluginsDir, "d", "plugins", "specify the relative path of the processes dir")
 	flag.StringVar(&port, "p", "5050", "specify the port to run the api on")
-	flag.IntVar(&cacheSize, "c", 11073741824, "specify the max cache size in bytes (default= 1GB)")
-	flag.BoolVar(&overwrite, "o", false, "overwrite cache snapshot if exist")
 	flag.StringVar(&envFP, "e", ".env", "specify the path of the dot env file to load")
 
 	flag.Parse()
@@ -63,10 +59,7 @@ func init() {
 func main() {
 
 	// Initialize resources
-	rh, err := handlers.NewRESTHander(pluginsDir, uint64(cacheSize), overwrite)
-	if err != nil {
-		log.Fatal(err)
-	}
+	rh := handlers.NewRESTHander(pluginsDir)
 	// todo: handle this error: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running
@@ -75,7 +68,9 @@
 	e.HideBanner = true
 	e.HidePort = true
 	e.Use(middleware.Recover())
-	e.Use(middleware.Logger())
+	e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
+		Output: e.Logger.Output(),
+	}))
 	e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
 		AllowCredentials: true,
 		AllowOrigins:     []string{"*"},
@@ -99,21 +94,13 @@
 	// e.Delete("processes/:processID", rh.RegisterNewProcess)
 
 	// Jobs
-	e.GET("/jobs", rh.JobsCacheHandler)
+	e.GET("/jobs", rh.ListJobsHandler)
 	e.GET("/jobs/:jobID", rh.JobStatusHandler)
-	e.GET("/jobs/:jobID/results", rh.JobResultsHandler)   //requires cache
-	e.GET("/jobs/:jobID/logs", rh.JobLogsHandler)         //requires cache
-	e.GET("/jobs/:jobID/metadata", rh.JobMetaDataHandler) //requires cache
+	e.GET("/jobs/:jobID/results", rh.JobResultsHandler)
+	e.GET("/jobs/:jobID/logs", rh.JobLogsHandler)
+	e.GET("/jobs/:jobID/metadata", rh.JobMetaDataHandler)
 	e.DELETE("/jobs/:jobID", rh.JobDismissHandler)
 
-	// JobCache Monitor
-	go func() {
-		for {
-			time.Sleep(60 * 60 * time.Second) // check cache once an hour
-			_ = rh.JobsCache.CheckCache()
-		}
-	}()
-
 	// Start server
 	go func() {
 		e.Logger.Info("server starting on port: ", port)
@@ -131,19 +118,18 @@
 	e.Logger.Info("gracefully shutting down the server")
 
 	// Kill any running docker containers (clean up resources)
-	err = rh.JobsCache.KillAll()
-	if err != nil {
+
+	if err := rh.ActiveJobs.KillAll(); err != nil {
 		e.Logger.Error(err)
 	} else {
 		e.Logger.Info("killed and removed active containers")
 	}
 
-	// Dump cache to file
-	err = rh.JobsCache.DumpCacheToFile()
-	if err != nil {
-		e.Logger.Error(err)
+	time.Sleep(15 * time.Second) // give routines spawned by KillAll() time to finish; 15 seconds because the AWS Batch monitors poll jobs every 10 seconds
+	if err := rh.DB.Handle.Close(); err != nil {
+		e.Logger.Fatal(err)
 	} else {
-		e.Logger.Info("snapshot created at .data/snapshot.gob")
+		e.Logger.Info("closed connection to database")
 	}
 
 	// Shutdown the server
diff --git a/move_db_to_mount.sh b/move_db_to_mount.sh
new file mode 100755
index 0000000..8f12393
--- /dev/null
+++ b/move_db_to_mount.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+# Software like DBeaver and QGIS can't open a SQLite db saved inside WSL because of special characters in the path.
+# This script moves the .data/db.sqlite file to the provided mount location and then makes a symbolic link pointing to it.
+# Usage: ./move_db_to_mount.sh /mnt/c/Users/asiddiqui/.data/process-api/
+
+COPY_TO_PATH=$1
+
+mkdir -p "$COPY_TO_PATH" || exit
+mv -f .data/db.sqlite "${COPY_TO_PATH}" # if this fails, make sure no other software is connected to the DB
+ln -s "${COPY_TO_PATH}/db.sqlite" .data/db.sqlite
+
+
diff --git a/processes/processes.go b/processes/processes.go
index 850b224..2b92897 100644
--- a/processes/processes.go
+++ b/processes/processes.go
@@ -150,19 +150,13 @@ func (p process) VerifyInputs(inp map[string]interface{}) error {
 }
 
 // ProcessList describes processes
-type ProcessList []process
-
-// ListAll returns all the processes' info
-func (ps *ProcessList) ListAll() ([]Info, error) {
-	results := make([]Info, len(*ps))
-	for i, p := range *ps {
-		results[i] = p.Info
-	}
-	return results, nil
+type ProcessList struct {
+	List     []process
+	InfoList []Info
 }
 
 func (ps *ProcessList) Get(processID string) (process, error) {
-	for _, p := range *ps {
+	for _, p := range (*ps).List {
 		if p.Info.ID == processID {
 			return p, nil
 		}
@@ -203,21 +197,35 @@ func newProcess(f string) (process, error) {
 
 // Load all processes from yml files in the given directory and subdirectories
 func LoadProcesses(dir string) (ProcessList, error) {
+	var pl ProcessList
+
 	ymls, err := filepath.Glob(fmt.Sprintf("%s/*/*.yml", dir))
+	if err != nil {
+		return pl, err
+	}
 	yamls, err := filepath.Glob(fmt.Sprintf("%s/*/*.yaml", dir))
+	if err != nil {
+		return pl, err
+	}
 	y := append(ymls, yamls...)
 
-	processes := make(ProcessList, len(y))
+	processes := make([]process, len(y))
 
 	for i, y := range y {
 		p, err := newProcess(y)
 		if err != nil {
-			return processes, err
+			return pl, err
 		}
 		processes[i] = p
 	}
-	if err != nil {
-		return nil, err
-	}
+
+	infos := make([]Info, len(processes))
+	for i, p := range processes {
+		infos[i] = p.Info
+	}
 
-	return processes, err
+	pl.List = processes
+	pl.InfoList = infos
+
+	return pl, nil
 }
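For orientation, here is a minimal sketch of how the restructured `ProcessList` is consumed: `InfoList` is built once in `LoadProcesses` so that list endpoints don't rebuild the `Info` slice on every request. The types below are trimmed stand-ins, not the repo's exact definitions:

```go
package main

import "fmt"

// Trimmed stand-ins for the types in processes.go (illustrative only).
type Info struct {
	ID    string
	Title string
}

type process struct{ Info Info }

// ProcessList mirrors the new struct shape: the full processes plus a
// precomputed slice of their Info blocks.
type ProcessList struct {
	List     []process
	InfoList []Info
}

// Get does a linear scan over List, as in the diff.
func (ps *ProcessList) Get(processID string) (process, error) {
	for _, p := range ps.List {
		if p.Info.ID == processID {
			return p, nil
		}
	}
	return process{}, fmt.Errorf("process %q not found", processID)
}

func main() {
	pl := ProcessList{List: []process{{Info: Info{ID: "dfc", Title: "Depth Frequency Curve"}}}}

	// Built once at load time, analogous to the infos loop in LoadProcesses.
	for _, p := range pl.List {
		pl.InfoList = append(pl.InfoList, p.Info)
	}

	if p, err := pl.Get("dfc"); err == nil {
		fmt.Println(p.Info.Title) // Depth Frequency Curve
	}
}
```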
diff --git a/public/views/jobs.html b/public/views/jobs.html
index cd15e83..8d46546 100644
--- a/public/views/jobs.html
+++ b/public/views/jobs.html
@@ -57,37 +57,62 @@
 		.gray {
 			color: gray;
 		}
+
+		/* do not highlight visited links: these jobs are continuously updated, so the visited state is not meaningful here */
+		a:link,
+		a:visited {
+			color: blue;
+		}
+
+		.pagination {
+			text-align: center;
+		}
+
+		.prev-link {
+			margin-right: 10px;
+		}

[jobs.html body hunk — the Jobs List table markup was lost in extraction; the recoverable changes are:
  * the table now iterates {{range .jobs}} instead of {{range .}};
  * the Commands column ({{range .CMD}}{{.}} {{end}}) is dropped, leaving Logs (JobID), Status, ProcessID, and Updated ({{.LastUpdate}});
  * a status icon is rendered next to {{.Status}}: {{if eq .Status "successful"}}✔️{{else if or (eq .Status "failed") (eq .Status "dismissed")}}❌{{else if or (eq .Status "accepted") (eq .Status "running")}}🟡{{end}};
  * prev/next pagination links are appended below the table.]
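The status-icon conditional can be exercised on its own; the template text below is copied from the hunk, while the `Job` struct and sample data are illustrative stand-ins:

```go
package main

import (
	"html/template"
	"log"
	"os"
)

// Template text copied from the jobs.html hunk; surrounding markup omitted.
const rows = `{{range .jobs}}{{if eq .Status "successful"}}✔️{{else if or (eq .Status "failed") (eq .Status "dismissed")}}❌{{else if or (eq .Status "accepted") (eq .Status "running")}}🟡{{end}} {{.Status}}
{{end}}`

// Job is a stand-in carrying only the field the conditional reads.
type Job struct{ Status string }

func main() {
	t := template.Must(template.New("rows").Parse(rows))
	data := map[string]interface{}{
		"jobs": []Job{{"successful"}, {"running"}, {"dismissed"}},
	}
	if err := t.Execute(os.Stdout, data); err != nil {
		log.Fatal(err)
	}
	// Output:
	// ✔️ successful
	// 🟡 running
	// ❌ dismissed
}
```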
diff --git a/public/views/landing.html b/public/views/landing.html
index c69c0fe..7baaa89 100644
--- a/public/views/landing.html
+++ b/public/views/landing.html
@@ -5,6 +5,15 @@
 Process-API
[nine added lines lost in extraction]
diff --git a/public/views/logs.html b/public/views/logs.html
index 5c1de0a..919b54d 100644
--- a/public/views/logs.html
+++ b/public/views/logs.html

[logs.html hunks — the surrounding table markup was lost in extraction; the recoverable changes are two template renames, one in the "API Logs" table and one in the "Container Logs" table (each entry still renders as {{.}}):
@@ -62,7 +62,7 @@
-		{{range .APILog}}
+		{{range .APILogs}}
@@ -70,7 +70,7 @@
-		{{range .ContainerLog}}
+		{{range .ContainerLogs}}
]
diff --git a/public/views/processes.html b/public/views/processes.html
index f687f4b..e8fdd13 100644
--- a/public/views/processes.html
+++ b/public/views/processes.html
@@ -2,6 +2,7 @@
[one added head line lost in extraction]
 Process List
@@ -70,9 +79,9 @@

[processes.html list hunk — the table markup was lost in extraction; the recoverable changes are:
  * the Process List table now iterates {{range .processes}} instead of {{range .}};
  * the {{.Title}} cell changes (its tags were stripped), while the surrounding rows still render {{.Description}}, {{.Version}}, and {{range .JobControlOptions}}{{.}} {{end}};
  * a pagination block is appended after the table (@@ -81,6 +90,17 @@; its markup was also stripped, leaving the stray "+ -" markers just before the next file diff).]
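The test changes below exercise new `limit`/`offset` query parameters on `/processes` and `/jobs`. The handlers themselves are not part of this diff, so the following is only a hypothetical sketch of how such paging parameters might be parsed; the defaults are assumptions, not values taken from the repo:

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// parsePaging is hypothetical: it reads ?limit= and ?offset= the way the
// updated Postman tests use them. The defaults (20, 0) are assumptions.
func parsePaging(r *http.Request) (limit, offset int) {
	limit, offset = 20, 0
	if v, err := strconv.Atoi(r.URL.Query().Get("limit")); err == nil && v > 0 {
		limit = v
	}
	if v, err := strconv.Atoi(r.URL.Query().Get("offset")); err == nil && v >= 0 {
		offset = v
	}
	return limit, offset
}

func main() {
	r, _ := http.NewRequest("GET", "/jobs?limit=2&offset=1", nil)
	limit, offset := parsePaging(r)
	fmt.Println(limit, offset) // 2 1
}
```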
+ diff --git a/tests/e2e/processes/aepGrid.yml b/tests/e2e/processes/dfc.yml similarity index 60% rename from tests/e2e/processes/aepGrid.yml rename to tests/e2e/processes/dfc.yml index 98ce1df..516bfbe 100644 --- a/tests/e2e/processes/aepGrid.yml +++ b/tests/e2e/processes/dfc.yml @@ -1,8 +1,8 @@ info: - version: '2023.2.1' - id: aepGrid - title: aepGrid - description: Creates an AEP grid + version: '0.0.1' + id: dfc + title: Depth Frequency Curve + description: Returns data for depth-frequency curve jobControlOptions: - async-execute outputTransmission: @@ -16,15 +16,23 @@ host: container: command: - python - - aep_blocks.py - + - dfc.py inputs: + - id: crs + title: crs + input: + literalDataDomain: + dataType: value + valueDefinition: + anyValue: true + minOccurs: 0 + maxOccurs: 1 - id: tile title: tile input: literalDataDomain: - dataType: string + dataType: value valueDefinition: anyValue: true minOccurs: 1 @@ -33,27 +41,26 @@ inputs: title: epoch input: literalDataDomain: - dataType: string + dataType: value valueDefinition: anyValue: true minOccurs: 1 maxOccurs: 1 - - id: aepGridDestination - title: aepGridDestination + - id: points + title: points input: literalDataDomain: - dataType: reference + dataType: value valueDefinition: anyValue: true minOccurs: 1 - maxOccurs: 1 + maxOccurs: 999999 outputs: - - id: aepGrid - title: aepGrid - inputId: aepGridDestination + - id: dfc + title: dfc output: transmissionMode: - - reference + - value diff --git a/tests/e2e/tests.postman_collection.json b/tests/e2e/tests.postman_collection.json index 4c2cacd..3938f9d 100644 --- a/tests/e2e/tests.postman_collection.json +++ b/tests/e2e/tests.postman_collection.json @@ -45,16 +45,42 @@ }, { "name": "processes", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "const processData = pm.response.json().processes;\r", + "\r", + "pm.test(\"number of processes registered test\", function () {\r", + " pm.expect(processData.length).to.eql(1);\r", + "});\r", + "" + ], + "type": "text/javascript" + } + } + ], "request": { "method": "GET", "header": [], "url": { - "raw": "{{url}}/processes", + "raw": "{{url}}/processes?limit=1&offset=1", "host": [ "{{url}}" ], "path": [ "processes" + ], + "query": [ + { + "key": "limit", + "value": "1" + }, + { + "key": "offset", + "value": "1" + } ] } }, @@ -161,7 +187,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"inputs\": {\n \"aepGridDestination\": \"aep-grid-outputs/output3.tif\",\n \"epoch\": \"2020\"\n }\n}", + "raw": "{\n \"inputs\": {\n \"crs\": \"4326\",\n \"points\": [\n [\n -76.491824,\n 37.271065\n ]\n ],\n \"epoch\": \"2020\"\n }\n}", "options": { "raw": { "language": "json" @@ -181,7 +207,7 @@ "variable": [ { "key": "processID", - "value": "aepGrid" + "value": "dfc" } ] } @@ -594,7 +620,7 @@ "name": "job-submit", "item": [ { - "name": "aepGrid", + "name": "dfc", "event": [ { "listen": "test", @@ -612,7 +638,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"inputs\": {\n \"aepGridDestination\": \"aep-grid-outputs/output3.tif\",\n \"tile\": \"106\",\n \"epoch\": \"2020\"\n }\n}", + "raw": "{\n \"inputs\": {\n \"crs\": \"4326\",\n \"points\": [\n [\n -76.491824,\n 37.271065\n ]\n ],\n \"tile\": \"122\",\n \"epoch\": \"2020\"\n }\n}", "options": { "raw": { "language": "json" @@ -632,7 +658,7 @@ "variable": [ { "key": "processID", - "value": "aepGrid" + "value": "dfc" } ] } @@ -664,7 +690,7 @@ "response": [] }, { - "name": "aepGrid retry", + "name": "dfc retry", "event": [ { "listen": "test", @@ -682,7 +708,7 @@ "header": [], 
"body": { "mode": "raw", - "raw": "{\n \"inputs\": {\n \"aepGridDestination\": \"aep-grid-outputs/output3.tif\",\n \"tile\": \"106\",\n \"epoch\": \"2020\"\n }\n}", + "raw": "{\n \"inputs\": {\n \"crs\": \"4326\",\n \"points\": [\n [\n -76.491824,\n 37.271065\n ]\n ],\n \"tile\": \"122\",\n \"epoch\": \"2020\"\n }\n}", "options": { "raw": { "language": "json" @@ -702,7 +728,7 @@ "variable": [ { "key": "processID", - "value": "aepGrid" + "value": "dfc" } ] } @@ -738,6 +764,51 @@ { "name": "job-details", "item": [ + { + "name": "job-logs-before-job-is-running", + "event": [ + { + "listen": "prerequest", + "script": { + "exec": [ + "" + ], + "type": "text/javascript" + } + }, + { + "listen": "test", + "script": { + "exec": [ + "" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{url}}/jobs/:jobID/logs", + "host": [ + "{{url}}" + ], + "path": [ + "jobs", + ":jobID", + "logs" + ], + "variable": [ + { + "key": "jobID", + "value": "{{jobID}}" + } + ] + } + }, + "response": [] + }, { "name": "job-status", "event": [ @@ -745,7 +816,7 @@ "listen": "prerequest", "script": { "exec": [ - "setTimeout(() => {}, 20000); \r", + "setTimeout(() => {}, 30000); \r", "" ], "type": "text/javascript" @@ -794,13 +865,13 @@ "response": [] }, { - "name": "job-logs", + "name": "job-logs-after-streamname-exist", "event": [ { "listen": "prerequest", "script": { "exec": [ - "setTimeout(() => {}, 20000); " + "" ], "type": "text/javascript" } @@ -845,7 +916,7 @@ "listen": "prerequest", "script": { "exec": [ - "setTimeout(() => {}, 200000); " + "setTimeout(() => {}, 110000); " ], "type": "text/javascript" } @@ -949,7 +1020,7 @@ "name": "job-submit", "item": [ { - "name": "aepGrid", + "name": "dfc", "event": [ { "listen": "test", @@ -967,7 +1038,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"inputs\": {\n \"aepGridDestination\": \"aep-grid-outputs/output3.tif\",\n \"tile\": \"106\",\n \"epoch\": \"2020\"\n }\n}", + "raw": "{\n \"inputs\": {\n \"crs\": \"4326\",\n \"points\": [\n [\n -76.491824,\n 37.271065\n ]\n ],\n \"tile\": \"122\",\n \"epoch\": \"2020\"\n }\n}", "options": { "raw": { "language": "json" @@ -987,7 +1058,7 @@ "variable": [ { "key": "processID", - "value": "aepGrid" + "value": "dfc" } ] } @@ -1023,31 +1094,6 @@ { "name": "Negatives", "item": [ - { - "name": "job-logs", - "request": { - "method": "GET", - "header": [], - "url": { - "raw": "{{url}}/jobs/:jobID/logs", - "host": [ - "{{url}}" - ], - "path": [ - "jobs", - ":jobID", - "logs" - ], - "variable": [ - { - "key": "jobID", - "value": "{{jobID}}" - } - ] - } - }, - "response": [] - }, { "name": "job-metadata", "request": { @@ -1160,10 +1206,10 @@ "listen": "test", "script": { "exec": [ - "const jobsData = pm.response.json();\r", + "const jobsData = pm.response.json().jobs;\r", "\r", "pm.test(\"number of jobs in record test\", function () {\r", - " pm.expect(jobsData.length).to.eql(4);\r", + " pm.expect(jobsData.length).to.eql(2);\r", "});\r", "" ], @@ -1175,7 +1221,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{url}}/jobs", + "raw": "{{url}}/jobs?limit=2&offset=1", "host": [ "{{url}}" ], @@ -1184,9 +1230,12 @@ ], "query": [ { - "key": "include_error_messages", - "value": "true", - "disabled": true + "key": "limit", + "value": "2" + }, + { + "key": "offset", + "value": "1" } ] }