From 8c491a89d334f92f73323e1b2285be011c6d6b21 Mon Sep 17 00:00:00 2001
From: ErdemOzgen
Date: Wed, 17 Apr 2024 05:44:35 +0800
Subject: [PATCH] Refactor names

---
 cmd/common_test.go                           |   4 +-
 cmd/restart.go                               |   4 +-
 cmd/restart_test.go                          |  13 +-
 cmd/retry_test.go                            |   7 +-
 cmd/status_test.go                           |   2 +-
 cmd/stop_test.go                             |   7 +-
 internal/agent/agent.go                      |   8 +-
 internal/agent/agent_test.go                 |  26 +-
 internal/engine/engine.go                    |  11 +-
 internal/engine/engine_test.go               |  24 +-
 internal/persistence/jsondb/database_test.go |  37 +--
 internal/persistence/jsondb/writer_test.go   |  13 +-
 internal/persistence/model/status.go         |  36 +--
 internal/persistence/model/status_test.go    |   6 +-
 internal/reporter/reporter.go                |   6 +-
 internal/reporter/reporter_test.go           |  12 +-
 internal/scheduler/node.go                   |  29 ++-
 internal/scheduler/node_test.go              |   8 +-
 internal/scheduler/scheduler.go              | 245 +++++++++++--------
 internal/scheduler/scheduler_test.go         | 114 ++++-----
 service/core/scheduler/job/job.go            |   6 +-
 service/frontend/handlers/dag.go             |   8 +-
 22 files changed, 338 insertions(+), 288 deletions(-)

diff --git a/cmd/common_test.go b/cmd/common_test.go
index 9ab5cb9..859691e 100644
--- a/cmd/common_test.go
+++ b/cmd/common_test.go
@@ -101,7 +101,7 @@ func testDAGFile(name string) string {
 	return path.Join(d, name)
 }
 
-func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected scheduler.SchedulerStatus) {
+func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected scheduler.Status) {
 	t.Helper()
 
 	d, err := loadDAG(dagFile, "")
@@ -114,7 +114,7 @@ func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected
 	}, time.Millisecond*5000, time.Millisecond*50)
 }
 
-func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.SchedulerStatus) {
+func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.Status) {
 	t.Helper()
 	require.Eventually(t, func() bool {
 		// TODO: do not use history store directly.
diff --git a/cmd/restart.go b/cmd/restart.go
index 3cea20d..f0b672f 100644
--- a/cmd/restart.go
+++ b/cmd/restart.go
@@ -52,7 +52,7 @@ func stopDAGIfRunning(e engine.Engine, dag *dag.DAG) {
 	checkError(err)
 
 	// Stop the DAG if it is running.
-	if st.Status == scheduler.SchedulerStatus_Running {
+	if st.Status == scheduler.Status_Running {
 		log.Printf("Stopping %s for restart...", dag.Name)
 		cobra.CheckErr(stopRunningDAG(e, dag))
 	}
@@ -63,7 +63,7 @@ func stopRunningDAG(e engine.Engine, dag *dag.DAG) error {
 		st, err := e.GetCurrentStatus(dag)
 		checkError(err)
 
-		if st.Status != scheduler.SchedulerStatus_Running {
+		if st.Status != scheduler.Status_Running {
 			return nil
 		}
 		checkError(e.Stop(dag))
diff --git a/cmd/restart_test.go b/cmd/restart_test.go
index 05f3f22..eb44a20 100644
--- a/cmd/restart_test.go
+++ b/cmd/restart_test.go
@@ -1,14 +1,15 @@
 package cmd
 
 import (
+	"os"
+	"testing"
+	"time"
+
 	"github.com/ErdemOzgen/blackdagger/internal/config"
 	"github.com/ErdemOzgen/blackdagger/internal/engine"
 	"github.com/ErdemOzgen/blackdagger/internal/persistence/client"
 	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 	"github.com/stretchr/testify/require"
-	"os"
-	"testing"
-	"time"
 )
 
 func TestRestartCommand(t *testing.T) {
@@ -27,7 +28,7 @@ func TestRestartCommand(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 
 	// Wait for the DAG running.
-	testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_Running)
+	testStatusEventual(t, e, dagFile, scheduler.Status_Running)
 
 	// Restart the DAG.
 	done := make(chan struct{})
@@ -39,7 +40,7 @@ func TestRestartCommand(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 
 	// Wait for the DAG running again.
-	testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_Running)
+	testStatusEventual(t, e, dagFile, scheduler.Status_Running)
 
 	// Stop the restarted DAG.
 	testRunCommand(t, stopCmd(), cmdTest{args: []string{"stop", dagFile}})
@@ -47,7 +48,7 @@ func TestRestartCommand(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 
 	// Wait for the DAG is stopped.
-	testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_None)
+	testStatusEventual(t, e, dagFile, scheduler.Status_None)
 
 	// Check parameter was the same as the first execution
 	d, err := loadDAG(dagFile, "")
diff --git a/cmd/retry_test.go b/cmd/retry_test.go
index 279e863..b04a98f 100644
--- a/cmd/retry_test.go
+++ b/cmd/retry_test.go
@@ -2,10 +2,11 @@ package cmd
 
 import (
 	"fmt"
-	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
-	"github.com/stretchr/testify/require"
 	"os"
 	"testing"
+
+	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
+	"github.com/stretchr/testify/require"
 )
 
 func TestRetryCommand(t *testing.T) {
@@ -22,7 +23,7 @@ func TestRetryCommand(t *testing.T) {
 	// Find the request ID.
 	s, err := e.GetStatus(dagFile)
 	require.NoError(t, err)
-	require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success)
+	require.Equal(t, s.Status.Status, scheduler.Status_Success)
 	require.NotNil(t, s.Status)
 
 	reqID := s.Status.RequestId
diff --git a/cmd/status_test.go b/cmd/status_test.go
index 636b493..9e1a31e 100644
--- a/cmd/status_test.go
+++ b/cmd/status_test.go
@@ -26,7 +26,7 @@ func TestStatusCommand(t *testing.T) {
 	time.Sleep(time.Millisecond * 50)
 
 	// TODO: do not use history store directly.
-	testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Running)
+	testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.Status_Running)
 
 	// Check the current status.
 	testRunCommand(t, statusCmd(), cmdTest{
diff --git a/cmd/stop_test.go b/cmd/stop_test.go
index 40f0948..23f5c12 100644
--- a/cmd/stop_test.go
+++ b/cmd/stop_test.go
@@ -1,10 +1,11 @@
 package cmd
 
 import (
-	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 	"os"
 	"testing"
 	"time"
+
+	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 )
 
 func TestStopCommand(t *testing.T) {
@@ -26,7 +27,7 @@ func TestStopCommand(t *testing.T) {
 
 	// Wait for the DAG running.
 	// TODO: Do not use history store.
-	testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Running)
+	testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.Status_Running)
 
 	// Stop the DAG.
 	testRunCommand(t, stopCmd(), cmdTest{
@@ -35,6 +36,6 @@ func TestStopCommand(t *testing.T) {
 
 	// Check the last execution is cancelled.
 	// TODO: Do not use history store.
-	testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Cancel)
+	testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.Status_Cancel)
 	<-done
 }
diff --git a/internal/agent/agent.go b/internal/agent/agent.go
index 565f0b6..a800c3c 100644
--- a/internal/agent/agent.go
+++ b/internal/agent/agent.go
@@ -97,8 +97,8 @@ func (a *Agent) Run(ctx context.Context) error {
 // Status returns the current status of the dags.
 func (a *Agent) Status() *model.Status {
 	scStatus := a.scheduler.Status(a.graph)
-	if scStatus == scheduler.SchedulerStatus_None && !a.graph.StartedAt.IsZero() {
-		scStatus = scheduler.SchedulerStatus_Running
+	if scStatus == scheduler.Status_None && !a.graph.StartedAt.IsZero() {
+		scStatus = scheduler.Status_Running
 	}
 
 	status := model.NewStatus(
@@ -379,7 +379,7 @@ func (a *Agent) checkIsRunning() error {
 	if err != nil {
 		return err
 	}
-	if status.Status != scheduler.SchedulerStatus_None {
+	if status.Status != scheduler.Status_None {
 		return fmt.Errorf("the DAG is already running. socket=%s",
 			a.DAG.SockAddr())
 	}
@@ -403,7 +403,7 @@ func (a *Agent) HandleHTTP(w http.ResponseWriter, r *http.Request) {
 	switch {
 	case r.Method == http.MethodGet && statusRe.MatchString(r.URL.Path):
 		status := a.Status()
-		status.Status = scheduler.SchedulerStatus_Running
+		status.Status = scheduler.Status_Running
 		b, err := status.ToJson()
 		if err != nil {
 			encodeError(w, err)
diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go
index 162eb0d..e37eb71 100644
--- a/internal/agent/agent_test.go
+++ b/internal/agent/agent_test.go
@@ -51,7 +51,7 @@ func TestRunDAG(t *testing.T) {
 	a := agent.New(&agent.Config{DAG: d}, e, df)
 
 	status, _ := e.GetLatestStatus(d)
-	require.Equal(t, scheduler.SchedulerStatus_None, status.Status)
+	require.Equal(t, scheduler.Status_None, status.Status)
 
 	go func() {
 		err := a.Run(context.Background())
@@ -63,7 +63,7 @@ func TestRunDAG(t *testing.T) {
 	require.Eventually(t, func() bool {
 		status, err := e.GetLatestStatus(d)
 		require.NoError(t, err)
-		return status.Status == scheduler.SchedulerStatus_Success
+		return status.Status == scheduler.Status_Success
 	}, time.Second*2, time.Millisecond*100)
 
 	// check deletion of expired history files
@@ -92,7 +92,7 @@ func TestCheckRunning(t *testing.T) {
 
 	status := a.Status()
 	require.NotNil(t, status)
-	require.Equal(t, status.Status, scheduler.SchedulerStatus_Running)
+	require.Equal(t, status.Status, scheduler.Status_Running)
 
 	a = agent.New(&agent.Config{DAG: d}, e, df)
 	err := a.Run(context.Background())
@@ -115,7 +115,7 @@ func TestDryRun(t *testing.T) {
 
 	status := a.Status()
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 }
 
 func TestCancelDAG(t *testing.T) {
@@ -139,7 +139,7 @@ func TestCancelDAG(t *testing.T) {
 		time.Sleep(time.Millisecond * 500)
 		status, err := e.GetLatestStatus(d)
 		require.NoError(t, err)
-		require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
+		require.Equal(t, scheduler.Status_Cancel, status.Status)
 	}
 }
 
@@ -163,7 +163,7 @@ func TestPreConditionInvalid(t *testing.T) {
 
 	status := a.Status()
 
-	require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
+	require.Equal(t, scheduler.Status_Cancel, status.Status)
 	require.Equal(t, scheduler.NodeStatus_None, status.Nodes[0].Status)
 	require.Equal(t, scheduler.NodeStatus_None, status.Nodes[1].Status)
 }
@@ -187,7 +187,7 @@ func TestPreConditionValid(t *testing.T) {
 	require.NoError(t, err)
 
 	status := a.Status()
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 	for _, s := range status.Nodes {
 		require.Equal(t, scheduler.NodeStatus_Success, s.Status)
 	}
@@ -205,7 +205,7 @@ func TestStartError(t *testing.T) {
 	require.Error(t, err)
 
 	status := a.Status()
-	require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
+	require.Equal(t, scheduler.Status_Error, status.Status)
 }
 
 func TestOnExit(t *testing.T) {
@@ -220,7 +220,7 @@ func TestOnExit(t *testing.T) {
 	require.NoError(t, err)
 
 	status := a.Status()
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 	for _, s := range status.Nodes {
 		require.Equal(t, scheduler.NodeStatus_Success, s.Status)
 	}
@@ -240,7 +240,7 @@ func TestRetry(t *testing.T) {
 	require.Error(t, err)
 
 	status := a.Status()
-	require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
+	require.Equal(t, scheduler.Status_Error, status.Status)
 
 	for _, n := range status.Nodes {
 		n.CmdWithArgs = "true"
@@ -251,7 +251,7 @@ func TestRetry(t *testing.T) {
 	require.NoError(t, err)
 
 	status = a.Status()
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 
 	for _, n := range status.Nodes {
 		if n.Status != scheduler.NodeStatus_Success &&
@@ -292,7 +292,7 @@ func TestHandleHTTP(t *testing.T) {
 
 	status, err := model.StatusFromJson(mockResponseWriter.body)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Running, status.Status)
+	require.Equal(t, scheduler.Status_Running, status.Status)
 
 	// invalid path
 	req = &http.Request{
@@ -318,7 +318,7 @@ func TestHandleHTTP(t *testing.T) {
 	<-time.After(time.Millisecond * 50)
 
 	status = a.Status()
-	require.Equal(t, status.Status, scheduler.SchedulerStatus_Cancel)
+	require.Equal(t, status.Status, scheduler.Status_Cancel)
 }
 
 type mockResponseWriter struct {
diff --git a/internal/engine/engine.go b/internal/engine/engine.go
index 861dd02..93186ed 100644
--- a/internal/engine/engine.go
+++ b/internal/engine/engine.go
@@ -3,16 +3,17 @@ package engine
 import (
 	"errors"
 	"fmt"
+	"os"
+	"os/exec"
+	"syscall"
+	"time"
+
 	"github.com/ErdemOzgen/blackdagger/internal/dag"
 	"github.com/ErdemOzgen/blackdagger/internal/persistence"
 	"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
 	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 	"github.com/ErdemOzgen/blackdagger/internal/sock"
 	"github.com/ErdemOzgen/blackdagger/internal/utils"
-	"os"
-	"os/exec"
-	"syscall"
-	"time"
 )
 
 type Engine interface {
@@ -216,7 +217,7 @@ func (e *engineImpl) UpdateStatus(dag *dag.DAG, status *model.Status) error {
 	} else {
 		ss, _ := model.StatusFromJson(res)
 		if ss != nil && ss.RequestId == status.RequestId &&
-			ss.Status == scheduler.SchedulerStatus_Running {
+			ss.Status == scheduler.Status_Running {
 			return fmt.Errorf("the DAG is running")
 		}
 	}
diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go
index 6e683e6..919d6bd 100644
--- a/internal/engine/engine_test.go
+++ b/internal/engine/engine_test.go
@@ -80,7 +80,7 @@ func TestGetStatusRunningAndDone(t *testing.T) {
 		HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
 			status := model.NewStatus(
 				ds.DAG, []*scheduler.Node{},
-				scheduler.SchedulerStatus_Running, 0, nil, nil)
+				scheduler.Status_Running, 0, nil, nil)
 			w.WriteHeader(http.StatusOK)
 			b, _ := status.ToJson()
 			_, _ = w.Write(b)
@@ -95,13 +95,13 @@ func TestGetStatusRunningAndDone(t *testing.T) {
 	time.Sleep(time.Millisecond * 100)
 	st, err := e.GetCurrentStatus(ds.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Running, st.Status)
+	require.Equal(t, scheduler.Status_Running, st.Status)
 
 	_ = socketServer.Shutdown()
 
 	st, err = e.GetCurrentStatus(ds.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_None, st.Status)
+	require.Equal(t, scheduler.Status_None, st.Status)
 }
 
 func TestUpdateStatus(t *testing.T) {
@@ -125,7 +125,7 @@ func TestUpdateStatus(t *testing.T) {
 	require.NoError(t, err)
 
 	st := testNewStatus(d.DAG, requestId,
-		scheduler.SchedulerStatus_Success, scheduler.NodeStatus_Success)
+		scheduler.Status_Success, scheduler.NodeStatus_Success)
 
 	err = hs.Write(st)
 	require.NoError(t, err)
@@ -165,7 +165,7 @@ func TestUpdateStatusError(t *testing.T) {
 	require.NoError(t, err)
 
 	status := testNewStatus(d.DAG, requestId,
-		scheduler.SchedulerStatus_Error, scheduler.NodeStatus_Error)
+		scheduler.Status_Error, scheduler.NodeStatus_Error)
 
 	err = e.UpdateStatus(d.DAG, status)
 	require.Error(t, err)
@@ -191,7 +191,7 @@ func TestStart(t *testing.T) {
 
 	status, err := e.GetLatestStatus(d.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
+	require.Equal(t, scheduler.Status_Error, status.Status)
 }
 
 func TestStop(t *testing.T) {
@@ -209,14 +209,14 @@ func TestStop(t *testing.T) {
 
 	require.Eventually(t, func() bool {
 		st, _ := e.GetCurrentStatus(d.DAG)
-		return st.Status == scheduler.SchedulerStatus_Running
+		return st.Status == scheduler.Status_Running
 	}, time.Millisecond*1500, time.Millisecond*100)
 
 	_ = e.Stop(d.DAG)
 
 	require.Eventually(t, func() bool {
 		st, _ := e.GetLatestStatus(d.DAG)
-		return st.Status == scheduler.SchedulerStatus_Cancel
+		return st.Status == scheduler.Status_Cancel
 	}, time.Millisecond*1500, time.Millisecond*100)
 }
 
@@ -236,7 +236,7 @@ func TestRestart(t *testing.T) {
 
 	status, err := e.GetLatestStatus(d.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 }
 
 func TestRetry(t *testing.T) {
@@ -255,7 +255,7 @@ func TestRetry(t *testing.T) {
 
 	status, err := e.GetLatestStatus(d.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 
 	requestId := status.RequestId
 	params := status.Params
@@ -265,7 +265,7 @@ func TestRetry(t *testing.T) {
 
 	status, err = e.GetLatestStatus(d.DAG)
 	require.NoError(t, err)
-	require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
+	require.Equal(t, scheduler.Status_Success, status.Status)
 	require.Equal(t, params, status.Params)
 
 	statusByRequestId, err := e.GetStatusByRequestId(d.DAG, status.RequestId)
@@ -430,7 +430,7 @@ func testDAG(name string) string {
 	return path.Join(testdataDir, name)
 }
 
-func testNewStatus(d *dag.DAG, reqId string, status scheduler.SchedulerStatus, nodeStatus scheduler.NodeStatus) *model.Status {
+func testNewStatus(d *dag.DAG, reqId string, status scheduler.Status, nodeStatus scheduler.NodeStatus) *model.Status {
 	now := time.Now()
 	ret := model.NewStatus(
 		d, []*scheduler.Node{{NodeState: scheduler.NodeState{Status: nodeStatus}}},
diff --git a/internal/persistence/jsondb/database_test.go b/internal/persistence/jsondb/database_test.go
index efd1d41..b3b4abd 100644
--- a/internal/persistence/jsondb/database_test.go
+++ b/internal/persistence/jsondb/database_test.go
@@ -2,7 +2,6 @@ package jsondb
 
 import (
 	"fmt"
-	"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
 	"io"
 	"os"
 	"path"
@@ -11,6 +10,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
+
 	"github.com/ErdemOzgen/blackdagger/internal/dag"
 	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 	"github.com/ErdemOzgen/blackdagger/internal/utils"
@@ -60,17 +61,17 @@ func TestWriteAndFindFiles(t *testing.T) {
 		Timestamp time.Time
 	}{
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-1",
 			time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-2",
 			time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-3",
 			time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local),
 		},
@@ -105,17 +106,17 @@ func TestWriteAndFindByRequestId(t *testing.T) {
 		Timestamp time.Time
 	}{
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-1",
 			time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-2",
 			time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-3",
 			time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local),
 		},
@@ -148,17 +149,17 @@ func TestRemoveOldFiles(t *testing.T) {
 		Timestamp time.Time
 	}{
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-1",
 			time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-2",
 			time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-3",
 			time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local),
 		},
@@ -197,11 +198,11 @@ func TestReadLatestStatus(t *testing.T) {
 		_ = dw.close()
 	}()
 
-	status := model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil)
+	status := model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil)
 	err = dw.write(status)
 	require.NoError(t, err)
 
-	status.Status = scheduler.SchedulerStatus_Success
+	status.Status = scheduler.Status_Success
 	status.Pid = 20000
 	_ = dw.write(status)
 
@@ -228,17 +229,17 @@ func TestReadStatusN(t *testing.T) {
 		Timestamp time.Time
 	}{
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-1",
 			time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-2",
 			time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local),
 		},
 		{
-			model.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
+			model.NewStatus(d, nil, scheduler.Status_None, 10000, nil, nil),
 			"request-id-3",
 			time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local),
 		},
@@ -274,11 +275,11 @@ func TestCompactFile(t *testing.T) {
 		Status *model.Status
 	}{
 		{model.NewStatus(
-			d, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil)},
+			d, nil, scheduler.Status_Running, 10000, nil, nil)},
 		{model.NewStatus(
-			d, nil, scheduler.SchedulerStatus_Cancel, 10000, nil, nil)},
+			d, nil, scheduler.Status_Cancel, 10000, nil, nil)},
 		{model.NewStatus(
-			d, nil, scheduler.SchedulerStatus_Success, 10000, nil, nil)},
+			d, nil, scheduler.Status_Success, 10000, nil, nil)},
 	} {
 		require.NoError(t, dw.write(data.Status))
 	}
diff --git a/internal/persistence/jsondb/writer_test.go b/internal/persistence/jsondb/writer_test.go
index c634bb3..52c56d1 100644
--- a/internal/persistence/jsondb/writer_test.go
+++ b/internal/persistence/jsondb/writer_test.go
@@ -2,11 +2,12 @@ package jsondb
 
 import (
 	"fmt"
-	"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
 	"os"
 	"testing"
 	"time"
 
+	"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
+
 	"github.com/ErdemOzgen/blackdagger/internal/dag"
 	"github.com/ErdemOzgen/blackdagger/internal/scheduler"
 	"github.com/stretchr/testify/require"
@@ -30,7 +31,7 @@ func TestWriteStatusToFile(t *testing.T) {
 		_ = db.RemoveOld(d.Location, 0)
 	}()
 
-	status := model.NewStatus(d, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil)
+	status := model.NewStatus(d, nil, scheduler.Status_Running, 10000, nil, nil)
 	status.RequestId = fmt.Sprintf("request-id-%d", time.Now().Unix())
 	require.NoError(t, dw.write(status))
 	require.Regexp(t, ".*test_write_status.*", file)
@@ -76,24 +77,24 @@ func TestWriteStatusToExistingFile(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, dw.open())
 
-	status := model.NewStatus(d, nil, scheduler.SchedulerStatus_Cancel, 10000, nil, nil)
+	status := model.NewStatus(d, nil, scheduler.Status_Cancel, 10000, nil, nil)
 	status.RequestId = "request-id-test-write-status-to-existing-file"
 	require.NoError(t, dw.write(status))
 	dw.close()
 
 	data, err := db.FindByRequestId(d.Location, status.RequestId)
 	require.NoError(t, err)
-	require.Equal(t, data.Status.Status, scheduler.SchedulerStatus_Cancel)
+	require.Equal(t, data.Status.Status, scheduler.Status_Cancel)
 	require.Equal(t, file, data.File)
 
 	dw = &writer{target: file}
 	require.NoError(t, dw.open())
-	status.Status = scheduler.SchedulerStatus_Success
+	status.Status = scheduler.Status_Success
 	require.NoError(t, dw.write(status))
 	dw.close()
 
 	data, err = db.FindByRequestId(d.Location, status.RequestId)
 	require.NoError(t, err)
-	require.Equal(t, data.Status.Status, scheduler.SchedulerStatus_Success)
+	require.Equal(t, data.Status.Status, scheduler.Status_Success)
 	require.Equal(t, file, data.File)
 }
diff --git a/internal/persistence/model/status.go b/internal/persistence/model/status.go
index 7d272a9..652983d 100644
--- a/internal/persistence/model/status.go
+++ b/internal/persistence/model/status.go
@@ -31,20 +31,20 @@ func (p Pid) IsRunning() bool {
 }
 
 type Status struct {
-	RequestId  string                    `json:"RequestId"`
-	Name       string                    `json:"Name"`
-	Status     scheduler.SchedulerStatus `json:"Status"`
-	StatusText string                    `json:"StatusText"`
-	Pid        Pid                       `json:"Pid"`
-	Nodes      []*Node                   `json:"Nodes"`
-	OnExit     *Node                     `json:"OnExit"`
-	OnSuccess  *Node                     `json:"OnSuccess"`
-	OnFailure  *Node                     `json:"OnFailure"`
-	OnCancel   *Node                     `json:"OnCancel"`
-	StartedAt  string                    `json:"StartedAt"`
-	FinishedAt string                    `json:"FinishedAt"`
-	Log        string                    `json:"Log"`
-	Params     string                    `json:"Params"`
+	RequestId  string           `json:"RequestId"`
+	Name       string           `json:"Name"`
+	Status     scheduler.Status `json:"Status"`
+	StatusText string           `json:"StatusText"`
+	Pid        Pid              `json:"Pid"`
+	Nodes      []*Node          `json:"Nodes"`
+	OnExit     *Node            `json:"OnExit"`
+	OnSuccess  *Node            `json:"OnSuccess"`
+	OnFailure  *Node            `json:"OnFailure"`
+	OnCancel   *Node            `json:"OnCancel"`
+	StartedAt  string           `json:"StartedAt"`
+	FinishedAt string           `json:"FinishedAt"`
+	Log        string           `json:"Log"`
+	Params     string           `json:"Params"`
 }
 
 type StatusFile struct {
@@ -62,13 +62,13 @@ func StatusFromJson(s string) (*Status, error) {
 }
 
 func NewStatusDefault(d *dag.DAG) *Status {
-	return NewStatus(d, nil, scheduler.SchedulerStatus_None, int(PidNotRunning), nil, nil)
+	return NewStatus(d, nil, scheduler.Status_None, int(PidNotRunning), nil, nil)
 }
 
 func NewStatus(
 	d *dag.DAG,
 	nodes []*scheduler.Node,
-	status scheduler.SchedulerStatus,
+	status scheduler.Status,
 	pid int,
 	startTime, endTime *time.Time,
 ) *Status {
@@ -108,8 +108,8 @@ func formatTime(val *time.Time) string {
 }
 
 func (st *Status) CorrectRunningStatus() {
-	if st.Status == scheduler.SchedulerStatus_Running {
-		st.Status = scheduler.SchedulerStatus_Error
+	if st.Status == scheduler.Status_Running {
+		st.Status = scheduler.Status_Error
 		st.StatusText = st.Status.String()
 	}
 }
diff --git a/internal/persistence/model/status_test.go b/internal/persistence/model/status_test.go
index 8a19431..9472290 100644
--- a/internal/persistence/model/status_test.go
+++ b/internal/persistence/model/status_test.go
@@ -52,7 +52,7 @@ func TestStatusSerialization(t *testing.T) {
 		Params:        []string{},
 		DefaultParams: "",
 	}
-	st := NewStatus(d, nil, scheduler.SchedulerStatus_Success, 10000, &start, &end)
+	st := NewStatus(d, nil, scheduler.Status_Success, 10000, &start, &end)
 
 	js, err := st.ToJson()
 	require.NoError(t, err)
@@ -67,10 +67,10 @@ func TestStatusSerialization(t *testing.T) {
 
 func TestCorrectRunningStatus(t *testing.T) {
 	d := &dag.DAG{Name: "test"}
-	status := NewStatus(d, nil, scheduler.SchedulerStatus_Running,
+	status := NewStatus(d, nil, scheduler.Status_Running,
 		10000, nil, nil)
 	status.CorrectRunningStatus()
-	require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
+	require.Equal(t, scheduler.Status_Error, status.Status)
 }
 
 func TestJsonMarshal(t *testing.T) {
diff --git a/internal/reporter/reporter.go b/internal/reporter/reporter.go
index d6b5a91..d37ae54 100644
--- a/internal/reporter/reporter.go
+++ b/internal/reporter/reporter.go
@@ -30,7 +30,7 @@ type Mailer interface {
 
 // ReportStep is a function that reports the status of a step.
 func (rp *Reporter) ReportStep(d *dag.DAG, status *model.Status, node *scheduler.Node) error {
-	st := node.ReadStatus()
+	st := node.GetStatus()
 	if st != scheduler.NodeStatus_None {
 		log.Printf("%s %s", node.Name, status.StatusText)
 	}
@@ -60,7 +60,7 @@ func (rp *Reporter) ReportSummary(status *model.Status, err error) {
 
 // SendMail is a function that sends a report mail.
 func (rp *Reporter) SendMail(d *dag.DAG, status *model.Status, err error) error {
-	if err != nil || status.Status == scheduler.SchedulerStatus_Error {
+	if err != nil || status.Status == scheduler.Status_Error {
 		if d.MailOn != nil && d.MailOn.Failure {
 			return rp.Mailer.SendMail(
 				d.ErrorMail.From,
@@ -70,7 +70,7 @@ func (rp *Reporter) SendMail(d *dag.DAG, status *model.Status, err error) error
 				addAttachmentList(d.ErrorMail.AttachLogs, status.Nodes),
 			)
 		}
-	} else if status.Status == scheduler.SchedulerStatus_Success {
+	} else if status.Status == scheduler.Status_Success {
 		if d.MailOn != nil && d.MailOn.Success {
 			_ = rp.Mailer.SendMail(
 				d.InfoMail.From,
diff --git a/internal/reporter/reporter_test.go b/internal/reporter/reporter_test.go
index 3a0278b..6a14edc 100644
--- a/internal/reporter/reporter_test.go
+++ b/internal/reporter/reporter_test.go
@@ -84,7 +84,7 @@ func testErrorMail(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node)
 	d.MailOn.Success = false
 
 	_ = rp.SendMail(d, &model.Status{
-		Status: scheduler.SchedulerStatus_Error,
+		Status: scheduler.Status_Error,
 		Nodes:  nodes,
 	}, fmt.Errorf("Error"))
 
@@ -99,7 +99,7 @@ func testNoErrorMail(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node
 	d.MailOn.Success = true
 
 	err := rp.SendMail(d, &model.Status{
-		Status: scheduler.SchedulerStatus_Error,
+		Status: scheduler.Status_Error,
 		Nodes:  nodes,
 	}, nil)
 	require.NoError(t, err)
@@ -113,7 +113,7 @@ func testSuccessMail(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node
 	d.MailOn.Success = true
 
 	err := rp.SendMail(d, &model.Status{
-		Status: scheduler.SchedulerStatus_Success,
+		Status: scheduler.Status_Success,
 		Nodes:  nodes,
 	}, nil)
 	require.NoError(t, err)
@@ -137,7 +137,7 @@ func testReportSummary(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.No
 	}()
 
 	rp.ReportSummary(&model.Status{
-		Status: scheduler.SchedulerStatus_Success,
+		Status: scheduler.Status_Success,
 		Nodes:  nodes,
 	}, errors.New("test error"))
 
@@ -168,7 +168,7 @@ func testReportStep(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node)
 	err = rp.ReportStep(
 		d,
 		&model.Status{
-			Status: scheduler.SchedulerStatus_Running,
+			Status: scheduler.Status_Running,
 			Nodes:  nodes,
 		},
 		&scheduler.Node{
@@ -197,7 +197,7 @@ func testReportStep(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node)
 func testRenderSummary(t *testing.T, rp *Reporter, d *dag.DAG, nodes []*model.Node) {
 	status := &model.Status{
 		Name:   d.Name,
-		Status: scheduler.SchedulerStatus_Error,
+		Status: scheduler.Status_Error,
 		Nodes:  nodes,
 	}
 	summary := renderSummary(status, errors.New("test error"))
diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go
index e88d2d7..3d9e2f4 100644
--- a/internal/scheduler/node.go
+++ b/internal/scheduler/node.go
@@ -85,6 +85,12 @@ type NodeState struct {
 	Error error
 }
 
+func (n *Node) finish() {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.FinishedAt = time.Now()
+}
+
 func (n *Node) State() NodeState {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
@@ -145,40 +151,40 @@ func (n *Node) Execute(ctx context.Context) error {
 		var buf bytes.Buffer
 		_, _ = io.Copy(&buf, n.outputReader)
 		ret := strings.TrimSpace(buf.String())
-		os.Setenv(n.Output, ret)
+		_ = os.Setenv(n.Output, ret)
 		n.OutputVariables.Store(n.Output, fmt.Sprintf("%s=%s", n.Output, ret))
 	}
 
 	return n.Error
 }
 
-// ReadStatus reads the status of a node.
-func (n *Node) ReadStatus() NodeStatus {
+// GetStatus reads the status of a node.
+func (n *Node) GetStatus() NodeStatus {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
 	ret := n.Status
 	return ret
 }
 
-func (n *Node) ReadRetryCount() int {
+func (n *Node) getRetryCount() int {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
 	return n.RetryCount
 }
 
-func (n *Node) SetRetriedAt(retriedAt time.Time) {
+func (n *Node) setRetriedAt(retriedAt time.Time) {
 	n.mu.Lock()
 	defer n.mu.Unlock()
 	n.RetriedAt = retriedAt
 }
 
-func (n *Node) ReadRetriedAt() time.Time {
+func (n *Node) getRetriedAt() time.Time {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
 	return n.RetriedAt
 }
 
-func (n *Node) ReadDoneCount() int {
+func (n *Node) getDoneCount() int {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
 	return n.DoneCount
@@ -188,12 +194,19 @@ func (n *Node) clearState() {
 	n.NodeState = NodeState{}
 }
 
-func (n *Node) updateStatus(status NodeStatus) {
+func (n *Node) setStatus(status NodeStatus) {
 	n.mu.Lock()
 	defer n.mu.Unlock()
 	n.Status = status
 }
 
+func (n *Node) setErr(err error) {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.Error = err
+	n.Status = NodeStatus_Error
+}
+
 func (n *Node) signal(sig os.Signal, allowOverride bool) {
 	n.mu.Lock()
 	defer n.mu.Unlock()
diff --git a/internal/scheduler/node_test.go b/internal/scheduler/node_test.go
index 155eeda..3bd11ff 100644
--- a/internal/scheduler/node_test.go
+++ b/internal/scheduler/node_test.go
@@ -49,7 +49,7 @@ func TestSignal(t *testing.T) {
 		n.signal(syscall.SIGTERM, false)
 	}()
 
-	n.updateStatus(NodeStatus_Running)
+	n.setStatus(NodeStatus_Running)
 	err := n.Execute(context.Background())
 
 	require.Error(t, err)
@@ -70,7 +70,7 @@ func TestSignalSpecified(t *testing.T) {
 		n.signal(syscall.SIGTERM, true)
 	}()
 
-	n.updateStatus(NodeStatus_Running)
+	n.setStatus(NodeStatus_Running)
 	err := n.Execute(context.Background())
 
 	require.Error(t, err)
@@ -146,10 +146,10 @@ func TestNode(t *testing.T) {
 		},
 	}
 	n.incDoneCount()
-	require.Equal(t, 1, n.ReadDoneCount())
+	require.Equal(t, 1, n.getDoneCount())
 
 	n.incRetryCount()
-	require.Equal(t, 1, n.ReadRetryCount())
+	require.Equal(t, 1, n.getRetryCount())
 
 	n.id = 1
 	n.init()
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 7828e3e..8c2372c 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -79,107 +79,128 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan
 	var wg = sync.WaitGroup{}
 
 	for !sc.isFinished(g) {
-		if sc.IsCanceled() {
+		if sc.isCanceled() {
 			break
 		}
+	NodesIteration:
 		for _, node := range g.Nodes() {
-			if node.ReadStatus() != NodeStatus_None {
-				continue
+			if node.GetStatus() != NodeStatus_None || !isReady(g, node) {
+				continue NodesIteration
 			}
-			if !isReady(g, node) {
-				continue
+			if sc.isCanceled() {
+				break NodesIteration
 			}
-			if sc.IsCanceled() {
-				break
-			}
-			if sc.MaxActiveRuns > 0 &&
-				sc.runningCount(g) >= sc.MaxActiveRuns {
-				continue
+			if sc.MaxActiveRuns > 0 && sc.runningCount(g) >= sc.MaxActiveRuns {
+				continue NodesIteration
 			}
+
+			// Check preconditions
 			if len(node.Preconditions) > 0 {
 				log.Printf("checking pre conditions for \"%s\"", node.Name)
 				if err := dag.EvalConditions(node.Preconditions); err != nil {
 					log.Printf("%s", err.Error())
-					node.updateStatus(NodeStatus_Skipped)
+					node.setStatus(NodeStatus_Skipped)
 					node.Error = err
-					continue
+					continue NodesIteration
 				}
 			}
			wg.Add(1)
 
 			log.Printf("start running: %s", node.Name)
-			node.updateStatus(NodeStatus_Running)
+			node.setStatus(NodeStatus_Running)
 			go func(node *Node) {
 				defer func() {
-					node.FinishedAt = time.Now()
+					node.finish()
 					wg.Done()
 				}()
 
-				setup := true
-				if !sc.Dry {
-					if err := node.setup(sc.LogDir, sc.RequestId); err != nil {
-						setup = false
-						node.Error = err
-						sc.lastError = err
-						if node.ContinueOn.Failure {
-							node.updateStatus(NodeStatus_ValidSkip)
-						} else {
-							node.updateStatus(NodeStatus_Error)
-						}
-					}
-					defer func() {
-						_ = node.teardown()
-					}()
+				setupSucceed := true
+				if err := sc.setupNode(node); err != nil {
+					setupSucceed = false
+					sc.lastError = err
+					node.setErr(err)
 				}
-				for setup && !sc.IsCanceled() {
-					var err error = nil
-					if !sc.Dry {
-						err = node.Execute(ctx)
-					}
-					if err != nil {
-						if sc.IsCanceled() {
-							if node.ReadStatus() != NodeStatus_Cancel {
-								sc.lastError = err
-							}
-						} else {
-							handleError(node)
-						}
-						switch node.ReadStatus() {
-						case NodeStatus_None:
-							// nothing to do
-						case NodeStatus_Error, NodeStatus_ValidSkip:
-							sc.lastError = err
+				// for setup && !sc.IsCanceled() {
+				// 	var err error = nil
+				// 	if !sc.Dry {
+				// 		err = node.Execute(ctx)
+				// 	}
+				// 	if err != nil {
+				// 		if sc.IsCanceled() {
+				// 			if node.GetStatus() != NodeStatus_Cancel {
+				// 				sc.lastError = err
+				// 			}
+				// 		} else {
+				// 			handleError(node)
+				// 		}
+				// 		switch node.GetStatus() {
+				// 		case NodeStatus_None:
+				// 			// nothing to do
+				// 		case NodeStatus_Error, NodeStatus_ValidSkip:
+				// 			sc.lastError = err
+				defer func() {
+					_ = sc.teardownNode(node)
+				}()
+
+			ExecRepeat:
+				for setupSucceed && !sc.isCanceled() {
+					execErr := sc.execNode(ctx, node)
+					if execErr != nil {
+						status := node.GetStatus()
+						switch {
+						case status == NodeStatus_Success || status == NodeStatus_Cancel:
+							// do nothing
+						case sc.isCanceled():
+							sc.lastError = execErr
+						case node.RetryPolicy != nil && node.RetryPolicy.Limit > node.getRetryCount():
+							// retry
+							log.Printf("%s failed but scheduled for retry", node.Name)
+							node.incRetryCount()
+							log.Printf("sleep %s for retry", node.RetryPolicy.Interval)
+							time.Sleep(node.RetryPolicy.Interval)
+							node.setRetriedAt(time.Now())
+							node.setStatus(NodeStatus_None)
+						default:
+							// finish the node with error
+							node.setStatus(NodeStatus_Error)
+							sc.lastError = execErr
 						}
 					}
-				if node.ReadStatus() != NodeStatus_Cancel {
+					if node.GetStatus() != NodeStatus_Cancel {
 						node.incDoneCount()
 					}
 					if node.RepeatPolicy.Repeat {
-						if err == nil || node.ContinueOn.Failure {
-							if !sc.IsCanceled() {
+						if execErr == nil || node.ContinueOn.Failure {
+							if !sc.isCanceled() {
 								time.Sleep(node.RepeatPolicy.Interval)
-								continue
+								continue ExecRepeat
 							}
 						}
 					}
-					if err != nil {
-						if done != nil {
-							done <- node
-						}
+					if execErr != nil && done != nil {
+						done <- node
 						return
 					}
-					break
+					break ExecRepeat
 				}
-				if node.ReadStatus() == NodeStatus_Running {
-					node.updateStatus(NodeStatus_Success)
+				// finish the node
+				if node.GetStatus() == NodeStatus_Running {
+					node.setStatus(NodeStatus_Success)
 				}
-				if err := node.teardown(); err != nil {
+				// if err := node.teardown(); err != nil {
+				// 	sc.lastError = err
+				// 	if node.ContinueOn.Failure {
+				// 		node.setStatus(NodeStatus_ValidSkip)
+				// 	} else {
+				// 		node.setStatus(NodeStatus_Error)
+				// 	}
+				// }
+				if err := sc.teardownNode(node); err != nil {
 					sc.lastError = err
 					if node.ContinueOn.Failure {
-						node.updateStatus(NodeStatus_ValidSkip)
+						node.setStatus(NodeStatus_ValidSkip)
 					} else {
-						node.updateStatus(NodeStatus_Error)
+						node.setStatus(NodeStatus_Error)
 					}
 				}
 				if done != nil {
@@ -194,7 +215,7 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan
 	}
 	wg.Wait()
 
-	handlers := []string{}
+	var handlers []string
 	switch sc.Status(g) {
 	case SchedulerStatus_Success:
 		handlers = append(handlers, constants.OnSuccess)
@@ -208,8 +229,7 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan
 		if n := sc.handlers[h]; n != nil {
 			log.Printf("%s started", n.Name)
 			n.OutputVariables = g.outputVariables
-			err := sc.runHandlerNode(ctx, n)
-			if err != nil {
+			if err := sc.runHandlerNode(ctx, n); err != nil {
 				sc.lastError = err
 			}
 			if done != nil {
@@ -220,11 +240,32 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan
 	return sc.lastError
 }
 
+func (sc *Scheduler) setupNode(node *Node) error {
+	if !sc.Dry {
+		return node.setup(sc.LogDir, sc.RequestId)
+	}
+	return nil
+}
+
+func (sc *Scheduler) teardownNode(node *Node) error {
+	if !sc.Dry {
+		return node.teardown()
+	}
+	return nil
+}
+
+func (sc *Scheduler) execNode(ctx context.Context, n *Node) error {
+	if !sc.Dry {
+		return n.Execute(ctx)
+	}
+	return nil
+}
+
 // Signal sends a signal to the scheduler.
 // for a node with repeat policy, it does not stop the node and
 // wait to finish current run.
 func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool, allowOverride bool) {
-	if !sc.IsCanceled() {
+	if !sc.isCanceled() {
 		sc.setCanceled()
 	}
 	for _, node := range g.Nodes() {
@@ -255,9 +296,7 @@ func (sc *Scheduler) Cancel(g *ExecutionGraph) {
 
 // Status returns the status of the scheduler.
 func (sc *Scheduler) Status(g *ExecutionGraph) SchedulerStatus {
-	if sc.IsCanceled() && !sc.checkStatus(g, []NodeStatus{
-		NodeStatus_Success, NodeStatus_Skipped,
-	}) {
+	if sc.isCanceled() && !sc.isSucceed(g) {
 		return SchedulerStatus_Cancel
 	}
 	if g.StartedAt.IsZero() {
@@ -280,36 +319,35 @@ func (sc *Scheduler) HandlerNode(name string) *Node {
 	return nil
 }
 
-// IsCanceled returns true if the scheduler is canceled.
-func (sc *Scheduler) IsCanceled() bool {
+// isCanceled returns true if the scheduler is canceled.
+func (sc *Scheduler) isCanceled() bool {
 	sc.mu.RLock()
 	defer sc.mu.RUnlock()
-	ret := sc.canceled == 1
-	return ret
+	return sc.canceled == 1
 }
 
-func isReady(g *ExecutionGraph, node *Node) (ready bool) {
-	ready = true
+func isReady(g *ExecutionGraph, node *Node) bool {
+	ready := true
 	for _, dep := range g.to[node.id] {
 		n := g.node(dep)
-		switch n.ReadStatus() {
+		switch n.GetStatus() {
 		case NodeStatus_Success:
 			continue
 		case NodeStatus_Error, NodeStatus_ValidSkip:
 			if !n.ContinueOn.Failure {
 				ready = false
-				node.updateStatus(NodeStatus_Cancel)
+				node.setStatus(NodeStatus_Cancel)
 				node.Error = fmt.Errorf("upstream failed")
 			}
 		case NodeStatus_Skipped:
 			if !n.ContinueOn.Skipped {
 				ready = false
-				node.updateStatus(NodeStatus_Skipped)
+				node.setStatus(NodeStatus_Skipped)
 				node.Error = fmt.Errorf("upstream skipped")
 			}
 		case NodeStatus_Cancel:
 			ready = false
-			node.updateStatus(NodeStatus_Cancel)
+			node.setStatus(NodeStatus_Cancel)
 		default:
 			ready = false
 		}
@@ -322,12 +360,12 @@ func (sc *Scheduler) runHandlerNode(ctx context.Context, node *Node) error {
 		node.FinishedAt = time.Now()
 	}()
 
-	node.updateStatus(NodeStatus_Running)
+	node.setStatus(NodeStatus_Running)
 
 	if !sc.Dry {
 		err := node.setup(sc.LogDir, sc.RequestId)
 		if err != nil {
-			node.updateStatus(NodeStatus_Error)
+			node.setStatus(NodeStatus_Error)
 			return nil
 		}
 		defer func() {
@@ -335,12 +373,12 @@ func (sc *Scheduler) runHandlerNode(ctx context.Context, node *Node) error {
 		}()
 		err = node.Execute(ctx)
 		if err != nil {
-			node.updateStatus(NodeStatus_Error)
+			node.setStatus(NodeStatus_Error)
 		} else {
-			node.updateStatus(NodeStatus_Success)
+			node.setStatus(NodeStatus_Success)
 		}
 	} else {
-		node.updateStatus(NodeStatus_Success)
+		node.setStatus(NodeStatus_Success)
 	}
 
 	return nil
@@ -377,20 +415,20 @@ func (sc *Scheduler) setup() (err error) {
 }
 
 func handleError(node *Node) {
-	status := node.ReadStatus()
+	status := node.GetStatus()
 	if status != NodeStatus_Cancel && status != NodeStatus_Success {
-		if node.RetryPolicy != nil && node.RetryPolicy.Limit > node.ReadRetryCount() {
+		if node.RetryPolicy != nil && node.RetryPolicy.Limit > node.getRetryCount() {
 			log.Printf("%s failed but scheduled for retry", node.Name)
 			node.incRetryCount()
 			log.Printf("sleep %s for retry", node.RetryPolicy.Interval)
 			time.Sleep(node.RetryPolicy.Interval)
-			node.SetRetriedAt(time.Now())
-			node.updateStatus(NodeStatus_None)
+			node.setRetriedAt(time.Now())
+			node.setStatus(NodeStatus_None)
 		} else {
 			if node.ContinueOn.Failure {
-				node.updateStatus(NodeStatus_ValidSkip)
+				node.setStatus(NodeStatus_ValidSkip)
 			} else {
-				node.updateStatus(NodeStatus_Error)
+				node.setStatus(NodeStatus_Error)
 			}
 		}
 	}
@@ -404,18 +442,17 @@ func (sc *Scheduler) setCanceled() {
 
 func (sc *Scheduler) isRunning(g *ExecutionGraph) bool {
 	for _, node := range g.Nodes() {
-		switch node.ReadStatus() {
-		case NodeStatus_Running:
+		if node.GetStatus() == NodeStatus_Running {
 			return true
 		}
 	}
 	return false
 }
 
-func (sc *Scheduler) runningCount(g *ExecutionGraph) (count int) {
-	count = 0
+func (sc *Scheduler) runningCount(g *ExecutionGraph) int {
+	count := 0
 	for _, node := range g.Nodes() {
-		switch node.ReadStatus() {
+		switch node.GetStatus() {
 		case NodeStatus_Running:
 			count++
 		}
@@ -425,7 +462,7 @@ func (sc *Scheduler) runningCount(g *ExecutionGraph) (count int) {
 
 func (sc *Scheduler) isFinished(g *ExecutionGraph) bool {
 	for _, node := range g.Nodes() {
-		switch node.ReadStatus() {
+		switch node.GetStatus() {
 		case NodeStatus_Running, NodeStatus_None:
 			return false
 		}
@@ -433,19 +470,13 @@ func (sc *Scheduler) isFinished(g *ExecutionGraph) bool {
 	return true
 }
 
-func (sc *Scheduler) checkStatus(g *ExecutionGraph, in []NodeStatus) bool {
+func (sc *Scheduler) isSucceed(g *ExecutionGraph) bool {
 	for _, node := range g.Nodes() {
-		s := node.ReadStatus()
-		var f = false
-		for i := range in {
-			f = s == in[i]
-			if f {
-				break
-			}
-		}
-		if !f {
-			return false
+		// TODO: if st := node.GetStatus(); st == NodeStatus_Success || st == NodeStatus_Skipped || st == NodeStatus_ValidSkip {
+		if st := node.GetStatus(); st == NodeStatus_Success || st == NodeStatus_Skipped {
+			continue
 		}
+		return false
 	}
 	return true
 }
diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go
index 4813484..a3bbe61 100644
--- a/internal/scheduler/scheduler_test.go
+++ b/internal/scheduler/scheduler_test.go
@@ -57,8 +57,8 @@ func TestScheduler(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Error)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Error, nodes[2].ReadStatus())
-	require.Equal(t, NodeStatus_Cancel, nodes[3].ReadStatus())
+	require.Equal(t, NodeStatus_Error, nodes[2].GetStatus())
+	require.Equal(t, NodeStatus_Cancel, nodes[3].GetStatus())
 }
 
 func TestSchedulerParallel(t *testing.T) {
@@ -75,9 +75,9 @@ func TestSchedulerParallel(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Success)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 }
 
 func TestSchedulerFailPartially(t *testing.T) {
@@ -91,10 +91,10 @@ func TestSchedulerFailPartially(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Error)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Error, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[3].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Error, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[3].GetStatus())
 }
 
 func TestSchedulerContinueOnFailure(t *testing.T) {
@@ -114,9 +114,9 @@ func TestSchedulerContinueOnFailure(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Error)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_ValidSkip, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	//require.Equal(t, NodeStatus_ValidSkip, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 }
 
 func TestSchedulerAllowSkipped(t *testing.T) {
@@ -140,9 +140,9 @@ func TestSchedulerAllowSkipped(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Success)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Skipped, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Skipped, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 }
 
 func TestSchedulerCancel(t *testing.T) {
@@ -199,13 +199,13 @@ func TestSchedulerRetryFail(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Error)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_ValidSkip, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_ValidSkip, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Error, nodes[2].ReadStatus())
-	require.Equal(t, NodeStatus_Cancel, nodes[3].ReadStatus())
+	//require.Equal(t, NodeStatus_ValidSkip, nodes[0].GetStatus())
+	//require.Equal(t, NodeStatus_ValidSkip, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Error, nodes[2].GetStatus())
+	require.Equal(t, NodeStatus_Cancel, nodes[3].GetStatus())
 
-	require.Equal(t, nodes[0].ReadRetryCount(), 1)
-	require.Equal(t, nodes[1].ReadRetryCount(), 1)
+	require.Equal(t, nodes[0].getRetryCount(), 1)
+	require.Equal(t, nodes[1].getRetryCount(), 1)
 }
 
 func TestSchedulerRetrySuccess(t *testing.T) {
@@ -245,15 +245,15 @@ func TestSchedulerRetrySuccess(t *testing.T) {
 		nodes := g.Nodes()
 
 		// scheduled for retry
-		require.Equal(t, 1, nodes[1].ReadRetryCount())
-		require.Equal(t, NodeStatus_Running, nodes[1].ReadStatus())
+		require.Equal(t, 1, nodes[1].getRetryCount())
+		require.Equal(t, NodeStatus_Running, nodes[1].GetStatus())
 		startedAt := nodes[1].StartedAt
 
 		// wait for retry
 		<-time.After(time.Millisecond * 500)
 
 		// check time difference
-		retriedAt := nodes[1].ReadRetriedAt()
+		retriedAt := nodes[1].getRetriedAt()
 		require.Greater(t, retriedAt.Sub(startedAt), time.Millisecond*500)
 	}()
 
@@ -263,11 +263,11 @@ func TestSchedulerRetrySuccess(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Success)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 
-	if nodes[1].ReadRetryCount() == 0 {
+	if nodes[1].getRetryCount() == 0 {
 		t.Error("step 2 Should be retried")
 	}
 }
@@ -303,11 +303,11 @@ func TestStepPreCondition(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Success)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Skipped, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Skipped, nodes[2].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[3].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[4].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Skipped, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Skipped, nodes[2].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[3].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[4].GetStatus())
 }
 
 func TestSchedulerOnExit(t *testing.T) {
@@ -325,13 +325,13 @@ func TestSchedulerOnExit(t *testing.T) {
 	require.NoError(t, err)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 
 	onExit := sc.HandlerNode(constants.OnExit)
 	require.NotNil(t, onExit)
-	require.Equal(t, NodeStatus_Success, onExit.ReadStatus())
+	require.Equal(t, NodeStatus_Success, onExit.GetStatus())
 }
 
 func TestSchedulerOnExitOnFail(t *testing.T) {
@@ -349,11 +349,11 @@ func TestSchedulerOnExitOnFail(t *testing.T) {
 	require.Error(t, err)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Error, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Cancel, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[2].ReadStatus())
+	require.Equal(t, NodeStatus_Error, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Cancel, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[2].GetStatus())
 
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).ReadStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).GetStatus())
 }
 
 func TestSchedulerOnSignal(t *testing.T) {
@@ -406,11 +406,11 @@ func TestSchedulerOnCancel(t *testing.T) {
 	require.Equal(t, sc.Status(g), SchedulerStatus_Cancel)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Cancel, nodes[1].ReadStatus())
-	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnSuccess).ReadStatus())
-	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnFailure).ReadStatus())
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnCancel).ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Cancel, nodes[1].GetStatus())
+	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnSuccess).GetStatus())
+	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnFailure).GetStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnCancel).GetStatus())
 }
 
 func TestSchedulerOnSuccess(t *testing.T) {
@@ -430,10 +430,10 @@ func TestSchedulerOnSuccess(t *testing.T) {
 	require.NoError(t, err)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).ReadStatus())
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnSuccess).ReadStatus())
-	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnFailure).ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).GetStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnSuccess).GetStatus())
+	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnFailure).GetStatus())
 }
 
 func TestSchedulerOnFailure(t *testing.T) {
@@ -455,11 +455,11 @@ func TestSchedulerOnFailure(t *testing.T) {
 	require.Error(t, err)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Error, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).ReadStatus())
-	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnSuccess).ReadStatus())
-	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnFailure).ReadStatus())
-	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnCancel).ReadStatus())
+	require.Equal(t, NodeStatus_Error, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnExit).GetStatus())
+	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnSuccess).GetStatus())
+	require.Equal(t, NodeStatus_Success, sc.HandlerNode(constants.OnFailure).GetStatus())
+	require.Equal(t, NodeStatus_None, sc.HandlerNode(constants.OnCancel).GetStatus())
 }
 
 func TestRepeat(t *testing.T) {
@@ -623,8 +623,8 @@ func TestTakeOutputFromPrevStep(t *testing.T) {
 	require.NoError(t, err)
 
 	nodes := g.Nodes()
-	require.Equal(t, NodeStatus_Success, nodes[0].ReadStatus())
-	require.Equal(t, NodeStatus_Success, nodes[1].ReadStatus())
+	require.Equal(t, NodeStatus_Success, nodes[0].GetStatus())
+	require.Equal(t, NodeStatus_Success, nodes[1].GetStatus())
 
 	require.Equal(t, "take-output", os.ExpandEnv("$TOOK_PREV_OUT"))
 }
diff --git a/service/core/scheduler/job/job.go b/service/core/scheduler/job/job.go
index d65d4ad..3eb7f41 100644
--- a/service/core/scheduler/job/job.go
+++ b/service/core/scheduler/job/job.go
@@ -36,10 +36,10 @@ func (j *Job) Start() error {
 		return err
 	}
 	switch s.Status {
-	case scheduler.SchedulerStatus_Running:
+	case scheduler.Status_Running:
 		// already running
 		return ErrJobRunning
-	case scheduler.SchedulerStatus_None:
+	case scheduler.Status_None:
 	default:
 		// check the last execution time
 		t, err := utils.ParseTime(s.StartedAt)
@@ -60,7 +60,7 @@ func (j *Job) Stop() error {
 	if err != nil {
 		return err
 	}
-	if s.Status != scheduler.SchedulerStatus_Running {
+	if s.Status != scheduler.Status_Running {
 		return ErrJobIsNotRunning
 	}
 	return e.Stop(j.DAG)
diff --git a/service/frontend/handlers/dag.go b/service/frontend/handlers/dag.go
index 1dc09b7..deb8431 100644
--- a/service/frontend/handlers/dag.go
+++ b/service/frontend/handlers/dag.go
@@ -327,7 +327,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
 
 	switch *params.Body.Action {
 	case "start":
-		if d.Status.Status == scheduler.SchedulerStatus_Running {
+		if d.Status.Status == scheduler.Status_Running {
 			return nil, response.NewBadRequestError(errInvalidArgs)
 		}
 		e := h.engineFactory.Create()
@@ -337,7 +337,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
 		_ = e.ToggleSuspend(params.DagID, params.Body.Value == "true")
 
 	case "stop":
-		if d.Status.Status != scheduler.SchedulerStatus_Running {
+		if d.Status.Status != scheduler.Status_Running {
 			return nil, response.NewBadRequestError(fmt.Errorf("the DAG is not running: %w", errInvalidArgs))
 		}
 		e := h.engineFactory.Create()
@@ -356,7 +356,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
 		}
 
 	case "mark-success":
-		if d.Status.Status == scheduler.SchedulerStatus_Running {
+		if d.Status.Status == scheduler.Status_Running {
 			return nil, response.NewBadRequestError(fmt.Errorf("the DAG is still running: %w", errInvalidArgs))
 		}
 		if params.Body.RequestID == "" {
@@ -372,7 +372,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
 		}
 
 	case "mark-failed":
-		if d.Status.Status == scheduler.SchedulerStatus_Running {
+		if d.Status.Status == scheduler.Status_Running {
 			return nil, response.NewBadRequestError(fmt.Errorf("the DAG is still running: %w", errInvalidArgs))
 		}
 		if params.Body.RequestID == "" {
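
Note on the change (not part of the patch): call sites move from scheduler.SchedulerStatus_* to the shorter scheduler.Status_* constants, and the exported Node accessors ReadStatus/updateStatus become GetStatus/setStatus with the same mutex discipline. Below is a minimal self-contained Go sketch of that locked-accessor pattern as it reads after the rename. The method names and locking mirror internal/scheduler/node.go; the single Status enum, its concrete values, and the package main scaffolding are illustrative assumptions only (the patch itself distinguishes the run-level Status from the per-node NodeStatus).

package main

import (
	"fmt"
	"sync"
)

// Status stands in for scheduler.Status (formerly scheduler.SchedulerStatus).
// The constant names follow the patch; the values are illustrative.
type Status int

const (
	Status_None Status = iota
	Status_Running
	Status_Error
	Status_Cancel
	Status_Success
)

// node sketches the accessor pattern from internal/scheduler/node.go:
// reads take the read lock via GetStatus, writes take the write lock
// via the unexported setStatus.
type node struct {
	mu     sync.RWMutex
	status Status
}

func (n *node) GetStatus() Status {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.status
}

func (n *node) setStatus(s Status) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.status = s
}

func main() {
	n := &node{}
	n.setStatus(Status_Running)
	// After the rename, call sites compare against the shorter constants,
	// e.g. st == scheduler.Status_Running rather than
	// scheduler.SchedulerStatus_Running.
	if n.GetStatus() == Status_Running {
		fmt.Println("status is running")
	}
}

Keeping RLock on the read path lets concurrent pollers (the reporter, the HTTP status handler) check state simultaneously, while setStatus still takes the exclusive lock for writes.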