Skip to content

Commit

Permalink
Refactor function and command names for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
ErdemOzgen committed Apr 16, 2024
1 parent 0ce9846 commit 92813e4
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go-lint:
@golangci-lint run ./...

cert-dir:
@mkdir ./cert
@mkdir -p ./cert

gencerts-ca:
@openssl req -x509 -newkey rsa:4096 \
Expand Down
5 changes: 2 additions & 3 deletions app/fx.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package app

import (
"os"

"github.com/ErdemOzgen/blackdagger/internal/config"
"github.com/ErdemOzgen/blackdagger/internal/engine"
"github.com/ErdemOzgen/blackdagger/internal/logger"
"github.com/ErdemOzgen/blackdagger/internal/persistence/client"
"github.com/ErdemOzgen/blackdagger/service/frontend"
"go.uber.org/fx"
"os"
)

var (
Expand All @@ -17,9 +18,7 @@ var (
fx.Provide(logger.NewSlogLogger),
fx.Provide(client.NewDataStoreFactory),
)
)

var (
cfgInstance *config.Config = nil
)

Expand Down
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func registerCommands(root *cobra.Command) {
rootCmd.AddCommand(stopCmd())
rootCmd.AddCommand(restartCmd())
rootCmd.AddCommand(dryCmd())
rootCmd.AddCommand(createStatusCommand())
rootCmd.AddCommand(statusCmd())
rootCmd.AddCommand(versionCmd())
rootCmd.AddCommand(serverCmd())
rootCmd.AddCommand(createSchedulerCommand())
rootCmd.AddCommand(schedulerCmd())
rootCmd.AddCommand(retryCmd())
rootCmd.AddCommand(startAllCmd())
}
2 changes: 1 addition & 1 deletion cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/spf13/viper"
)

