Skip to content

Commit

Permalink
Merge pull request #59 from spiral/feature/fsm
Browse files Browse the repository at this point in the history
Feature/fsm
  • Loading branch information
rustatian authored Nov 3, 2020
2 parents 035d73a + fc7374c commit 39347f3
Show file tree
Hide file tree
Showing 30 changed files with 1,430 additions and 891 deletions.
21 changes: 21 additions & 0 deletions LICENCE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Spiral Scout

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
test:
go test -v -race ./tests/backoff
go test -v -race ./tests/happy_scenarios
go test -v -race ./tests/interfaces
go test -v -race ./tests/issues
go test -v -race ./tests/stress
go test -v -race ./tests/disabled_vertices
go test -v -race -tags=debug ./tests/backoff
go test -v -race -tags=debug ./tests/happy_scenarios
go test -v -race -tags=debug ./tests/interfaces
go test -v -race -tags=debug ./tests/issues
go test -v -race -tags=debug ./tests/stress
go test -v -race -tags=debug ./tests/disabled_vertices
31 changes: 15 additions & 16 deletions calculate_deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"

"github.com/spiral/endure/structures"
"github.com/spiral/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -32,17 +31,17 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error {
// get the Vertex from the graph (gVertex)
gVertex := e.graph.GetVertex(vertexID)
if gVertex.Provides == nil {
gVertex.Provides = make(map[string]structures.ProvidedEntry)
gVertex.Provides = make(map[string]ProvidedEntry)
}

// Make a slice
if gVertex.Meta.FnsProviderToInvoke == nil {
gVertex.Meta.FnsProviderToInvoke = make([]structures.ProviderEntry, 0, 1)
gVertex.Meta.FnsProviderToInvoke = make([]ProviderEntry, 0, 1)
}

