diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..9583b66 --- /dev/null +++ b/LICENCE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Spiral Scout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/Makefile b/Makefile index 36310ac..9366176 100755 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ test: - go test -v -race ./tests/backoff - go test -v -race ./tests/happy_scenarios - go test -v -race ./tests/interfaces - go test -v -race ./tests/issues - go test -v -race ./tests/stress - go test -v -race ./tests/disabled_vertices + go test -v -race -tags=debug ./tests/backoff + go test -v -race -tags=debug ./tests/happy_scenarios + go test -v -race -tags=debug ./tests/interfaces + go test -v -race -tags=debug ./tests/issues + go test -v -race -tags=debug ./tests/stress + go test -v -race -tags=debug ./tests/disabled_vertices diff --git a/calculate_deps.go b/calculate_deps.go index dbeea23..937060d 100755 --- a/calculate_deps.go +++ b/calculate_deps.go @@ -4,7 +4,6 @@ import ( "fmt" "reflect" - "github.com/spiral/endure/structures" "github.com/spiral/errors" "go.uber.org/zap" ) @@ -32,17 +31,17 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error { // get the Vertex from the graph (gVertex) gVertex := e.graph.GetVertex(vertexID) if gVertex.Provides == nil { - gVertex.Provides = make(map[string]structures.ProvidedEntry) + gVertex.Provides = make(map[string]ProvidedEntry) } // Make a slice if gVertex.Meta.FnsProviderToInvoke == nil { - gVertex.Meta.FnsProviderToInvoke = make([]structures.ProviderEntry, 0, 1) + gVertex.Meta.FnsProviderToInvoke = make([]ProviderEntry, 0, 1) } // TODO merge function calls into one. Plugin1 -> fn's to invoke ProvideDB, ProvideDB2 // Append functions which we will invoke when we start calling the structure functions after Init stage - gVertex.Meta.FnsProviderToInvoke = append(gVertex.Meta.FnsProviderToInvoke, structures.ProviderEntry{ + gVertex.Meta.FnsProviderToInvoke = append(gVertex.Meta.FnsProviderToInvoke, ProviderEntry{ /* For example: we need to invoke function ProvideDB - that will be FunctionName @@ -64,13 +63,13 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error { if reflect.TypeOf(vertex).Implements(ret) { tmpValue := reflect.ValueOf(vertex) tmpIsRef := isReference(ret) - gVertex.Provides[typeStr] = structures.ProvidedEntry{ + gVertex.Provides[typeStr] = ProvidedEntry{ IsReference: &tmpIsRef, Value: &tmpValue, } } } else { - gVertex.Provides[typeStr] = structures.ProvidedEntry{ + gVertex.Provides[typeStr] = ProvidedEntry{ IsReference: nil, Value: nil, } @@ -82,7 +81,7 @@ func (e *Endure) addProviders(vertexID string, vertex interface{}) error { // addEdges calculates simple graph for the dependencies func (e *Endure) addEdges() error { - const Op = "add_edges" + const Op = errors.Op("add_edges") // vertexID for example S2 for vertexID, vrtx := range e.graph.VerticesMap { // we already checked the interface satisfaction @@ -90,11 +89,11 @@ func (e *Endure) addEdges() error { init, _ := reflect.TypeOf(vrtx.Iface).MethodByName(InitMethodName) if init.Type == nil { - e.logger.Fatal("init method is absent in struct", zap.String("vertex id", vertexID)) - return errors.E(Op, fmt.Errorf("init method is absent in struct")) + e.logger.Fatal("internal_init method is absent in struct", zap.String("vertex id", vertexID)) + return errors.E(Op, fmt.Errorf("internal_init method is absent in struct")) } - /* Add the dependencies (if) which this vertex needs to init + /* Add the dependencies (if) which this vertex needs to internal_init Information we know at this step is: 1. vertexID 2. Vertex structure value (interface) @@ -124,7 +123,7 @@ func (e *Endure) addEdges() error { } func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error { - const Op = "add_collectors_deps" + const Op = errors.Op("add_collectors_deps") if register, ok := vertex.(Collector); ok { for _, fn := range register.Collects() { // what type it might depend on? @@ -167,7 +166,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error { // vertex - S4 func // we store pointer in the Deps structure in the isRef field - err = e.graph.AddDep(vertexID, removePointerAsterisk(atStr), structures.Collects, isReference(at), at.Kind()) + err = e.graph.AddDep(vertexID, removePointerAsterisk(atStr), Collects, isReference(at), at.Kind()) if err != nil { return errors.E(Op, err) } @@ -177,7 +176,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error { // get the Vertex from the graph (gVertex) gVertex := e.graph.GetVertex(vertexID) if gVertex.Provides == nil { - gVertex.Provides = make(map[string]structures.ProvidedEntry) + gVertex.Provides = make(map[string]ProvidedEntry) } if gVertex.Meta.FnsCollectorToInvoke == nil { @@ -194,7 +193,7 @@ func (e *Endure) addCollectorsDeps(vertexID string, vertex interface{}) error { } func (e *Endure) addInitDeps(vertexID string, initMethod reflect.Method) error { - const Op = "add_init_deps" + const Op = errors.Op("add_init_deps") // Init function in arguments initArgs := functionParameters(initMethod) @@ -223,11 +222,11 @@ func (e *Endure) addInitDeps(vertexID string, initMethod reflect.Method) error { } } - err := e.graph.AddDep(vertexID, removePointerAsterisk(initArg.String()), structures.Init, isReference(initArg), initArg.Kind()) + err := e.graph.AddDep(vertexID, removePointerAsterisk(initArg.String()), Init, isReference(initArg), initArg.Kind()) if err != nil { return errors.E(Op, err) } - e.logger.Debug("adding dependency via Init()", zap.String("vertex id", vertexID), zap.String("depends", initArg.String())) + e.logger.Debug("adding dependency via Init()", zap.String("vertex id", vertexID), zap.String("depends on", initArg.String())) } return nil } diff --git a/common.go b/common.go new file mode 100755 index 0000000..a5f8050 --- /dev/null +++ b/common.go @@ -0,0 +1,130 @@ +package endure + +import ( + "reflect" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "go.uber.org/zap" +) + +func (e *Endure) sendResultToUser(res *result) { + e.userResultsCh <- &Result{ + Error: res.err, + VertexID: res.vertexID, + } +} + +// traverseBackStop used to visit every Prev node and internalStop vertices +func (e *Endure) traverseBackStop(n *DllNode) { + const op = errors.Op("traverse_back_stop") + e.logger.Debug("stopping vertex in the first Serve call", zap.String("vertex id", n.Vertex.ID)) + nCopy := n + err := e.shutdown(nCopy, false) + if err != nil { + nCopy.Vertex.SetState(Error) + // ignore errors from internal_stop + e.logger.Error("failed to traverse vertex back", zap.String("vertex id", nCopy.Vertex.ID), zap.Error(errors.E(op, err))) + } +} + +func (e *Endure) retryHandler(res *result) { + const op = errors.Op("internal_retry_handler") + // get vertex from the graph + vertex := e.graph.GetVertex(res.vertexID) + if vertex == nil { + e.logger.Error("failed to get vertex from the graph, vertex is nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID)) + e.userResultsCh <- &Result{ + Error: errors.E(op, errors.Traverse, errors.Str("failed to get vertex from the graph, vertex is nil")), + VertexID: res.vertexID, + } + return + } + + // stop without setting Stopped state to the Endure + n := e.runList.Head + err := e.shutdown(n, true) + if err != nil { + e.logger.Error("error happened during shutdown", zap.Error(err)) + } + + // reset vertex and dependencies to the initial state + // numOfDeps and visited/visiting + vertices := e.graph.Reset(vertex) + + // Topologically sort the graph + sorted, err := TopologicalSort(vertices) + if err != nil { + e.logger.Error("error sorting the graph", zap.Error(err)) + return + } + if sorted == nil { + e.logger.Error("sorted list should not be nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID)) + e.userResultsCh <- &Result{ + Error: errors.E(op, errors.Traverse, errors.Str("failed to topologically sort the graph")), + VertexID: res.vertexID, + } + return + } + + // Init backoff + b := backoff.NewExponentialBackOff() + b.MaxElapsedTime = e.maxInterval + b.InitialInterval = e.initialInterval + + affectedRunList := NewDoublyLinkedList() + for i := 0; i <= len(sorted)-1; i++ { + affectedRunList.Push(sorted[i]) + } + + // call internal_init + headCopy := affectedRunList.Head + for headCopy != nil { + berr := backoff.Retry(e.backoffInit(headCopy.Vertex), b) + if berr != nil { + e.logger.Error("backoff failed", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(berr)) + e.userResultsCh <- &Result{ + Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Init function call")), + VertexID: headCopy.Vertex.ID, + } + return + } + headCopy = headCopy.Next + } + + // call serveInternal + headCopy = affectedRunList.Head + for headCopy != nil { + err := e.serveInternal(headCopy) + if err != nil { + e.userResultsCh <- &Result{ + Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Serve function call")), + VertexID: headCopy.Vertex.ID, + } + e.logger.Error("fatal error during the serveInternal in the main thread", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(err)) + return + } + headCopy = headCopy.Next + } + + e.sendResultToUser(res) +} + +func (e *Endure) backoffInit(v *Vertex) func() error { + return func() error { + const op = errors.Op("internal_backoff_init") + // we already checked the Interface satisfaction + // at this step absence of Init() is impossible + init, _ := reflect.TypeOf(v.Iface).MethodByName(InitMethodName) + v.SetState(Initializing) + err := e.callInitFn(init, v) + if err != nil { + v.SetState(Error) + e.logger.Error("error occurred during the call INIT function", zap.String("vertex id", v.ID), zap.Error(err)) + return errors.E(op, errors.FunctionCall, err) + } + + v.SetState(Initialized) + return nil + } +} diff --git a/container.go b/container.go index 3334723..535cd0c 100755 --- a/container.go +++ b/container.go @@ -15,10 +15,7 @@ type Result struct { VertexID string } -type notify struct { - // stop used to notify vertex goroutine, that we need to stop vertex and return from goroutine - stop bool -} +type notify struct{} type result struct { // error channel from vertex diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..8c4b264 --- /dev/null +++ b/doc.go @@ -0,0 +1,30 @@ +package endure + +/* + +MIT License + +Copyright (c) 2020 Spiral Scout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + + + +*/ diff --git a/endure.go b/endure.go index 873a4f5..daf8069 100755 --- a/endure.go +++ b/endure.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/spiral/endure/structures" "github.com/spiral/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -18,6 +17,15 @@ import ( var order int = 1 +const ( + // InitializeMethodName is the method name to invoke in transition map + InitializeMethodName = "Initialize" + // StartMethodName is the method name to invoke in transition map + StartMethodName = "Start" + // ShutdownMethodName is the method name to invoke in transition map + ShutdownMethodName = "Shutdown" +) + // A Level is a logging priority. Higher levels are more important. type Level int8 @@ -44,9 +52,9 @@ const ( type Endure struct { // Dependency graph - graph *structures.Graph + graph *Graph // DLL used as run list to run in order - runList *structures.DoublyLinkedList + runList *DoublyLinkedList // logger logger *zap.Logger // OPTIONS @@ -54,9 +62,12 @@ type Endure struct { retry bool maxInterval time.Duration initialInterval time.Duration - // option to visualize resulted (before init) graph + stopTimeout time.Duration + // option to visualize resulted (before internalInit) graph visualize bool + FSM + mutex *sync.RWMutex // result always points on healthy channel associated with vertex @@ -81,15 +92,59 @@ type Options func(endure *Endure) 7 - Disabled disables the logger. see the endure.Level */ -func NewContainer(logLevel Level, options ...Options) (*Endure, error) { +func NewContainer(logLevel Level, logger *zap.Logger, options ...Options) (*Endure, error) { const op = errors.Op("NewContainer") c := &Endure{ mutex: &sync.RWMutex{}, initialInterval: time.Second * 1, maxInterval: time.Second * 60, results: sync.Map{}, + stopTimeout: time.Second * 10, } + // Transition map + transitionMap := make(map[Event]reflect.Method) + init, _ := reflect.TypeOf(c).MethodByName(InitializeMethodName) + // event -> Initialize + transitionMap[Initialize] = init + + serve, _ := reflect.TypeOf(c).MethodByName(StartMethodName) + // event -> Start + transitionMap[Start] = serve + + shutdown, _ := reflect.TypeOf(c).MethodByName(ShutdownMethodName) + // event -> Stop + transitionMap[Stop] = shutdown + + c.FSM = NewFSM(Uninitialized, transitionMap) + + if logger == nil { + log, err := internalLogger(logLevel) + if err != nil { + return nil, errors.E(op, err) + } + c.logger = log + } else { + c.logger = logger + } + + c.graph = NewGraph() + c.runList = NewDoublyLinkedList() + + // Main thread channels + c.handleErrorCh = make(chan *result) + c.userResultsCh = make(chan *Result) + + // append options + for _, option := range options { + option(c) + } + + return c, nil +} + +func internalLogger(logLevel Level) (*zap.Logger, error) { + const op = errors.Op("internal_logger") var lvl zap.AtomicLevel switch logLevel { case DebugLevel: @@ -133,22 +188,8 @@ func NewContainer(logLevel Level, options ...Options) (*Endure, error) { if err != nil { return nil, errors.E(op, errors.Logger, err) } - c.logger = logger - - c.graph = structures.NewGraph() - c.runList = structures.NewDoublyLinkedList() - c.logger = logger - - // Main thread channels - c.handleErrorCh = make(chan *result) - c.userResultsCh = make(chan *Result) - - // append options - for _, option := range options { - option(c) - } - return c, nil + return logger, nil } func pprof() { @@ -176,6 +217,12 @@ func Visualize(print bool) Options { } } +func SetStopTimeOut(to time.Duration) Options { + return func(endure *Endure) { + endure.stopTimeout = to + } +} + // Register registers the dependencies in the Endure graph without invoking any methods func (e *Endure) Register(vertex interface{}) error { const op = errors.Op("Register") @@ -216,6 +263,36 @@ func (e *Endure) Register(vertex interface{}) error { // Init container and all service edges. func (e *Endure) Init() error { + _, err := e.Transition(Initialize, e) + if err != nil { + return err + } + return nil +} + +// Serve starts serving the graph +// This is the initial serveInternal, if error produced immediately in the initial serveInternal, endure will traverse deps back, call internal_stop and exit +func (e *Endure) Serve() (<-chan *Result, error) { + data, err := e.Transition(Start, e) + if err != nil { + return nil, err + } + // god save this construction + return data.(<-chan *Result), nil +} + +// Stop stops the execution and call Stop on every vertex +func (e *Endure) Stop() error { + _, err := e.Transition(Stop, e) + if err != nil { + return err + } + return nil +} + +// Initialize used to add edges between vertices, sort graph topologically +// Do not change this method name, sync with constants in the beginning of this file +func (e *Endure) Initialize() error { const op = errors.Op("Init") // traverse the graph err := e.addEdges() @@ -226,14 +303,14 @@ func (e *Endure) Init() error { // if failed - continue, just send warning to a user // visualize is not critical if e.visualize { - err = structures.Visualize(e.graph.Vertices) + err = e.Visualize(e.graph.Vertices) if err != nil { e.logger.Warn("failed to visualize the graph", zap.Error(err)) } } - // we should build init list in the reverse order - sorted, err := structures.TopologicalSort(e.graph.Vertices) + // we should build internal_init list in the reverse order + sorted, err := TopologicalSort(e.graph.Vertices) if err != nil { e.logger.Error("error sorting the graph", zap.Error(err)) return errors.E(op, errors.Init, err) @@ -244,7 +321,7 @@ func (e *Endure) Init() error { return errors.E(op, errors.Init, errors.Errorf("graph should contain at least 1 vertex, possibly you forget to invoke registers")) } - e.runList = structures.NewDoublyLinkedList() + e.runList = NewDoublyLinkedList() for i := len(sorted) - 1; i >= 0; i-- { e.runList.Push(sorted[i]) } @@ -252,20 +329,23 @@ func (e *Endure) Init() error { head := e.runList.Head headCopy := head for headCopy != nil { - err = e.init(headCopy.Vertex) + headCopy.Vertex.SetState(Initializing) + err = e.internalInit(headCopy.Vertex) if err != nil { - e.logger.Error("error during the init", zap.Error(err)) + headCopy.Vertex.SetState(Error) + e.logger.Error("error during the internal_init", zap.Error(err)) return errors.E(op, errors.Init, err) } + headCopy.Vertex.SetState(Initialized) headCopy = headCopy.Next } return nil } -// Serve starts serving the graph -// This is the initial serve, if error produced immediately in the initial serve, endure will traverse deps back, call stop and exit -func (e *Endure) Serve() (<-chan *Result, error) { +// Start used to start serving vertices +// Do not change this method name, sync with constants in the beginning of this file +func (e *Endure) Start() (<-chan *Result, error) { e.mutex.Lock() defer e.mutex.Unlock() @@ -282,27 +362,27 @@ func (e *Endure) Serve() (<-chan *Result, error) { continue } atLeastOne = true - err := e.serve(nCopy) + nCopy.Vertex.SetState(Starting) + err := e.serveInternal(nCopy) if err != nil { + nCopy.Vertex.SetState(Error) e.traverseBackStop(nCopy) return nil, errors.E(op, errors.Serve, err) } + nCopy.Vertex.SetState(Started) nCopy = nCopy.Next } // all vertices disabled if atLeastOne == false { - return nil, errors.E(op, errors.Disabled, errors.Str("all vertices disabled, nothing to serve")) + return nil, errors.E(op, errors.Disabled, errors.Str("all vertices disabled, nothing to serveInternal")) } return e.userResultsCh, nil } -// Stop stops the execution and call Stop on every vertex -func (e *Endure) Stop() error { - e.mutex.Lock() - defer e.mutex.Unlock() - +// Shutdown used to shutdown the Endure +// Do not change this method name, sync with constants in the beginning of this file +func (e *Endure) Shutdown() error { e.logger.Info("exiting from the Endure") n := e.runList.Head - e.shutdown(n) - return nil + return e.shutdown(n, true) } diff --git a/fsm.go b/fsm.go new file mode 100644 index 0000000..cccfebb --- /dev/null +++ b/fsm.go @@ -0,0 +1,207 @@ +package endure + +import ( + "reflect" + "sync" + "sync/atomic" + + "github.com/spiral/errors" +) + +type FSM interface { + CurrentState() State + InitialState(st State) + Transition(event Event, args ...interface{}) (interface{}, error) +} + +func NewFSM(initialState State, callbacks map[Event]reflect.Method) FSM { + st := uint32(initialState) + return &FSMImpl{ + callbacks: callbacks, + currentState: &st, + } +} + +type FSMImpl struct { + mutex sync.Mutex + currentState *uint32 + callbacks map[Event]reflect.Method +} + +type Event uint32 + +func (ev Event) String() string { + switch ev { + case Initialize: + return "Initialize" + case Start: + return "Start" + case Stop: + return "Stop" + default: + return "Unknown event" + } +} + +const ( + Initialize Event = iota // Init func + Start // Serve func + Stop // Stop func +) + +type State uint32 + +func (st State) String() string { + switch st { + case Uninitialized: + return "Uninitialized" + case Initializing: + return "Initializing" + case Initialized: + return "Initialized" + case Starting: + return "Starting" + case Started: + return "Started" + case Stopping: + return "Stopping" + case Stopped: + return "Stopped" + case Error: + return "Error" + default: + return "Unknown state" + } +} + +const ( + Uninitialized State = iota + Initializing + Initialized + Starting + Started + Stopping + Stopped + Error // ?? +) + +// Acceptors (also called detectors or recognizers) produce binary output, +// indicating whether or not the received input is accepted. +// Each event of an acceptor is either accepting or non accepting. +func (f *FSMImpl) recognizer(event Event) error { + const op = errors.Op("recognizer") + switch event { + case Initialize: + if f.current() == Uninitialized || f.current() == Error { + return nil + } + return errors.E(op, errors.Errorf("can't transition from state: %s by event %s", f.current(), event)) + case Start: + if f.current() != Initialized { + return errors.E(op, errors.Errorf("can't transition from state: %s by event %s", f.current(), event)) + } + case Stop: + if f.current() == Started || f.current() == Error { + return nil + } + return errors.E(op, errors.Errorf("can't transition from state: %s by event %s", f.current(), event)) + } + + return nil +} + +// SetState sets state +func (f *FSMImpl) set(st State) { + atomic.StoreUint32(f.currentState, uint32(st)) +} + +// CurrentState returns current state +func (f *FSMImpl) current() State { + return State(atomic.LoadUint32(f.currentState)) +} + +func (f *FSMImpl) InitialState(st State) { + f.set(st) +} + +func (f *FSMImpl) CurrentState() State { + return f.current() +} + +/* +Rules: +Transition table: +Event -> Init. Error on other events (Start, Stop) +1. Initializing -> Initialized +Event -> Start. Error on other events (Initialize, Stop) +2. Starting -> Started +Event -> Stop. Error on other events (Start, Initialize) +3. Stopping -> Stopped +*/ +func (f *FSMImpl) Transition(event Event, args ...interface{}) (interface{}, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + err := f.recognizer(event) + if err != nil { + return nil, err + } + + switch event { + case Initialize: + f.set(Initializing) + method := f.callbacks[event] + values := make([]reflect.Value, 0, len(args)) + for _, v := range args { + values = append(values, reflect.ValueOf(v)) + } + + ret := method.Func.Call(values) + if ret[0].Interface() != nil { + if ret[0].Interface().(error) != nil { + f.set(Error) + return nil, ret[0].Interface().(error) + } + } + + f.set(Initialized) + return nil, nil + case Start: + f.set(Starting) + method := f.callbacks[event] + values := make([]reflect.Value, 0, len(args)) + for _, v := range args { + values = append(values, reflect.ValueOf(v)) + } + + ret := method.Func.Call(values) + if ret[1].Interface() != nil { + if ret[1].Interface().(error) != nil { + f.set(Error) + return nil, ret[1].Interface().(error) + } + } + + f.set(Started) + return ret[0].Interface(), nil + case Stop: + f.set(Stopping) + method := f.callbacks[event] + values := make([]reflect.Value, 0, len(args)) + for _, v := range args { + values = append(values, reflect.ValueOf(v)) + } + + ret := method.Func.Call(values) + if ret[0].Interface() != nil { + if ret[0].Interface().(error) != nil { + f.set(Error) + return nil, ret[0].Interface().(error) + } + } + + f.set(Stopped) + return nil, nil + default: + return nil, errors.E("can't be here") + } +} diff --git a/go.mod b/go.mod index 7aca5c8..4168fa1 100755 --- a/go.mod +++ b/go.mod @@ -5,8 +5,7 @@ go 1.15 require ( github.com/cenkalti/backoff/v4 v4.1.0 github.com/goccy/go-graphviz v0.0.8 - github.com/spiral/errors v0.0.1 + github.com/spiral/errors v1.0.1 github.com/stretchr/testify v1.6.1 go.uber.org/zap v1.16.0 - golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 // indirect ) diff --git a/go.sum b/go.sum index 5bd6d23..b9bd02f 100755 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA= github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -21,6 +22,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -29,8 +31,9 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/spiral/errors v0.0.1 h1:KVLYwAZyXdVBjy8acLue8YK894jsDDriZVuYqypW4gg= -github.com/spiral/errors v0.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.0 h1:wdmJqAFY2Uf8KFK6b8Wkh6vxX++2+GwJWAUplrNTLD0= +github.com/spiral/errors v1.0.0/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -49,8 +52,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/image v0.0.0-20200119044424-58c23975cae1 h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg= golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= -golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM= -golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= diff --git a/structures/graph.go b/graph.go similarity index 96% rename from structures/graph.go rename to graph.go index ff5cfd4..81e7c10 100755 --- a/structures/graph.go +++ b/graph.go @@ -1,9 +1,10 @@ -package structures +package endure import ( "errors" "fmt" "reflect" + "sync/atomic" ) type Kind int @@ -76,6 +77,9 @@ type Vertex struct { // Set of entries which can vertex provide (for example, foo4 vertex can provide DB instance and logger) Provides map[string]ProvidedEntry + // current state + state uint32 + // If vertex disabled it removed from the processing (Init, Serve, Stop), but present in the graph IsDisabled bool // for the topological sort, private @@ -105,6 +109,14 @@ func (v *Vertex) AddProvider(valueKey string, value reflect.Value, isRef bool, k } } +func (v *Vertex) SetState(st State) { + atomic.StoreUint32(&v.state, uint32(st)) +} + +func (v *Vertex) GetState() State { + return State(atomic.LoadUint32(&v.state)) +} + func (g *Graph) DisableById(vid string) { v := g.VerticesMap[vid] for i := 0; i < len(g.Vertices); i++ { @@ -282,6 +294,7 @@ func (g *Graph) Reset(vertex *Vertex) []*Vertex { vertex.numOfDeps = len(vertex.Dependencies) vertex.visiting = false vertex.visited = false + vertex.SetState(Uninitialized) vertices := make([]*Vertex, 0, 5) vertices = append(vertices, vertex) @@ -307,12 +320,15 @@ func (g *Graph) depthFirstSearch(deps []*Vertex, tmp map[string]*Vertex) { } func (g *Graph) AddVertex(vertexID string, vertexIface interface{}, meta Meta) { - g.VerticesMap[vertexID] = &Vertex{ + v := &Vertex{ ID: vertexID, Iface: vertexIface, Meta: meta, Dependencies: nil, } + v.SetState(Uninitialized) + g.VerticesMap[vertexID] = v + g.Vertices = append(g.Vertices, g.VerticesMap[vertexID]) } diff --git a/init.go b/init.go new file mode 100644 index 0000000..e7f2924 --- /dev/null +++ b/init.go @@ -0,0 +1,133 @@ +package endure + +import ( + "reflect" + + "github.com/spiral/errors" + "go.uber.org/zap" +) + +/* + Traverse the DLL in the forward direction + +*/ +func (e *Endure) internalInit(vertex *Vertex) error { + const op = errors.Op("internal_init") + if vertex.IsDisabled { + e.logger.Warn("vertex is disabled due to error.Disabled in the Init func or due to Endure decision (Disabled dependency)", zap.String("vertex id", vertex.ID)) + return nil + } + // we already checked the Interface satisfaction + // at this step absence of Init() is impoosssibruuu + initMethod, _ := reflect.TypeOf(vertex.Iface).MethodByName(InitMethodName) + + err := e.callInitFn(initMethod, vertex) + if err != nil { + e.logger.Error("error occurred during the call INIT function", zap.String("vertex id", vertex.ID), zap.Error(err)) + return errors.E(op, errors.FunctionCall, err) + } + + return nil +} + +/* +Here we also track the Disabled vertices. If the vertex is disabled we should re-calculate the tree + +*/ +func (e *Endure) callInitFn(init reflect.Method, vertex *Vertex) error { + const op = errors.Op("internal_call_init_function") + if vertex.GetState() != Initializing { + return errors.E("vertex should be in Initializing state") + } + in, err := e.findInitParameters(vertex) + if err != nil { + return errors.E(op, errors.FunctionCall, err) + } + // Iterate over dependencies + // And search in Vertices for the provided types + ret := init.Func.Call(in) + rErr := ret[0].Interface() + if rErr != nil { + if err, ok := rErr.(error); ok && e != nil { + /* + If vertex is disabled we skip all processing for it: + 1. We don't add Init function args as dependencies + */ + if errors.Is(errors.Disabled, err) { + /* + DisableById vertex + 1. But if vertex is disabled it can't PROVIDE via v.Provided value of itself for other vertices + and we should recalculate whole three without this dep. + */ + e.logger.Warn("vertex disabled", zap.String("vertex id", vertex.ID), zap.Error(err)) + // disable current vertex + vertex.IsDisabled = true + // disable all vertices in the vertex which depends on current + e.graph.DisableById(vertex.ID) + // Disabled is actually to an error, just notification to the graph, that it has some vertices which are disabled + return nil + } else { + e.logger.Error("error calling internal_init", zap.String("vertex id", vertex.ID), zap.Error(err)) + return errors.E(op, errors.FunctionCall, err) + } + } else { + return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) + } + } + + // just to be safe here + // len should be at least 1 (receiver) + if len(in) > 0 { + /* + n.Vertex.AddProvider + 1. removePointerAsterisk to have uniform way of adding and searching the function args + 2. if value already exists, AddProvider will replace it with new one + */ + vertex.AddProvider(removePointerAsterisk(in[0].Type().String()), in[0], isReference(in[0].Type()), in[0].Kind()) + e.logger.Debug("value added successfully", zap.String("vertex id", vertex.ID), zap.String("parameter", in[0].Type().String())) + } else { + e.logger.Error("0 or less parameters for Init", zap.String("vertex id", vertex.ID)) + return errors.E(op, errors.ArgType, errors.Str("0 or less parameters for Init")) + } + + if len(vertex.Meta.CollectsDepsToInvoke) > 0 { + for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { + // Interface dependency + if vertex.Meta.CollectsDepsToInvoke[i].Kind == reflect.Interface { + err = e.traverseCallCollectorsInterface(vertex) + if err != nil { + return errors.E(op, errors.Traverse, err) + } + } else { + // structure dependence + err = e.traverseCallCollectors(vertex) + if err != nil { + return errors.E(op, errors.Traverse, err) + } + } + } + } + return nil +} + +func (e *Endure) findInitParameters(vertex *Vertex) ([]reflect.Value, error) { + const op = errors.Op("internal_find_init_parameters") + in := make([]reflect.Value, 0, 2) + + // add service itself + in = append(in, reflect.ValueOf(vertex.Iface)) + + // add dependencies + if len(vertex.Meta.InitDepsToInvoke) > 0 { + for i := 0; i < len(vertex.Meta.InitDepsToInvoke); i++ { + depID := vertex.Meta.InitDepsToInvoke[i].Name + v := e.graph.FindProviders(depID) + var err error + in, err = e.traverseProviders(vertex.Meta.InitDepsToInvoke[i], v[0], depID, vertex.ID, in) + if err != nil { + return nil, errors.E(op, errors.Traverse, err) + } + } + } + return in, nil +} diff --git a/internal.go b/internal.go deleted file mode 100755 index e810239..0000000 --- a/internal.go +++ /dev/null @@ -1,787 +0,0 @@ -package endure - -import ( - "context" - "fmt" - "reflect" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/spiral/endure/structures" - "github.com/spiral/errors" - "go.uber.org/zap" -) - -/* - Traverse the DLL in the forward direction - -*/ -func (e *Endure) init(vertex *structures.Vertex) error { - const op = errors.Op("internal_init") - if vertex.IsDisabled { - e.logger.Warn("vertex is disabled due to error.Disabled in the Init func or due to Endure decision (Disabled dependency)", zap.String("vertex id", vertex.ID)) - return nil - } - // we already checked the Interface satisfaction - // at this step absence of Init() is impoosssibruuu - initMethod, _ := reflect.TypeOf(vertex.Iface).MethodByName(InitMethodName) - - err := e.callInitFn(initMethod, vertex) - if err != nil { - e.logger.Error("error occurred during the call INIT function", zap.String("vertex id", vertex.ID), zap.Error(err)) - return errors.E(op, errors.FunctionCall, err) - } - - return nil -} - -/* -Here we also track the Disabled vertices. If the vertex is disabled we should re-calculate the tree - -*/ -func (e *Endure) callInitFn(init reflect.Method, vertex *structures.Vertex) error { - const op = errors.Op("internal_call_init_function") - in, err := e.findInitParameters(vertex) - if err != nil { - return errors.E(op, errors.FunctionCall, err) - } - // Iterate over dependencies - // And search in Vertices for the provided types - ret := init.Func.Call(in) - rErr := ret[0].Interface() - if rErr != nil { - if err, ok := rErr.(error); ok && e != nil { - /* - If vertex is disabled we skip all processing for it: - 1. We don't add Init function args as dependencies - */ - if errors.Is(errors.Disabled, err) { - /* - DisableById vertex - 1. But if vertex is disabled it can't PROVIDE via v.Provided value of itself for other vertices - and we should recalculate whole three without this dep. - */ - e.logger.Warn("vertex disabled", zap.String("vertex id", vertex.ID), zap.Error(err)) - // disable current vertex - vertex.IsDisabled = true - // disable all vertices in the vertex which depends on current - e.graph.DisableById(vertex.ID) - // Disabled is actually to an error, just notification to the graph, that it has some vertices which are disabled - return nil - } else { - e.logger.Error("error calling init", zap.String("vertex id", vertex.ID), zap.Error(err)) - return errors.E(op, errors.FunctionCall, err) - } - } else { - return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) - } - } - - // just to be safe here - // len should be at least 1 (receiver) - if len(in) > 0 { - /* - n.Vertex.AddProvider - 1. removePointerAsterisk to have uniform way of adding and searching the function args - 2. if value already exists, AddProvider will replace it with new one - */ - vertex.AddProvider(removePointerAsterisk(in[0].Type().String()), in[0], isReference(in[0].Type()), in[0].Kind()) - e.logger.Debug("value added successfully", zap.String("vertex id", vertex.ID), zap.String("parameter", in[0].Type().String())) - } else { - e.logger.Error("0 or less parameters for Init", zap.String("vertex id", vertex.ID)) - return errors.E(op, errors.ArgType, errors.Str("0 or less parameters for Init")) - } - - if len(vertex.Meta.CollectsDepsToInvoke) > 0 { - for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { - // Interface dependency - if vertex.Meta.CollectsDepsToInvoke[i].Kind == reflect.Interface { - err = e.traverseCallCollectorsInterface(vertex) - if err != nil { - return errors.E(op, errors.Traverse, err) - } - } else { - // structure dependence - err = e.traverseCallCollectors(vertex) - if err != nil { - return errors.E(op, errors.Traverse, err) - } - } - } - } - return nil -} - -func (e *Endure) traverseCallCollectorsInterface(vertex *structures.Vertex) error { - const op = errors.Op("internal_traverse_call_collectors_interface") - for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { - // get dependency id (vertex id) - depID := vertex.Meta.CollectsDepsToInvoke[i].Name - // find vertex which provides dependency - providers := e.graph.FindProviders(depID) - - // Depend from interface - /* - In this case we need to be careful with IN parameters - 1. We need to find type, which implements that interface - 2. Calculate IN args - 3. And invoke - */ - - // search for providers - for j := 0; j < len(providers); j++ { - // vertexKey is for example foo.DB - // vertexValue is value for that key - for vertexKey, vertexVal := range providers[j].Provides { - if depID != vertexKey { - continue - } - // init - inInterface := make([]reflect.Value, 0, 2) - // add service itself - inInterface = append(inInterface, reflect.ValueOf(vertex.Iface)) - // if type provides needed type - // value - reference and init dep also reference - switch { - case *vertexVal.IsReference == *vertex.Meta.CollectsDepsToInvoke[i].IsReference: - inInterface = append(inInterface, *vertexVal.Value) - case *vertexVal.IsReference: - // same type, but difference in the refs - // Init needs to be a value - // But Vertex provided reference - inInterface = append(inInterface, vertexVal.Value.Elem()) - case !*vertexVal.IsReference: - // vice versa - // Vertex provided value - // but Init needs to be a reference - if vertexVal.Value.CanAddr() { - inInterface = append(inInterface, vertexVal.Value.Addr()) - } else { - e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", vertexVal.Value.Type()), zap.String("type", vertexVal.Value.Type().String())) - e.logger.Warn("making a fresh pointer") - nt := reflect.New(vertexVal.Value.Type()) - inInterface = append(inInterface, nt) - } - } - - err := e.callCollectorFns(vertex, inInterface) - if err != nil { - return errors.E(op, errors.Traverse, err) - } - } - } - } - - return nil -} - -func (e *Endure) traverseCallCollectors(vertex *structures.Vertex) error { - const op = "internal_traverse_call_collectors" - in := make([]reflect.Value, 0, 2) - // add service itself - in = append(in, reflect.ValueOf(vertex.Iface)) - - for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { - // get dependency id (vertex id) - depID := vertex.Meta.CollectsDepsToInvoke[i].Name - // find vertex which provides dependency - providers := e.graph.FindProviders(depID) - // search for providers - for j := 0; j < len(providers); j++ { - for vertexID, val := range providers[j].Provides { - // if type provides needed type - if vertexID == depID { - switch { - case *val.IsReference == *vertex.Meta.CollectsDepsToInvoke[i].IsReference: - in = append(in, *val.Value) - case *val.IsReference: - // same type, but difference in the refs - // Init needs to be a value - // But Vertex provided reference - in = append(in, val.Value.Elem()) - case !*val.IsReference: - // vice versa - // Vertex provided value - // but Init needs to be a reference - if val.Value.CanAddr() { - in = append(in, val.Value.Addr()) - } else { - e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", val.Value.Type()), zap.String("type", val.Value.Type().String())) - e.logger.Warn("making a fresh pointer") - nt := reflect.New(val.Value.Type()) - in = append(in, nt) - } - } - } - } - } - } - - err := e.callCollectorFns(vertex, in) - if err != nil { - return errors.E(op, errors.Traverse, err) - } - - return nil -} - -func (e *Endure) callCollectorFns(vertex *structures.Vertex, in []reflect.Value) error { - const op = errors.Op("internal_call_collector_functions") - // type implements Collector interface - if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Collector)(nil)).Elem()) { - // if type implements Collector() it should has FnsProviderToInvoke - if vertex.Meta.CollectsDepsToInvoke != nil { - for k := 0; k < len(vertex.Meta.FnsCollectorToInvoke); k++ { - m, ok := reflect.TypeOf(vertex.Iface).MethodByName(vertex.Meta.FnsCollectorToInvoke[k]) - if !ok { - e.logger.Error("type has missing method in FnsCollectorToInvoke", zap.String("vertex id", vertex.ID), zap.String("method", vertex.Meta.FnsCollectorToInvoke[k])) - return errors.E(op, errors.FunctionCall, errors.Str("type has missing method in FnsCollectorToInvoke")) - } - - ret := m.Func.Call(in) - // handle error - if len(ret) > 0 { - // error is the last return parameter in line - rErr := ret[len(ret)-1].Interface() - if rErr != nil { - if err, ok := rErr.(error); ok && e != nil { - e.logger.Error("error calling CollectorFns", zap.String("vertex id", vertex.ID), zap.Error(err)) - return errors.E(op, errors.FunctionCall, err) - } - return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) - } - } else { - return errors.E(op, errors.FunctionCall, errors.Str("collector should return Value and error types")) - } - } - } - } - return nil -} - -func (e *Endure) findInitParameters(vertex *structures.Vertex) ([]reflect.Value, error) { - const op = errors.Op("internal_find_init_parameters") - in := make([]reflect.Value, 0, 2) - - // add service itself - in = append(in, reflect.ValueOf(vertex.Iface)) - - // add dependencies - if len(vertex.Meta.InitDepsToInvoke) > 0 { - for i := 0; i < len(vertex.Meta.InitDepsToInvoke); i++ { - depID := vertex.Meta.InitDepsToInvoke[i].Name - v := e.graph.FindProviders(depID) - var err error - in, err = e.traverseProviders(vertex.Meta.InitDepsToInvoke[i], v[0], depID, vertex.ID, in) - if err != nil { - return nil, errors.E(op, errors.Traverse, err) - } - } - } - return in, nil -} - -func (e *Endure) traverseProviders(depsEntry structures.Entry, depVertex *structures.Vertex, depID string, calleeID string, in []reflect.Value) ([]reflect.Value, error) { - const op = errors.Op("internal_traverse_providers") - err := e.traverseCallProvider(depVertex, []reflect.Value{reflect.ValueOf(depVertex.Iface)}, calleeID, depID) - if err != nil { - return nil, errors.E(op, errors.Traverse, err) - } - - // to index function name in defer - for providerID, providedEntry := range depVertex.Provides { - if providerID == depID { - in = e.appendProviderFuncArgs(depsEntry, providedEntry, in) - } - } - - return in, nil -} - -func (e *Endure) appendProviderFuncArgs(depsEntry structures.Entry, providedEntry structures.ProvidedEntry, in []reflect.Value) []reflect.Value { - switch { - case *providedEntry.IsReference == *depsEntry.IsReference: - in = append(in, *providedEntry.Value) - case *providedEntry.IsReference: - // same type, but difference in the refs - // Init needs to be a value - // But Vertex provided reference - in = append(in, providedEntry.Value.Elem()) - case !*providedEntry.IsReference: - // vice versa - // Vertex provided value - // but Init needs to be a reference - if providedEntry.Value.CanAddr() { - in = append(in, providedEntry.Value.Addr()) - } else { - e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", providedEntry.Value.Type()), zap.String("type", providedEntry.Value.Type().String())) - e.logger.Warn("making a fresh pointer") - nt := reflect.New(providedEntry.Value.Type()) - in = append(in, nt) - } - } - return in -} - -func (e *Endure) traverseCallProvider(vertex *structures.Vertex, in []reflect.Value, callerID, depId string) error { - const op = errors.Op("internal_traverse_call_provider") - // to index function name in defer - i := 0 - defer func() { - if r := recover(); r != nil { - e.logger.Error("panic during the function call", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName), zap.String("error", fmt.Sprint(r))) - } - }() - // type implements Provider interface - if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Provider)(nil)).Elem()) { - // if type implements Provider() it should has FnsProviderToInvoke - if vertex.Meta.FnsProviderToInvoke != nil { - // go over all function to invoke - // invoke it - // and save its return values - for i = 0; i < len(vertex.Meta.FnsProviderToInvoke); i++ { - m, ok := reflect.TypeOf(vertex.Iface).MethodByName(vertex.Meta.FnsProviderToInvoke[i].FunctionName) - if !ok { - e.logger.Panic("should implement the Provider interface", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) - } - - if vertex.Meta.FnsProviderToInvoke[i].ReturnTypeId != depId { - continue - } - - /* - think about better solution here TODO - We copy IN params here because only in slice is constant - */ - inCopy := make([]reflect.Value, len(in)) - copy(inCopy, in) - - /* - cases when func NumIn can be more than one - is that function accepts some other type except of receiver - at the moment we assume, that this "other type" is FunctionName interface - */ - if m.Func.Type().NumIn() > 1 { - /* - here we should add type which implement Named interface - at the moment we seek for implementation in the callerID only - */ - - callerV := e.graph.GetVertex(callerID) - if callerV == nil { - return errors.E(op, errors.Traverse, errors.Str("caller vertex is nil")) - } - - // skip function receiver - for j := 1; j < m.Func.Type().NumIn(); j++ { - // current function IN type (interface) - t := m.Func.Type().In(j) - if t.Kind() != reflect.Interface { - e.logger.Panic("Provider accepts only interfaces", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) - } - - // if Caller struct implements interface -- ok, add it to the inCopy list - // else panic - if reflect.TypeOf(callerV.Iface).Implements(t) == false { - e.logger.Panic("Caller should implement callee interface", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) - } - - inCopy = append(inCopy, reflect.ValueOf(callerV.Iface)) - } - } - - ret := m.Func.Call(inCopy) - // handle error - if len(ret) > 1 { - rErr := ret[1].Interface() - if rErr != nil { - if err, ok := rErr.(error); ok && e != nil { - e.logger.Error("error occurred in the traverseCallProvider", zap.String("vertex id", vertex.ID)) - return errors.E(op, errors.FunctionCall, err) - } - return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) - } - - // add the value to the Providers - e.logger.Debug("value added successfully", zap.String("vertex id", vertex.ID), zap.String("caller id", callerID), zap.String("parameter", in[0].Type().String())) - vertex.AddProvider(removePointerAsterisk(ret[0].Type().String()), ret[0], isReference(ret[0].Type()), in[0].Kind()) - } else { - return errors.E(op, errors.FunctionCall, errors.Str("provider should return Value and error types")) - } - } - } - } - return nil -} - -/* -Algorithm is the following (all steps executing in the topological order): -2. Call Serve() on all services -- OPTIONAL -3. Call Stop() on all services -- OPTIONAL -4. Call Clear() on a services, which implements this interface -- OPTIONAL -*/ -// call configure on the node - -func (e *Endure) callServeFn(vertex *structures.Vertex, in []reflect.Value) (*result, error) { - const op = errors.Op("call_serve_fn") - e.logger.Debug("preparing to serve the vertex", zap.String("vertex id", vertex.ID)) - m, _ := reflect.TypeOf(vertex.Iface).MethodByName(ServeMethodName) - ret := m.Func.Call(in) - res := ret[0].Interface() - if res != nil { - e.logger.Debug("called serve on the vertex", zap.String("vertex id", vertex.ID)) - if e, ok := res.(chan error); ok && e != nil { - // error come righth after we start serving the vertex - if len(e) > 0 { - return nil, errors.E(op, errors.FunctionCall, errors.Str("got first run error from vertex, stopping serve execution")) - } - return &result{ - errCh: e, - signal: make(chan notify), - vertexID: vertex.ID, - }, nil - } - } - // error, result should not be nil - // the only one reason to be nil is to vertex return parameter (channel) is not initialized - return nil, nil -} - -func (e *Endure) stop(vID string) error { - const op = errors.Op("internal_stop") - vertex := e.graph.GetVertex(vID) - if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Service)(nil)).Elem()) { - in := make([]reflect.Value, 0, 1) - // add service itself - in = append(in, reflect.ValueOf(vertex.Iface)) - - err := e.callStopFn(vertex, in) - if err != nil { - e.logger.Error("error occurred during the callStopFn", zap.String("vertex id", vertex.ID)) - return errors.E(op, errors.FunctionCall, err) - } - } - - return nil -} - -func (e *Endure) callStopFn(vertex *structures.Vertex, in []reflect.Value) error { - const op = errors.Op("internal_call_stop_function") - // Call Stop() method, which returns only error (or nil) - e.logger.Debug("calling stop function on the vertex", zap.String("vertex id", vertex.ID)) - m, _ := reflect.TypeOf(vertex.Iface).MethodByName(StopMethodName) - ret := m.Func.Call(in) - rErr := ret[0].Interface() - if rErr != nil { - if e, ok := rErr.(error); ok && e != nil { - return errors.E(op, errors.FunctionCall, e) - } - return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) - } - return nil -} - -func (e *Endure) sendStopSignal(sorted []*structures.Vertex) { - for _, v := range sorted { - // get result by vertex ID - tmp, ok := e.results.Load(v.ID) - if !ok { - continue - } - res := tmp.(*result) - if tmp == nil { - continue - } - // send exit signal to the goroutine in sorted order - e.logger.Debug("sending exit signal to the vertex from the main thread", zap.String("vertex id", res.vertexID)) - res.signal <- notify{ - stop: true, - } - - e.results.Delete(v.ID) - } -} - -func (e *Endure) sendResultToUser(res *result) { - e.userResultsCh <- &Result{ - Error: res.err, - VertexID: res.vertexID, - } -} - -func (e *Endure) shutdown(n *structures.DllNode) { - // channel with nodes to stop - sh := make(chan *structures.DllNode) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - go func() { - // process all nodes one by one - nCopy := n - for nCopy != nil { - if nCopy.Vertex.IsDisabled == true { - nCopy = nCopy.Next - continue - } - sh <- nCopy - nCopy = nCopy.Next - } - // after all nodes will be processed, send ctx.Done signal to finish the stopHandler - cancel() - }() - // block until process all nodes - e.forceExitHandler(ctx, sh) -} - -func (e *Endure) forceExitHandler(ctx context.Context, data chan *structures.DllNode) { - for { - select { - case node := <-data: - // stop vertex - err := e.stop(node.Vertex.ID) - if err != nil { - // TODO do not return until finished - // just log the errors - // stack it in slice and if slice is not empty, visualize it ?? - e.logger.Error("error occurred during the services stopping", zap.String("vertex id", node.Vertex.ID), zap.Error(err)) - } - // exit from vertex poller - tmp, ok := e.results.Load(node.Vertex.ID) - if !ok { - continue - } - - channel := tmp.(*result) - channel.signal <- notify{ - // false because we called stop already - stop: false, - } - - case <-ctx.Done(): - return - } - } -} - -// serve run calls callServeFn for each node and put the results in the map -func (e *Endure) serve(n *structures.DllNode) error { - const op = errors.Op("internal_serve") - // check if type implements serve, if implements, call serve - if reflect.TypeOf(n.Vertex.Iface).Implements(reflect.TypeOf((*Service)(nil)).Elem()) { - in := make([]reflect.Value, 0, 1) - // add service itself - in = append(in, reflect.ValueOf(n.Vertex.Iface)) - - res, err := e.callServeFn(n.Vertex, in) - if err != nil { - return errors.E(op, errors.FunctionCall, err) - } - if res != nil { - e.results.Store(res.vertexID, res) - } else { - e.logger.Error("nil result returned from the vertex", zap.String("vertex id", n.Vertex.ID), zap.String("tip:", "serve function should return initialized channel with errors")) - return errors.E(op, errors.FunctionCall, errors.Errorf("nil result returned from the vertex, vertex id: %s", n.Vertex.ID)) - } - - // start poll the vertex - e.poll(res) - } - - return nil -} - -// traverseBackStop used to visit every Prev node and stop vertices -func (e *Endure) traverseBackStop(n *structures.DllNode) { - const op = errors.Op("traverse_back_stop") - e.logger.Debug("stopping vertex in the first Serve call", zap.String("vertex id", n.Vertex.ID)) - nCopy := n - for nCopy != nil { - err := e.stop(nCopy.Vertex.ID) - if err != nil { - // ignore errors from stop - e.logger.Error("failed to traverse vertex back", zap.String("vertex id", nCopy.Vertex.ID), zap.Error(errors.E(op, err))) - } - nCopy = nCopy.Prev - } -} - -func (e *Endure) startMainThread() { - /* - Main thread is the main Endure unit of work - It used to handle errors from vertices, notify user about result, re-calculating graph according to failed vertices and sending stop signals - */ - go func() { - for { - select { - // failed Vertex - case res, ok := <-e.handleErrorCh: - // lock the handleErrorCh processing - if !ok { - e.logger.Debug("handle error channel was closed") - return - } - - e.logger.Debug("processing error in the main thread", zap.String("vertex id", res.vertexID)) - if e.retry { - // TODO handle error from the retry handler - e.retryHandler(res) - } else { - e.logger.Info("retry is turned off, sending exit signal to every vertex in the graph") - // send exit signal to whole graph - e.sendStopSignal(e.graph.Vertices) - e.sendResultToUser(res) - } - } - } - }() -} - -func (e *Endure) retryHandler(res *result) { - const op = errors.Op("internal_retry_handler") - // get vertex from the graph - vertex := e.graph.GetVertex(res.vertexID) - if vertex == nil { - e.logger.Error("failed to get vertex from the graph, vertex is nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID)) - e.userResultsCh <- &Result{ - Error: errors.E(op, errors.Traverse, errors.Str("failed to get vertex from the graph, vertex is nil")), - VertexID: "", - } - return - } - - // reset vertex and dependencies to the initial state - // numOfDeps and visited/visiting - vertices := e.graph.Reset(vertex) - - // Topologically sort the graph - sorted, err := structures.TopologicalSort(vertices) - if err != nil { - e.logger.Error("error sorting the graph", zap.Error(err)) - return - } - if sorted == nil { - e.logger.Error("sorted list should not be nil", zap.String("vertex id from the handleErrorCh channel", res.vertexID)) - e.userResultsCh <- &Result{ - Error: errors.E(op, errors.Traverse, errors.Str("failed to topologically sort the graph")), - VertexID: res.vertexID, - } - return - } - - // send exit signal only to sorted and involved vertices - // stop will be called inside poller - e.sendStopSignal(sorted) - - // Init backoff - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = e.maxInterval - b.InitialInterval = e.initialInterval - - affectedRunList := structures.NewDoublyLinkedList() - for i := 0; i <= len(sorted)-1; i++ { - affectedRunList.Push(sorted[i]) - } - - // call init - headCopy := affectedRunList.Head - for headCopy != nil { - berr := backoff.Retry(e.backoffInit(headCopy.Vertex), b) - if berr != nil { - e.logger.Error("backoff failed", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(berr)) - e.userResultsCh <- &Result{ - Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Init function call")), - VertexID: headCopy.Vertex.ID, - } - return - } - headCopy = headCopy.Next - } - - // call serve - headCopy = affectedRunList.Head - for headCopy != nil { - err = e.serve(headCopy) - if err != nil { - e.userResultsCh <- &Result{ - Error: errors.E(op, errors.FunctionCall, errors.Errorf("error during the Serve function call")), - VertexID: headCopy.Vertex.ID, - } - e.logger.Error("fatal error during the serve in the main thread", zap.String("vertex id", headCopy.Vertex.ID), zap.Error(err)) - return - } - - headCopy = headCopy.Next - } - - e.sendResultToUser(res) -} - -// poll is used to poll the errors from the vertex -// and exit from it -func (e *Endure) poll(r *result) { - rr := r - go func(res *result) { - for { - select { - // error - case err := <-res.errCh: - if err != nil { - // log error message - e.logger.Error("vertex got an error", zap.String("vertex id", res.vertexID), zap.Error(err)) - - // set the error - res.err = err - - // send handleErrorCh signal - e.handleErrorCh <- res - } - // exit from the goroutine - case n := <-res.signal: - if n.stop { - e.mutex.Lock() - e.logger.Info("vertex got exit signal", zap.String("vertex id", res.vertexID)) - err := e.stop(res.vertexID) - if err != nil { - e.logger.Error("error during exit signal", zap.String("error while stopping the vertex:", res.vertexID), zap.Error(err)) - e.mutex.Unlock() - } - e.mutex.Unlock() - return - } - return - } - } - }(rr) -} - -func (e *Endure) register(name string, vertex interface{}, order int) error { - // check the vertex - const op = errors.Op("internal_register") - if e.graph.HasVertex(name) { - return errors.E(op, errors.Traverse, errors.Errorf("vertex `%s` already exists", name)) - } - - meta := structures.Meta{ - Order: order, - } - - // just push the vertex - // here we can append in future some meta information - e.graph.AddVertex(name, vertex, meta) - return nil -} - -func (e *Endure) backoffInit(v *structures.Vertex) func() error { - return func() error { - const op = errors.Op("internal_backoff_init") - // we already checked the Interface satisfaction - // at this step absence of Init() is impossible - init, _ := reflect.TypeOf(v.Iface).MethodByName(InitMethodName) - - err := e.callInitFn(init, v) - if err != nil { - e.logger.Error("error occurred during the call INIT function", zap.String("vertex id", v.ID), zap.Error(err)) - return errors.E(op, errors.FunctionCall, err) - } - - return nil - } -} diff --git a/structures/linked_list.go b/linked_list.go similarity index 99% rename from structures/linked_list.go rename to linked_list.go index 9c61fca..70e0105 100755 --- a/structures/linked_list.go +++ b/linked_list.go @@ -1,4 +1,4 @@ -package structures +package endure // Vertex of the DoublyLL type DllNode struct { diff --git a/poller.go b/poller.go new file mode 100644 index 0000000..90c1276 --- /dev/null +++ b/poller.go @@ -0,0 +1,65 @@ +package endure + +import "go.uber.org/zap" + +// poll is used to poll the errors from the vertex +// and exit from it +func (e *Endure) poll(r *result) { + rr := r + go func(res *result) { + for { + select { + // error + case err := <-res.errCh: + if err != nil { + // log error message + e.logger.Error("vertex got an error", zap.String("vertex id", res.vertexID), zap.Error(err)) + + // set the error + res.err = err + + // send handleErrorCh signal + e.handleErrorCh <- res + } + // exit from the goroutine + case <-res.signal: + e.logger.Info("vertex got exit signal, exiting from poller", zap.String("vertex id", res.vertexID)) + return + } + } + }(rr) +} + +func (e *Endure) startMainThread() { + /* + Main thread is the main Endure unit of work + It used to handle errors from vertices, notify user about result, re-calculating graph according to failed vertices and sending internal_stop signals + */ + go func() { + for { + select { + // failed Vertex + case res, ok := <-e.handleErrorCh: + // lock the handleErrorCh processing + if !ok { + e.logger.Debug("handle error channel was closed") + return + } + + e.logger.Debug("processing error in the main thread", zap.String("vertex id", res.vertexID)) + if e.retry { + // TODO handle error from the retry handler + e.retryHandler(res) + } else { + e.logger.Info("retry is turned off, sending exit signal to every vertex in the graph") + // send exit signal to whole graph + err := e.Stop() + if err != nil { + e.logger.Error("error during stopping vertex", zap.String("vertex id", res.vertexID), zap.Error(err)) + } + e.sendResultToUser(res) + } + } + } + }() +} diff --git a/register.go b/register.go new file mode 100644 index 0000000..60c2279 --- /dev/null +++ b/register.go @@ -0,0 +1,22 @@ +package endure + +import ( + "github.com/spiral/errors" +) + +func (e *Endure) register(name string, vertex interface{}, order int) error { + // check the vertex + const op = errors.Op("internal_register") + if e.graph.HasVertex(name) { + return errors.E(op, errors.Traverse, errors.Errorf("vertex `%s` already exists", name)) + } + + meta := Meta{ + Order: order, + } + + // just push the vertex + // here we can append in future some meta information + e.graph.AddVertex(name, vertex, meta) + return nil +} diff --git a/serve.go b/serve.go new file mode 100644 index 0000000..4678a01 --- /dev/null +++ b/serve.go @@ -0,0 +1,60 @@ +package endure + +import ( + "reflect" + + "github.com/spiral/errors" + "go.uber.org/zap" +) + +func (e *Endure) callServeFn(vertex *Vertex, in []reflect.Value) (*result, error) { + const op = errors.Op("call_serve_fn") + e.logger.Debug("preparing to serveInternal the vertex", zap.String("vertex id", vertex.ID)) + m, _ := reflect.TypeOf(vertex.Iface).MethodByName(ServeMethodName) + ret := m.Func.Call(in) + res := ret[0].Interface() + if res != nil { + e.logger.Debug("called serveInternal on the vertex", zap.String("vertex id", vertex.ID)) + if e, ok := res.(chan error); ok && e != nil { + // error come right after we start serving the vertex + if len(e) > 0 { + return nil, errors.E(op, errors.FunctionCall, errors.Errorf("got first run error from vertex %s, stopping execution", vertex.ID)) + } + return &result{ + errCh: e, + signal: make(chan notify), + vertexID: vertex.ID, + }, nil + } + } + // error, result should not be nil + // the only one reason to be nil is to vertex return parameter (channel) is not initialized + return nil, nil +} + +// serveInternal run calls callServeFn for each node and put the results in the map +func (e *Endure) serveInternal(n *DllNode) error { + const op = errors.Op("internal_serve") + // check if type implements serveInternal, if implements, call serveInternal + if reflect.TypeOf(n.Vertex.Iface).Implements(reflect.TypeOf((*Service)(nil)).Elem()) { + in := make([]reflect.Value, 0, 1) + // add service itself + in = append(in, reflect.ValueOf(n.Vertex.Iface)) + + res, err := e.callServeFn(n.Vertex, in) + if err != nil { + return errors.E(op, errors.FunctionCall, err) + } + if res != nil { + e.results.Store(res.vertexID, res) + } else { + e.logger.Error("nil result returned from the vertex", zap.String("vertex id", n.Vertex.ID), zap.String("tip:", "serveInternal function should return initialized channel with errors")) + return errors.E(op, errors.FunctionCall, errors.Errorf("nil result returned from the vertex, vertex id: %s", n.Vertex.ID)) + } + + // start polling the vertex + e.poll(res) + } + + return nil +} diff --git a/stop.go b/stop.go new file mode 100644 index 0000000..ff6e41d --- /dev/null +++ b/stop.go @@ -0,0 +1,156 @@ +package endure + +import ( + "context" + "reflect" + + "github.com/spiral/errors" + "go.uber.org/zap" +) + +func (e *Endure) internalStop(vID string) error { + const op = errors.Op("internal_stop") + vertex := e.graph.GetVertex(vID) + if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Service)(nil)).Elem()) { + in := make([]reflect.Value, 0, 1) + // add service itself + in = append(in, reflect.ValueOf(vertex.Iface)) + + err := e.callStopFn(vertex, in) + if err != nil { + e.logger.Error("error occurred during the callStopFn", zap.String("vertex id", vertex.ID)) + return errors.E(op, errors.FunctionCall, err) + } + } + return nil +} + +func (e *Endure) callStopFn(vertex *Vertex, in []reflect.Value) error { + const op = errors.Op("internal_call_stop_function") + // Call Stop() method, which returns only error (or nil) + e.logger.Debug("calling internal_stop function on the vertex", zap.String("vertex id", vertex.ID)) + m, _ := reflect.TypeOf(vertex.Iface).MethodByName(StopMethodName) + ret := m.Func.Call(in) + rErr := ret[0].Interface() + if rErr != nil { + if e, ok := rErr.(error); ok && e != nil { + return errors.E(op, errors.FunctionCall, e) + } + return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) + } + return nil +} + +// true -> next +// false -> prev +func (e *Endure) shutdown(n *DllNode, traverseNext bool) error { + const op = errors.Op("shutdown") + numOfVertices := calculateDepth(n, traverseNext) + if numOfVertices == 0 { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), e.stopTimeout) + defer cancel() + c := make(chan string) + + // used to properly exit + // if the total number of vertices equal to the stopped, it means, that we stopped all + stopped := 0 + + go func() { + // process all nodes one by one + nCopy := n + for nCopy != nil { + go func(v *Vertex) { + // if vertex is disabled, just skip it, but send to the channel ID + if v.IsDisabled == true { + c <- v.ID + return + } + + // if vertex is Uninitialized or already stopped + // Skip vertices which are not Started + if v.GetState() != Started { + c <- v.ID + return + } + + v.SetState(Stopping) + + // if we have a running poller, exit from it + tmp, ok := e.results.Load(v.ID) + if ok { + channel := tmp.(*result) + + // exit from vertex poller + channel.signal <- notify{} + e.results.Delete(v.ID) + } + + // call Stop on the Vertex + err := e.internalStop(v.ID) + if err != nil { + v.SetState(Error) + c <- v.ID + e.logger.Error("error stopping vertex", zap.String("vertex id", v.ID), zap.Error(err)) + return + } + v.SetState(Stopped) + c <- v.ID + }(nCopy.Vertex) + if traverseNext { + nCopy = nCopy.Next + } else { + nCopy = nCopy.Prev + } + } + }() + + for { + select { + // get notification about stopped vertex + case vid := <-c: + e.logger.Info("vertex stopped", zap.String("vertex id", vid)) + stopped += 1 + if stopped == numOfVertices { + return nil + } + case <-ctx.Done(): + e.logger.Info("timeout exceed, some vertices are not stopped", zap.Error(ctx.Err())) + // iterate to see vertices, which are not stopped + VIDs := make([]string, 0, 1) + for i := 0; i < len(e.graph.Vertices); i++ { + state := e.graph.Vertices[i].GetState() + if state == Started || state == Stopping { + VIDs = append(VIDs, e.graph.Vertices[i].ID) + } + } + if len(VIDs) > 0 { + e.logger.Error("vertices which are not stopped", zap.Any("vertex id", VIDs)) + } + + return errors.E(op, errors.TimeOut, errors.Str("timeout exceed, some vertices may not be stopped and can cause memory leak")) + } + } +} + +// Using to calculate number of Vertices in DLL +func calculateDepth(n *DllNode, traverse bool) int { + num := 0 + if traverse { + tmp := n + for tmp != nil { + num += 1 + tmp = tmp.Next + } + return num + } else { + tmp := n + for tmp != nil { + num += 1 + tmp = tmp.Prev + } + return num + } +} diff --git a/tests/backoff/backoff_test.go b/tests/backoff/backoff_test.go index bed09ee..bfec14d 100755 --- a/tests/backoff/backoff_test.go +++ b/tests/backoff/backoff_test.go @@ -13,7 +13,7 @@ import ( ) func TestEndure_MainThread_Serve_Backoff(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin4.Plugin4{})) @@ -37,7 +37,7 @@ func TestEndure_MainThread_Serve_Backoff(t *testing.T) { } func TestEndure_MainThread_Init_Backoff(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true), endure.SetBackoffTimes(time.Second, time.Second*10)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true), endure.SetBackoffTimes(time.Second, time.Second*10)) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin3.Plugin3{})) @@ -66,7 +66,7 @@ func TestEndure_MainThread_Init_Backoff(t *testing.T) { } func TestEndure_BackoffTimers(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true), endure.SetBackoffTimes(time.Second, time.Second*5)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true), endure.SetBackoffTimes(time.Second, time.Second*5)) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin2.Plugin2{})) diff --git a/tests/disabled_vertices/disabled_vertices_test.go b/tests/disabled_vertices/disabled_vertices_test.go index f0fd9b0..2032f94 100755 --- a/tests/disabled_vertices/disabled_vertices_test.go +++ b/tests/disabled_vertices/disabled_vertices_test.go @@ -18,7 +18,7 @@ import ( ) func TestVertexDisabled(t *testing.T) { - cont, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + cont, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) if err != nil { t.Fatal(err) } @@ -43,7 +43,7 @@ func TestVertexDisabled(t *testing.T) { } func TestDisabledViaInterface(t *testing.T) { - cont, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + cont, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) if err != nil { t.Fatal(err) } @@ -86,7 +86,7 @@ func TestDisabledViaInterface(t *testing.T) { } func TestDisabledRoot(t *testing.T) { - cont, err := endure.NewContainer(endure.DebugLevel) + cont, err := endure.NewContainer(endure.DebugLevel, nil) if err != nil { t.Fatal(err) } diff --git a/tests/happy_scenarios/happyScenario_test.go b/tests/happy_scenarios/happyScenario_test.go index 5315514..dc70301 100755 --- a/tests/happy_scenarios/happyScenario_test.go +++ b/tests/happy_scenarios/happyScenario_test.go @@ -29,7 +29,7 @@ func TestEndure_DifferentLogLevels(t *testing.T) { } func testLog(t *testing.T, level endure.Level) { - c, err := endure.NewContainer(level) + c, err := endure.NewContainer(level, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin4.S4{})) @@ -57,7 +57,7 @@ func testLog(t *testing.T, level endure.Level) { } func TestEndure_Init_OK(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.Visualize(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin4.S4{})) @@ -66,11 +66,75 @@ func TestEndure_Init_OK(t *testing.T) { assert.NoError(t, c.Register(&plugin1.S1{})) assert.NoError(t, c.Register(&plugin5.S5{})) assert.NoError(t, c.Register(&plugin6.S6Interface{})) + assert.NoError(t, c.Init()) res, err := c.Serve() assert.NoError(t, err) + go func() { + for r := range res { + if r.Error != nil { + assert.NoError(t, r.Error) + return + } + } + }() + time.Sleep(time.Second * 2) + assert.NoError(t, c.Stop()) +} + +func TestEndure_DoubleInitDoubleServe_OK(t *testing.T) { + c, err := endure.NewContainer(endure.DebugLevel, nil) + assert.NoError(t, err) + + assert.NoError(t, c.Register(&plugin4.S4{})) + assert.NoError(t, c.Register(&plugin2.S2{})) + assert.NoError(t, c.Register(&plugin3.S3{})) + assert.NoError(t, c.Register(&plugin1.S1{})) + assert.NoError(t, c.Register(&plugin5.S5{})) + assert.NoError(t, c.Register(&plugin6.S6Interface{})) + + assert.NoError(t, c.Init()) + assert.Error(t, c.Init()) + + res, err := c.Serve() + assert.NoError(t, err) + res, err = c.Serve() + assert.Error(t, err) + go func() { + for r := range res { + if r.Error != nil { + assert.NoError(t, r.Error) + return + } + } + }() + + time.Sleep(time.Second * 2) + assert.NoError(t, c.Stop()) +} + +func TestEndure_WrongOrder(t *testing.T) { + c, err := endure.NewContainer(endure.DebugLevel, nil) + assert.NoError(t, err) + + assert.Error(t, c.Stop()) //recognizer: can't transition from state: Uninitialized by event Stop + assert.NoError(t, c.Register(&plugin4.S4{})) + assert.NoError(t, c.Register(&plugin2.S2{})) + assert.NoError(t, c.Register(&plugin3.S3{})) + assert.NoError(t, c.Register(&plugin1.S1{})) + assert.NoError(t, c.Register(&plugin5.S5{})) + assert.NoError(t, c.Register(&plugin6.S6Interface{})) + + _, err = c.Serve() + assert.Error(t, err) + + assert.NoError(t, c.Init()) + assert.Error(t, c.Init()) + + res, err := c.Serve() + assert.NoError(t, err) go func() { for r := range res { if r.Error != nil { @@ -85,7 +149,7 @@ func TestEndure_Init_OK(t *testing.T) { } func TestEndure_Init_1_Element(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin7.Plugin7{})) @@ -110,7 +174,7 @@ func TestEndure_Init_1_Element(t *testing.T) { } func TestEndure_ProvidedValueButNeedPointer(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin12.Plugin1{})) @@ -141,7 +205,7 @@ func TestEndure_PrimitiveTypes(t *testing.T) { println("test should panic") } }() - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&primitive.Plugin8{})) diff --git a/tests/interfaces/interfaces_test.go b/tests/interfaces/interfaces_test.go index 0fa76ee..fe77e18 100755 --- a/tests/interfaces/interfaces_test.go +++ b/tests/interfaces/interfaces_test.go @@ -19,7 +19,7 @@ import ( ) func TestEndure_Interfaces_OK(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin1.Plugin1{})) @@ -48,7 +48,7 @@ func TestEndure_Interfaces_OK(t *testing.T) { } func TestEndure_InterfacesCollects_Ok(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&plugin3.Plugin3{})) @@ -64,7 +64,7 @@ func TestEndure_InterfacesCollects_Ok(t *testing.T) { } func TestEndure_NamedProvides_Ok(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(®isters.Plugin2{})) @@ -79,7 +79,7 @@ func TestEndure_NamedProvides_Ok(t *testing.T) { } func TestEndure_NamedProvides_NotImplement_Ok(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(&randominterface.Plugin2{})) @@ -99,7 +99,7 @@ func TestEndure_NamedProvides_WrongType_Fail(t *testing.T) { println("test should panic") } }() - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(®istersfail.Plugin2{})) @@ -114,7 +114,7 @@ func TestEndure_NamedProvides_WrongType_Fail(t *testing.T) { } func TestEndure_ServiceInterface_NotImplemented_Ok(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel) + c, err := endure.NewContainer(endure.DebugLevel, nil) assert.NoError(t, err) assert.NoError(t, c.Register(¬ImplPlugin1.Foo{})) diff --git a/tests/issues/issues_test.go b/tests/issues/issues_test.go index 9b06390..949cbbd 100755 --- a/tests/issues/issues_test.go +++ b/tests/issues/issues_test.go @@ -15,7 +15,7 @@ import ( // Provided structure instead of function func TestEndure_Issue33(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.Error(t, c.Register(&issue33.Plugin1{})) @@ -27,7 +27,7 @@ func TestEndure_Issue33(t *testing.T) { // Call Stop on the container // Should be only 1 stop func TestEndure_Issue55(t *testing.T) { - container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(false)) + container, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(false)) assert.NoError(t, err) assert.NoError(t, container.Register(&issue55_p1.Plugin1{})) diff --git a/tests/stress/ServeRetryErr/plugin1.go b/tests/stress/ServeRetryErr/plugin1.go index 0664085..9b122d5 100755 --- a/tests/stress/ServeRetryErr/plugin1.go +++ b/tests/stress/ServeRetryErr/plugin1.go @@ -18,7 +18,7 @@ func (s *S1ServeErr) Serve() chan error { var op = errors.Op("S1 Serve") errCh := make(chan error, 1) go func() { - time.Sleep(time.Second * 10) + time.Sleep(time.Second) err := errors.E(op, errors.Serve, "test serve error") errCh <- err }() diff --git a/tests/stress/mixed/foo.go b/tests/stress/mixed/foo.go new file mode 100644 index 0000000..8c7c322 --- /dev/null +++ b/tests/stress/mixed/foo.go @@ -0,0 +1,22 @@ +package mixed + +import ( + "time" +) + +type Foo struct { +} + +func (f *Foo) Init() error { + return nil +} + +func (f *Foo) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (f *Foo) Stop() error { + time.Sleep(time.Second * 15) + return nil +} diff --git a/tests/stress/stress_test.go b/tests/stress/stress_test.go index db82787..dfd5b04 100755 --- a/tests/stress/stress_test.go +++ b/tests/stress/stress_test.go @@ -9,12 +9,13 @@ import ( "github.com/spiral/endure/tests/stress/InitErr" "github.com/spiral/endure/tests/stress/ServeErr" "github.com/spiral/endure/tests/stress/ServeRetryErr" + "github.com/spiral/endure/tests/stress/mixed" "github.com/spiral/errors" "github.com/stretchr/testify/assert" ) func TestEndure_Init_Err(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(false)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(false)) assert.NoError(t, err) assert.NoError(t, c.Register(&InitErr.S1Err{})) @@ -22,8 +23,20 @@ func TestEndure_Init_Err(t *testing.T) { assert.Error(t, c.Init()) } +func TestEndure_DoubleStop_Err(t *testing.T) { + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(false)) + assert.NoError(t, err) + + assert.NoError(t, c.Register(&InitErr.S1Err{})) + assert.NoError(t, c.Register(&InitErr.S2Err{})) // should produce an error during the Init + assert.Error(t, c.Init()) + assert.NoError(t, c.Stop()) + // recognizer: can't transition from state: Stopped by event Stop + assert.Error(t, c.Stop()) +} + func TestEndure_Serve_Err(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(false)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(false)) assert.NoError(t, err) assert.NoError(t, c.Register(&ServeErr.S4ServeError{})) // should produce an error during the Serve @@ -48,7 +61,7 @@ time X is 0s 4. In case of S1Err vertices S5 -> S4V -> S2ServeErr (with error in Serve in X+5s) -> S1Err should be restarted */ func TestEndure_Serve_Retry_Err(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.NoError(t, c.Register(&ServeRetryErr.S4{})) @@ -99,7 +112,7 @@ time X is 0s 5. Test should receive at least 100 errors */ func TestEndure_Serve_Retry_100_Err(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.NoError(t, c.Register(&ServeRetryErr.S4{})) @@ -147,7 +160,7 @@ func TestEndure_Serve_Retry_100_Err(t *testing.T) { } func TestEndure_Serve_Retry_100_With_Random_Err(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.NoError(t, c.Register(&ServeRetryErr.S4{})) @@ -193,7 +206,7 @@ func TestEndure_Serve_Retry_100_With_Random_Err(t *testing.T) { } func TestEndure_NoRegisterInvoke(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.Error(t, c.Init()) @@ -205,7 +218,7 @@ func TestEndure_NoRegisterInvoke(t *testing.T) { } func TestEndure_CollectorFuncReturnError(t *testing.T) { - c, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(true)) assert.NoError(t, err) assert.NoError(t, c.Register(&CollectorFuncReturn.FooDep{})) @@ -217,3 +230,16 @@ func TestEndure_CollectorFuncReturnError(t *testing.T) { assert.NoError(t, c.Stop()) } + +func TestEndure_ForceExit(t *testing.T) { + c, err := endure.NewContainer(endure.DebugLevel, nil, endure.RetryOnFail(false)) // stop timeout 10 seconds + assert.NoError(t, err) + + assert.NoError(t, c.Register(&mixed.Foo{})) // sleep for 15 seconds + assert.NoError(t, c.Init()) + + _, err = c.Serve() + assert.NoError(t, err) + + assert.Error(t, c.Stop()) // shutdown: timeout exceed, some vertices are not stopped and can cause memory leak +} diff --git a/visit_collectors.go b/visit_collectors.go new file mode 100644 index 0000000..f09a24c --- /dev/null +++ b/visit_collectors.go @@ -0,0 +1,156 @@ +package endure + +import ( + "fmt" + "reflect" + + "github.com/spiral/errors" + "go.uber.org/zap" +) + +func (e *Endure) traverseCallCollectorsInterface(vertex *Vertex) error { + const op = errors.Op("internal_traverse_call_collectors_interface") + for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { + // get dependency id (vertex id) + depID := vertex.Meta.CollectsDepsToInvoke[i].Name + // find vertex which provides dependency + providers := e.graph.FindProviders(depID) + + // Depend from interface + /* + In this case we need to be careful with IN parameters + 1. We need to find type, which implements that interface + 2. Calculate IN args + 3. And invoke + */ + + // search for providers + for j := 0; j < len(providers); j++ { + // vertexKey is for example foo.DB + // vertexValue is value for that key + for vertexKey, vertexVal := range providers[j].Provides { + if depID != vertexKey { + continue + } + // internal_init + inInterface := make([]reflect.Value, 0, 2) + // add service itself + inInterface = append(inInterface, reflect.ValueOf(vertex.Iface)) + // if type provides needed type + // value - reference and internal_init dep also reference + switch { + case *vertexVal.IsReference == *vertex.Meta.CollectsDepsToInvoke[i].IsReference: + inInterface = append(inInterface, *vertexVal.Value) + case *vertexVal.IsReference: + // same type, but difference in the refs + // Init needs to be a value + // But Vertex provided reference + inInterface = append(inInterface, vertexVal.Value.Elem()) + case !*vertexVal.IsReference: + // vice versa + // Vertex provided value + // but Init needs to be a reference + if vertexVal.Value.CanAddr() { + inInterface = append(inInterface, vertexVal.Value.Addr()) + } else { + e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", vertexVal.Value.Type()), zap.String("type", vertexVal.Value.Type().String())) + e.logger.Warn("making a fresh pointer") + nt := reflect.New(vertexVal.Value.Type()) + inInterface = append(inInterface, nt) + } + } + + err := e.callCollectorFns(vertex, inInterface) + if err != nil { + return errors.E(op, errors.Traverse, err) + } + } + } + } + + return nil +} + +func (e *Endure) traverseCallCollectors(vertex *Vertex) error { + const op = "internal_traverse_call_collectors" + in := make([]reflect.Value, 0, 2) + // add service itself + in = append(in, reflect.ValueOf(vertex.Iface)) + + for i := 0; i < len(vertex.Meta.CollectsDepsToInvoke); i++ { + // get dependency id (vertex id) + depID := vertex.Meta.CollectsDepsToInvoke[i].Name + // find vertex which provides dependency + providers := e.graph.FindProviders(depID) + // search for providers + for j := 0; j < len(providers); j++ { + for vertexID, val := range providers[j].Provides { + // if type provides needed type + if vertexID == depID { + switch { + case *val.IsReference == *vertex.Meta.CollectsDepsToInvoke[i].IsReference: + in = append(in, *val.Value) + case *val.IsReference: + // same type, but difference in the refs + // Init needs to be a value + // But Vertex provided reference + in = append(in, val.Value.Elem()) + case !*val.IsReference: + // vice versa + // Vertex provided value + // but Init needs to be a reference + if val.Value.CanAddr() { + in = append(in, val.Value.Addr()) + } else { + e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", val.Value.Type()), zap.String("type", val.Value.Type().String())) + e.logger.Warn("making a fresh pointer") + nt := reflect.New(val.Value.Type()) + in = append(in, nt) + } + } + } + } + } + } + + err := e.callCollectorFns(vertex, in) + if err != nil { + return errors.E(op, errors.Traverse, err) + } + + return nil +} + +func (e *Endure) callCollectorFns(vertex *Vertex, in []reflect.Value) error { + const op = errors.Op("internal_call_collector_functions") + // type implements Collector interface + if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Collector)(nil)).Elem()) { + // if type implements Collector() it should has FnsProviderToInvoke + if vertex.Meta.CollectsDepsToInvoke != nil { + for k := 0; k < len(vertex.Meta.FnsCollectorToInvoke); k++ { + m, ok := reflect.TypeOf(vertex.Iface).MethodByName(vertex.Meta.FnsCollectorToInvoke[k]) + if !ok { + e.logger.Error("type has missing method in FnsCollectorToInvoke", zap.String("vertex id", vertex.ID), zap.String("method", vertex.Meta.FnsCollectorToInvoke[k])) + return errors.E(op, errors.FunctionCall, errors.Str("type has missing method in FnsCollectorToInvoke")) + } + + ret := m.Func.Call(in) + // handle error + if len(ret) > 0 { + // error is the last return parameter in line + rErr := ret[len(ret)-1].Interface() + if rErr != nil { + if err, ok := rErr.(error); ok && e != nil { + e.logger.Error("error calling CollectorFns", zap.String("vertex id", vertex.ID), zap.Error(err)) + return errors.E(op, errors.FunctionCall, err) + } + return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) + } + } else { + return errors.E(op, errors.FunctionCall, errors.Str("collector should return Value and error types")) + } + } + } + } + return nil +} diff --git a/visit_providers.go b/visit_providers.go new file mode 100644 index 0000000..53f9099 --- /dev/null +++ b/visit_providers.go @@ -0,0 +1,142 @@ +package endure + +import ( + "fmt" + "reflect" + + "github.com/spiral/errors" + "go.uber.org/zap" +) + +func (e *Endure) traverseProviders(depsEntry Entry, depVertex *Vertex, depID string, calleeID string, in []reflect.Value) ([]reflect.Value, error) { + const op = errors.Op("internal_traverse_providers") + err := e.traverseCallProvider(depVertex, []reflect.Value{reflect.ValueOf(depVertex.Iface)}, calleeID, depID) + if err != nil { + return nil, errors.E(op, errors.Traverse, err) + } + + // to index function name in defer + for providerID, providedEntry := range depVertex.Provides { + if providerID == depID { + in = e.appendProviderFuncArgs(depsEntry, providedEntry, in) + } + } + + return in, nil +} + +func (e *Endure) appendProviderFuncArgs(depsEntry Entry, providedEntry ProvidedEntry, in []reflect.Value) []reflect.Value { + switch { + case *providedEntry.IsReference == *depsEntry.IsReference: + in = append(in, *providedEntry.Value) + case *providedEntry.IsReference: + // same type, but difference in the refs + // Init needs to be a value + // But Vertex provided reference + in = append(in, providedEntry.Value.Elem()) + case !*providedEntry.IsReference: + // vice versa + // Vertex provided value + // but Init needs to be a reference + if providedEntry.Value.CanAddr() { + in = append(in, providedEntry.Value.Addr()) + } else { + e.logger.Warn(fmt.Sprintf("value is not addressible. TIP: consider to return a pointer from %s", providedEntry.Value.Type()), zap.String("type", providedEntry.Value.Type().String())) + e.logger.Warn("making a fresh pointer") + nt := reflect.New(providedEntry.Value.Type()) + in = append(in, nt) + } + } + return in +} + +func (e *Endure) traverseCallProvider(vertex *Vertex, in []reflect.Value, callerID, depId string) error { + const op = errors.Op("internal_traverse_call_provider") + // to index function name in defer + i := 0 + defer func() { + if r := recover(); r != nil { + e.logger.Error("panic during the function call", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName), zap.String("error", fmt.Sprint(r))) + } + }() + // type implements Provider interface + if reflect.TypeOf(vertex.Iface).Implements(reflect.TypeOf((*Provider)(nil)).Elem()) { + // if type implements Provider() it should has FnsProviderToInvoke + if vertex.Meta.FnsProviderToInvoke != nil { + // go over all function to invoke + // invoke it + // and save its return values + for i = 0; i < len(vertex.Meta.FnsProviderToInvoke); i++ { + m, ok := reflect.TypeOf(vertex.Iface).MethodByName(vertex.Meta.FnsProviderToInvoke[i].FunctionName) + if !ok { + e.logger.Panic("should implement the Provider interface", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) + } + + if vertex.Meta.FnsProviderToInvoke[i].ReturnTypeId != depId { + continue + } + + /* + think about better solution here TODO + We copy IN params here because only in slice is constant + */ + inCopy := make([]reflect.Value, len(in)) + copy(inCopy, in) + + /* + cases when func NumIn can be more than one + is that function accepts some other type except of receiver + at the moment we assume, that this "other type" is FunctionName interface + */ + if m.Func.Type().NumIn() > 1 { + /* + here we should add type which implement Named interface + at the moment we seek for implementation in the callerID only + */ + + callerV := e.graph.GetVertex(callerID) + if callerV == nil { + return errors.E(op, errors.Traverse, errors.Str("caller vertex is nil")) + } + + // skip function receiver + for j := 1; j < m.Func.Type().NumIn(); j++ { + // current function IN type (interface) + t := m.Func.Type().In(j) + if t.Kind() != reflect.Interface { + e.logger.Panic("Provider accepts only interfaces", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) + } + + // if Caller struct implements interface -- ok, add it to the inCopy list + // else panic + if reflect.TypeOf(callerV.Iface).Implements(t) == false { + e.logger.Panic("Caller should implement callee interface", zap.String("function name", vertex.Meta.FnsProviderToInvoke[i].FunctionName)) + } + + inCopy = append(inCopy, reflect.ValueOf(callerV.Iface)) + } + } + + ret := m.Func.Call(inCopy) + // handle error + if len(ret) > 1 { + rErr := ret[1].Interface() + if rErr != nil { + if err, ok := rErr.(error); ok && e != nil { + e.logger.Error("error occurred in the traverseCallProvider", zap.String("vertex id", vertex.ID)) + return errors.E(op, errors.FunctionCall, err) + } + return errors.E(op, errors.FunctionCall, errors.Str("unknown error occurred during the function call")) + } + + // add the value to the Providers + e.logger.Debug("value added successfully", zap.String("vertex id", vertex.ID), zap.String("caller id", callerID), zap.String("parameter", in[0].Type().String())) + vertex.AddProvider(removePointerAsterisk(ret[0].Type().String()), ret[0], isReference(ret[0].Type()), in[0].Kind()) + } else { + return errors.E(op, errors.FunctionCall, errors.Str("provider should return Value and error types")) + } + } + } + } + return nil +} diff --git a/structures/visualize_graph.go b/visualize.go similarity index 92% rename from structures/visualize_graph.go rename to visualize.go index 4d5eaff..6d59612 100755 --- a/structures/visualize_graph.go +++ b/visualize.go @@ -1,13 +1,13 @@ // +build !windows -package structures +package endure import ( "github.com/goccy/go-graphviz" "github.com/spiral/errors" ) -func Visualize(vertices []*Vertex) error { +func (e *Endure) Visualize(vertices []*Vertex) error { const op = errors.Op("print_graph") gr := graphviz.New() graph, err := gr.Graph() diff --git a/structures/visualize_graph_windows.go b/visualize_windows.go similarity index 74% rename from structures/visualize_graph_windows.go rename to visualize_windows.go index 5db3a15..5f8d373 100755 --- a/structures/visualize_graph_windows.go +++ b/visualize_windows.go @@ -1,12 +1,12 @@ // +build windows -package structures +package endure import ( "github.com/spiral/errors" ) -func Visualize(vertices []*Vertex) error { +func (e *Endure) Visualize(vertices []*Vertex) error { const op = errors.Op("print_graph") return errors.E(op, errors.Unsupported, errors.Str("windows currently not supported for this feature")) }