func createSchedulerCommand() *cobra.Command {
func schedulerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "scheduler",
Short: "Start the scheduler",
Expand Down
2 changes: 1 addition & 1 deletion cmd/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestSchedulerCommand(t *testing.T) {
}()

go func() {
testRunCommand(t, createSchedulerCommand(), cmdTest{
testRunCommand(t, schedulerCmd(), cmdTest{
args: []string{"scheduler"},
expectedOut: []string{"starting blackdagger scheduler"},
})
Expand Down
3 changes: 1 addition & 2 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func serverCmd() *cobra.Command {
// TODO: move to config files
pullDagList := []string{"default"}
Pulldags(pullDagList)
service := app.NewFrontendService()
err := service.Start(cmd.Context())
err := app.NewFrontendService().Start(cmd.Context())
checkError(err)
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/spf13/cobra"
)

func createStatusCommand() *cobra.Command {
func statusCmd() *cobra.Command {
return &cobra.Command{
Use: "status <DAG file>",
Short: "Display current status of the DAG",
Expand Down
5 changes: 3 additions & 2 deletions cmd/status_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package cmd

import (
"github.com/ErdemOzgen/blackdagger/internal/scheduler"
"os"
"testing"
"time"

"github.com/ErdemOzgen/blackdagger/internal/scheduler"
)

func TestStatusCommand(t *testing.T) {
Expand All @@ -28,7 +29,7 @@ func TestStatusCommand(t *testing.T) {
testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.SchedulerStatus_Running)

// Check the current status.
testRunCommand(t, createStatusCommand(), cmdTest{
testRunCommand(t, statusCmd(), cmdTest{
args: []string{"status", dagFile},
expectedOut: []string{"Status=running"},
})
Expand Down
11 changes: 6 additions & 5 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/ErdemOzgen/blackdagger/internal/persistence"
"log"
"net/http"
"os"
Expand All @@ -15,6 +14,8 @@ import (
"syscall"
"time"

"github.com/ErdemOzgen/blackdagger/internal/persistence"

"github.com/ErdemOzgen/blackdagger/internal/constants"
"github.com/ErdemOzgen/blackdagger/internal/dag"
"github.com/ErdemOzgen/blackdagger/internal/engine"
Expand Down Expand Up @@ -111,16 +112,16 @@ func (a *Agent) Status() *model.Status {
status.RequestId = a.requestId
status.Log = a.logManager.logFilename
if node := a.scheduler.HandlerNode(constants.OnExit); node != nil {
status.OnExit = model.FromNode(node)
status.OnExit = model.FromNode(node.State(), node.Step)
}
if node := a.scheduler.HandlerNode(constants.OnSuccess); node != nil {
status.OnSuccess = model.FromNode(node)
status.OnSuccess = model.FromNode(node.State(), node.Step)
}
if node := a.scheduler.HandlerNode(constants.OnFailure); node != nil {
status.OnFailure = model.FromNode(node)
status.OnFailure = model.FromNode(node.State(), node.Step)
}
if node := a.scheduler.HandlerNode(constants.OnCancel); node != nil {
status.OnCancel = model.FromNode(node)
status.OnCancel = model.FromNode(node.State(), node.Step)
}
return status
}
Expand Down
67 changes: 37 additions & 30 deletions internal/persistence/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ type Node struct {
func (n *Node) ToNode() *scheduler.Node {
startedAt, _ := utils.ParseTime(n.StartedAt)
finishedAt, _ := utils.ParseTime(n.FinishedAt)
var err error = nil
if n.Error != "" {
err = fmt.Errorf(n.Error)
}
ret := &scheduler.Node{
return &scheduler.Node{
Step: n.Step,
NodeState: scheduler.NodeState{
Status: n.Status,
Expand All @@ -36,57 +32,68 @@ func (n *Node) ToNode() *scheduler.Node {
FinishedAt: finishedAt,
RetryCount: n.RetryCount,
DoneCount: n.DoneCount,
Error: err,
Error: errFromText(n.Error),
},
}
return ret
}

func FromNode(n *scheduler.Node) *Node {
node := &Node{
Step: n.Step,
func FromNode(n scheduler.NodeState, step *dag.Step) *Node {
return &Node{
Step: step,
Log: n.Log,
StartedAt: utils.FormatTime(n.StartedAt),
FinishedAt: utils.FormatTime(n.FinishedAt),
Status: n.ReadStatus(),
StatusText: n.ReadStatus().String(),
RetryCount: n.ReadRetryCount(),
DoneCount: n.ReadDoneCount(),
Status: n.Status,
StatusText: n.Status.String(),
RetryCount: n.RetryCount,
DoneCount: n.DoneCount,
Error: errText(n.Error),
}
}

func errFromText(err string) error {
if err == "" {
return nil
}
if n.Error != nil {
node.Error = n.Error.Error()
return fmt.Errorf(err)
}

func errText(err error) string {
if err == nil {
return ""
}
return node
return err.Error()
}

func FromNodes(nodes []*scheduler.Node) []*Node {
ret := []*Node{}
var ret []*Node
for _, n := range nodes {
ret = append(ret, FromNode(n))
ret = append(ret, FromNode(n.State(), n.Step))
}
return ret
}

func FromSteps(steps []*dag.Step) []*Node {
ret := []*Node{}
var ret []*Node
for _, s := range steps {
ret = append(ret, fromStepWithDefValues(s))
ret = append(ret, nodeOrNil(s))
}
return ret
}

func fromStepWithDefValues(s *dag.Step) *Node {
if s == nil {
return nil
}
step := &Node{
Step: s,
Log: "",
func NewNode(step *dag.Step) *Node {
return &Node{
Step: step,
StartedAt: "-",
FinishedAt: "-",
Status: scheduler.NodeStatus_None,
StatusText: scheduler.NodeStatus_None.String(),
RetryCount: 0,
}
return step
}

func nodeOrNil(s *dag.Step) *Node {
if s == nil {
return nil
}
return NewNode(s)
}
56 changes: 28 additions & 28 deletions internal/persistence/model/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,52 +70,52 @@ func NewStatus(
nodes []*scheduler.Node,
status scheduler.SchedulerStatus,
pid int,
startTime, endTIme *time.Time,
startTime, endTime *time.Time,
) *Status {
finish, start := time.Time{}, time.Time{}
if startTime != nil {
start = *startTime
}
if endTIme != nil {
finish = *endTIme
}
var models []*Node
if len(nodes) != 0 {
models = FromNodes(nodes)
} else {
models = FromSteps(d.Steps)
}
var onExit, onSuccess, onFailure, onCancel *Node
onExit = fromStepWithDefValues(d.HandlerOn.Exit)
onSuccess = fromStepWithDefValues(d.HandlerOn.Success)
onFailure = fromStepWithDefValues(d.HandlerOn.Failure)
onCancel = fromStepWithDefValues(d.HandlerOn.Cancel)
onExit = nodeOrNil(d.HandlerOn.Exit)
onSuccess = nodeOrNil(d.HandlerOn.Success)
onFailure = nodeOrNil(d.HandlerOn.Failure)
onCancel = nodeOrNil(d.HandlerOn.Cancel)
return &Status{
RequestId: "",
Name: d.Name,
Status: status,
StatusText: status.String(),
Pid: Pid(pid),
Nodes: models,
Nodes: nodesOrSteps(nodes, d.Steps),
OnExit: onExit,
OnSuccess: onSuccess,
OnFailure: onFailure,
OnCancel: onCancel,
StartedAt: utils.FormatTime(start),
FinishedAt: utils.FormatTime(finish),
StartedAt: formatTime(startTime),
FinishedAt: formatTime(endTime),
Params: strings.Join(d.Params, " "),
}
}

func (sts *Status) CorrectRunningStatus() {
if sts.Status == scheduler.SchedulerStatus_Running {
sts.Status = scheduler.SchedulerStatus_Error
sts.StatusText = sts.Status.String()
func nodesOrSteps(nodes []*scheduler.Node, steps []*dag.Step) []*Node {
if len(nodes) != 0 {
return FromNodes(nodes)
}
return FromSteps(steps)
}

func formatTime(val *time.Time) string {
if val == nil {
return ""
}
return utils.FormatTime(*val)
}

func (st *Status) CorrectRunningStatus() {
if st.Status == scheduler.SchedulerStatus_Running {
st.Status = scheduler.SchedulerStatus_Error
st.StatusText = st.Status.String()
}
}

func (sts *Status) ToJson() ([]byte, error) {
js, err := json.Marshal(sts)
func (st *Status) ToJson() ([]byte, error) {
js, err := json.Marshal(st)
if err != nil {
return []byte{}, err
}
Expand Down
6 changes: 6 additions & 0 deletions internal/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ type NodeState struct {
Error error
}

func (n *Node) State() NodeState {
n.mu.RLock()
defer n.mu.RUnlock()
return n.NodeState
}

// Execute runs the command synchronously and returns error if any.
func (n *Node) Execute(ctx context.Context) error {
ctx, fn := context.WithCancel(ctx)
Expand Down

0 comments on commit 92813e4

Please sign in to comment.