Skip to content

Commit

Permalink
Refactor names
Browse files Browse the repository at this point in the history
  • Loading branch information
ErdemOzgen committed Apr 16, 2024
1 parent 92813e4 commit 8c491a8
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 288 deletions.
4 changes: 2 additions & 2 deletions cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func testDAGFile(name string) string {
return path.Join(d, name)
}

func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected scheduler.SchedulerStatus) {
func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected scheduler.Status) {
t.Helper()

d, err := loadDAG(dagFile, "")
Expand All @@ -114,7 +114,7 @@ func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected
}, time.Millisecond*5000, time.Millisecond*50)
}

func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.SchedulerStatus) {
func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.Status) {
t.Helper()
require.Eventually(t, func() bool {
// TODO: do not use history store directly.
Expand Down
4 changes: 2 additions & 2 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func stopDAGIfRunning(e engine.Engine, dag *dag.DAG) {
checkError(err)

// Stop the DAG if it is running.
if st.Status == scheduler.SchedulerStatus_Running {
if st.Status == scheduler.Status_Running {
log.Printf("Stopping %s for restart...", dag.Name)
cobra.CheckErr(stopRunningDAG(e, dag))
}
Expand All @@ -63,7 +63,7 @@ func stopRunningDAG(e engine.Engine, dag *dag.DAG) error {
st, err := e.GetCurrentStatus(dag)
checkError(err)

if st.Status != scheduler.SchedulerStatus_Running {
if st.Status != scheduler.Status_Running {
return nil
}
checkError(e.Stop(dag))
Expand Down
13 changes: 7 additions & 6 deletions cmd/restart_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package cmd

import (
"os"
"testing"
"time"

"github.com/ErdemOzgen/blackdagger/internal/config"
"github.com/ErdemOzgen/blackdagger/internal/engine"
"github.com/ErdemOzgen/blackdagger/internal/persistence/client"
"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)

func TestRestartCommand(t *testing.T) {
Expand All @@ -27,7 +28,7 @@ func TestRestartCommand(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// Wait for the DAG running.
testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_Running)
testStatusEventual(t, e, dagFile, scheduler.Status_Running)

// Restart the DAG.
done := make(chan struct{})
Expand All @@ -39,15 +40,15 @@ func TestRestartCommand(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// Wait for the DAG running again.
testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_Running)
testStatusEventual(t, e, dagFile, scheduler.Status_Running)

// Stop the restarted DAG.
testRunCommand(t, stopCmd(), cmdTest{args: []string{"stop", dagFile}})

time.Sleep(time.Millisecond * 100)

// Wait for the DAG is stopped.
testStatusEventual(t, e, dagFile, scheduler.SchedulerStatus_None)
testStatusEventual(t, e, dagFile, scheduler.Status_None)

// Check parameter was the same as the first execution
d, err := loadDAG(dagFile, "")
Expand Down
7 changes: 4 additions & 3 deletions cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package cmd

import (
"fmt"
"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"github.com/stretchr/testify/require"
"os"
"testing"

"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"github.com/stretchr/testify/require"
)

func TestRetryCommand(t *testing.T) {
Expand All @@ -22,7 +23,7 @@ func TestRetryCommand(t *testing.T) {
// Find the request ID.
s, err := e.GetStatus(dagFile)
require.NoError(t, err)
require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success)
require.Equal(t, s.Status.Status, scheduler.Status_Success)
require.NotNil(t, s.Status)

reqID := s.Status.RequestId
Expand Down
2 changes: 1 addition & 1 deletion cmd/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestStatusCommand(t *testing.T) {
time.Sleep(time.Millisecond * 50)

// TODO: do not use history store directly.
testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Running)
testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.Status_Running)

// Check the current status.
testRunCommand(t, statusCmd(), cmdTest{
Expand Down
7 changes: 4 additions & 3 deletions cmd/stop_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package cmd

import (
"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"os"
"testing"
"time"

"github.com/ErdemOzgen/blackdagger/internal/scheduler"
)

func TestStopCommand(t *testing.T) {
Expand All @@ -26,7 +27,7 @@ func TestStopCommand(t *testing.T) {

// Wait for the DAG running.
// TODO: Do not use history store.
testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Running)
testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.Status_Running)

// Stop the DAG.
testRunCommand(t, stopCmd(), cmdTest{
Expand All @@ -35,6 +36,6 @@ func TestStopCommand(t *testing.T) {

// Check the last execution is cancelled.
// TODO: Do not use history store.
testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Cancel)
testLastStatusEventual(t, ds.NewHistoryStore(), dagFile, scheduler.Status_Cancel)
<-done
}
8 changes: 4 additions & 4 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (a *Agent) Run(ctx context.Context) error {
// Status returns the current status of the dags.
func (a *Agent) Status() *model.Status {
scStatus := a.scheduler.Status(a.graph)
if scStatus == scheduler.SchedulerStatus_None && !a.graph.StartedAt.IsZero() {
scStatus = scheduler.SchedulerStatus_Running
if scStatus == scheduler.Status_None && !a.graph.StartedAt.IsZero() {
scStatus = scheduler.Status_Running
}

status := model.NewStatus(
Expand Down Expand Up @@ -379,7 +379,7 @@ func (a *Agent) checkIsRunning() error {
if err != nil {
return err
}
if status.Status != scheduler.SchedulerStatus_None {
if status.Status != scheduler.Status_None {
return fmt.Errorf("the DAG is already running. socket=%s",
a.DAG.SockAddr())
}
Expand All @@ -403,7 +403,7 @@ func (a *Agent) HandleHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && statusRe.MatchString(r.URL.Path):
status := a.Status()
status.Status = scheduler.SchedulerStatus_Running
status.Status = scheduler.Status_Running
b, err := status.ToJson()
if err != nil {
encodeError(w, err)
Expand Down
26 changes: 13 additions & 13 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRunDAG(t *testing.T) {
a := agent.New(&agent.Config{DAG: d}, e, df)

status, _ := e.GetLatestStatus(d)
require.Equal(t, scheduler.SchedulerStatus_None, status.Status)
require.Equal(t, scheduler.Status_None, status.Status)

go func() {
err := a.Run(context.Background())
Expand All @@ -63,7 +63,7 @@ func TestRunDAG(t *testing.T) {
require.Eventually(t, func() bool {
status, err := e.GetLatestStatus(d)
require.NoError(t, err)
return status.Status == scheduler.SchedulerStatus_Success
return status.Status == scheduler.Status_Success
}, time.Second*2, time.Millisecond*100)

// check deletion of expired history files
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestCheckRunning(t *testing.T) {

status := a.Status()
require.NotNil(t, status)
require.Equal(t, status.Status, scheduler.SchedulerStatus_Running)
require.Equal(t, status.Status, scheduler.Status_Running)

a = agent.New(&agent.Config{DAG: d}, e, df)
err := a.Run(context.Background())
Expand All @@ -115,7 +115,7 @@ func TestDryRun(t *testing.T) {
status := a.Status()
require.NoError(t, err)

require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
require.Equal(t, scheduler.Status_Success, status.Status)
}

func TestCancelDAG(t *testing.T) {
Expand All @@ -139,7 +139,7 @@ func TestCancelDAG(t *testing.T) {
time.Sleep(time.Millisecond * 500)
status, err := e.GetLatestStatus(d)
require.NoError(t, err)
require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
require.Equal(t, scheduler.Status_Cancel, status.Status)
}
}

Expand All @@ -163,7 +163,7 @@ func TestPreConditionInvalid(t *testing.T) {

status := a.Status()

require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
require.Equal(t, scheduler.Status_Cancel, status.Status)
require.Equal(t, scheduler.NodeStatus_None, status.Nodes[0].Status)
require.Equal(t, scheduler.NodeStatus_None, status.Nodes[1].Status)
}
Expand All @@ -187,7 +187,7 @@ func TestPreConditionValid(t *testing.T) {
require.NoError(t, err)

status := a.Status()
require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
require.Equal(t, scheduler.Status_Success, status.Status)
for _, s := range status.Nodes {
require.Equal(t, scheduler.NodeStatus_Success, s.Status)
}
Expand All @@ -205,7 +205,7 @@ func TestStartError(t *testing.T) {
require.Error(t, err)

status := a.Status()
require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
require.Equal(t, scheduler.Status_Error, status.Status)
}

func TestOnExit(t *testing.T) {
Expand All @@ -220,7 +220,7 @@ func TestOnExit(t *testing.T) {
require.NoError(t, err)

status := a.Status()
require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
require.Equal(t, scheduler.Status_Success, status.Status)
for _, s := range status.Nodes {
require.Equal(t, scheduler.NodeStatus_Success, s.Status)
}
Expand All @@ -240,7 +240,7 @@ func TestRetry(t *testing.T) {
require.Error(t, err)

status := a.Status()
require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
require.Equal(t, scheduler.Status_Error, status.Status)

for _, n := range status.Nodes {
n.CmdWithArgs = "true"
Expand All @@ -251,7 +251,7 @@ func TestRetry(t *testing.T) {
require.NoError(t, err)

status = a.Status()
require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
require.Equal(t, scheduler.Status_Success, status.Status)

for _, n := range status.Nodes {
if n.Status != scheduler.NodeStatus_Success &&
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestHandleHTTP(t *testing.T) {

status, err := model.StatusFromJson(mockResponseWriter.body)
require.NoError(t, err)
require.Equal(t, scheduler.SchedulerStatus_Running, status.Status)
require.Equal(t, scheduler.Status_Running, status.Status)

// invalid path
req = &http.Request{
Expand All @@ -318,7 +318,7 @@ func TestHandleHTTP(t *testing.T) {
<-time.After(time.Millisecond * 50)

status = a.Status()
require.Equal(t, status.Status, scheduler.SchedulerStatus_Cancel)
require.Equal(t, status.Status, scheduler.Status_Cancel)
}

type mockResponseWriter struct {
Expand Down
11 changes: 6 additions & 5 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package engine
import (
"errors"
"fmt"
"os"
"os/exec"
"syscall"
"time"

"github.com/ErdemOzgen/blackdagger/internal/dag"
"github.com/ErdemOzgen/blackdagger/internal/persistence"
"github.com/ErdemOzgen/blackdagger/internal/persistence/model"
"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"github.com/ErdemOzgen/blackdagger/internal/sock"
"github.com/ErdemOzgen/blackdagger/internal/utils"
"os"
"os/exec"
"syscall"
"time"
)

type Engine interface {
Expand Down Expand Up @@ -216,7 +217,7 @@ func (e *engineImpl) UpdateStatus(dag *dag.DAG, status *model.Status) error {
} else {
ss, _ := model.StatusFromJson(res)
if ss != nil && ss.RequestId == status.RequestId &&
ss.Status == scheduler.SchedulerStatus_Running {
ss.Status == scheduler.Status_Running {
return fmt.Errorf("the DAG is running")
}
}
Expand Down
Loading

0 comments on commit 8c491a8

Please sign in to comment.