// TODO merge function calls into one. Plugin1 -> fn's to invoke ProvideDB, ProvideDB2
// Append functions which we will invoke when we start calling the structure functions after Init stage
gVertex.Meta.FnsProviderToInvoke = append(gVertex.Meta.FnsProviderToInvoke, structures.ProviderEntry{
gVertex.Meta.FnsProviderToInvoke = append(gVertex.Meta.FnsProviderToInvoke, ProviderEntry{
/*
For example:
we need to invoke function ProvideDB - that will be FunctionName
Expand All @@ -64,13 +63,13 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error {
if reflect.TypeOf(vertex).Implements(ret) {
tmpValue := reflect.ValueOf(vertex)
tmpIsRef := isReference(ret)
gVertex.Provides[typeStr] = structures.ProvidedEntry{
gVertex.Provides[typeStr] = ProvidedEntry{
IsReference: &tmpIsRef,
Value: &tmpValue,
}
}
} else {
gVertex.Provides[typeStr] = structures.ProvidedEntry{
gVertex.Provides[typeStr] = ProvidedEntry{
IsReference: nil,
Value: nil,
}
Expand All @@ -82,19 +81,19 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error {

// addEdges calculates simple graph for the dependencies
func (e *Endure) addEdges() error {
const Op = "add_edges"
const Op = errors.Op("add_edges")
// vertexID for example S2
for vertexID, vrtx := range e.graph.VerticesMap {
// we already checked the interface satisfaction
// and we can safely skip the OK parameter here
init, _ := reflect.TypeOf(vrtx.Iface).MethodByName(InitMethodName)

if init.Type == nil {
e.logger.Fatal("init method is absent in struct", zap.String("vertex id", vertexID))
return errors.E(Op, fmt.Errorf("init method is absent in struct"))
e.logger.Fatal("internal_init method is absent in struct", zap.String("vertex id", vertexID))
return errors.E(Op, fmt.Errorf("internal_init method is absent in struct"))
}

/* Add the dependencies (if) which this vertex needs to init
/* Add the dependencies (if) which this vertex needs to internal_init
Information we know at this step is:
1. vertexID
2. Vertex structure value (interface)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (e *Endure) addEdges() error {
}

func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error {
const Op = "add_collectors_deps"
const Op = errors.Op("add_collectors_deps")
if register, ok := vertex.(Collector); ok {
for _, fn := range register.Collects() {
// what type it might depend on?
Expand Down Expand Up @@ -167,7 +166,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error {
// vertex - S4 func

// we store pointer in the Deps structure in the isRef field
err = e.graph.AddDep(vertexID, removePointerAsterisk(atStr), structures.Collects, isReference(at), at.Kind())
err = e.graph.AddDep(vertexID, removePointerAsterisk(atStr), Collects, isReference(at), at.Kind())
if err != nil {
return errors.E(Op, err)
}
Expand All @@ -177,7 +176,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error {
// get the Vertex from the graph (gVertex)
gVertex := e.graph.GetVertex(vertexID)
if gVertex.Provides == nil {
gVertex.Provides = make(map[string]structures.ProvidedEntry)
gVertex.Provides = make(map[string]ProvidedEntry)
}

if gVertex.Meta.FnsCollectorToInvoke == nil {
Expand All @@ -194,7 +193,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error {
}

func (e *Endure) addInitDeps(vertexID string, initMethod reflect.Method) error {
const Op = "add_init_deps"
const Op = errors.Op("add_init_deps")
// Init function in arguments
initArgs := functionParameters(initMethod)

Expand Down Expand Up @@ -223,11 +222,11 @@ func (e *Endure) addInitDeps(vertexID string, initMethod reflect.Method) error {
}
}

err := e.graph.AddDep(vertexID, removePointerAsterisk(initArg.String()), structures.Init, isReference(initArg), initArg.Kind())
err := e.graph.AddDep(vertexID, removePointerAsterisk(initArg.String()), Init, isReference(initArg), initArg.Kind())
if err != nil {
return errors.E(Op, err)
}
e.logger.Debug("adding dependency via Init()", zap.String("vertex id", vertexID), zap.String("depends", initArg.String()))
e.logger.Debug("adding dependency via Init()", zap.String("vertex id", vertexID), zap.String("depends on", initArg.String()))
}
return nil
}
130 changes: 130 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package endure

import (
"reflect"

"github.com/cenkalti/backoff/v4"
"github.com/spiral/errors"
"go.uber.org/zap"
)

func (e *Endure) sendResultToUser(res *result) {
e.userResultsCh <- &Result{
Error: res.err,
VertexID: res.vertexID,
}
}

// traverseBackStop used to visit every Prev node and internalStop vertices
func (e *Endure) traverseBackStop(n *DllNode) {
const op = errors.Op("traverse_back_stop")
e.logger.Debug("stopping vertex in the first Serve call", zap.String("vertex id", n.Vertex.ID))
nCopy := n
err := e.shutdown(nCopy, false)
if err != nil {
nCopy.Vertex.SetState(Error)
// ignore errors from internal_stop
e.logger.Error("failed to traverse vertex back", zap.String("vertex id", nCopy.Vertex.ID), zap.Error(errors.E(op, err)))
}
}

func (e *Endure) retryHandler(res *result) {
const op = errors.Op("internal_retry_handler")
// get vertex from the graph
vertex := e.graph.GetVertex(res.vertexID)
if vertex == nil {
e.logger.Error("failed to get vertex from the graph, vertex is nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID))
e.userResultsCh <- &Result{
Error: errors.E(op, errors.Traverse, errors.Str("failed to get vertex from the graph, vertex is nil")),
VertexID: res.vertexID,
}
return
}

// stop without setting Stopped state to the Endure
n := e.runList.Head
err := e.shutdown(n, true)
if err != nil {
e.logger.Error("error happened during shutdown", zap.Error(err))
}

// reset vertex and dependencies to the initial state
// numOfDeps and visited/visiting
vertices := e.graph.Reset(vertex)

// Topologically sort the graph
sorted, err := TopologicalSort(vertices)
if err != nil {
e.logger.Error("error sorting the graph", zap.Error(err))
return
}
if sorted == nil {
e.logger.Error("sorted list should not be nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID))
e.userResultsCh <- &Result{
Error: errors.E(op, errors.Traverse, errors.Str("failed to topologically sort the graph")),
VertexID: res.vertexID,
}
return
}

// Init backoff
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = e.maxInterval
b.InitialInterval = e.initialInterval

affectedRunList := NewDoublyLinkedList()
for i := 0; i <= len(sorted)-1; i++ {
affectedRunList.Push(sorted[i])
}

// call internal_init
headCopy := affectedRunList.Head
for headCopy != nil {
berr := backoff.Retry(e.backoffInit(headCopy.Vertex), b)
if berr != nil {
e.logger.Error("backoff failed", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(berr))
e.userResultsCh <- &Result{
Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Init function call")),
VertexID: headCopy.Vertex.ID,
}
return
}
headCopy = headCopy.Next
}

// call serveInternal
headCopy = affectedRunList.Head
for headCopy != nil {
err := e.serveInternal(headCopy)
if err != nil {
e.userResultsCh <- &Result{
Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Serve function call")),
VertexID: headCopy.Vertex.ID,
}
e.logger.Error("fatal error during the serveInternal in the main thread", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(err))
return
}
headCopy = headCopy.Next
}

e.sendResultToUser(res)
}

func (e *Endure) backoffInit(v *Vertex) func() error {
return func() error {
const op = errors.Op("internal_backoff_init")
// we already checked the Interface satisfaction
// at this step absence of Init() is impossible
init, _ := reflect.TypeOf(v.Iface).MethodByName(InitMethodName)
v.SetState(Initializing)
err := e.callInitFn(init, v)
if err != nil {
v.SetState(Error)
e.logger.Error("error occurred during the call INIT function", zap.String("vertex id", v.ID), zap.Error(err))
return errors.E(op, errors.FunctionCall, err)
}

v.SetState(Initialized)
return nil
}
}
5 changes: 1 addition & 4 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ type Result struct {
VertexID string
}

type notify struct {
// stop used to notify vertex goroutine, that we need to stop vertex and return from goroutine
stop bool
}
type notify struct{}

type result struct {
// error channel from vertex
Expand Down
30 changes: 30 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package endure

/*
MIT License
Copyright (c) 2020 Spiral Scout
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
Loading

0 comments on commit 39347f3

Please sign in to comment.