From 27bd4b8b9dca7578d79092a640cd415e9211a058 Mon Sep 17 00:00:00 2001 From: Erik Kristensen Date: Sat, 13 Jan 2024 18:01:26 -0700 Subject: [PATCH] feat: initial implementing of passing context --- pkg/nuke/nuke.go | 40 ++++++++++++++++++----------------- pkg/nuke/nuke_test.go | 17 ++++++++------- pkg/nuke/scan.go | 14 ++++++------ pkg/nuke/scan_test.go | 11 +++++----- pkg/queue/item.go | 5 +++-- pkg/resource/registry.go | 3 ++- pkg/resource/registry_test.go | 3 ++- 7 files changed, 51 insertions(+), 42 deletions(-) diff --git a/pkg/nuke/nuke.go b/pkg/nuke/nuke.go index e3f019c..fbe33af 100644 --- a/pkg/nuke/nuke.go +++ b/pkg/nuke/nuke.go @@ -3,6 +3,7 @@ package nuke import ( + "context" "fmt" "io" "slices" @@ -161,7 +162,7 @@ func (n *Nuke) Prompt() error { // Run is the main entry point for the library. It will run the validation handlers, prompt the user, scan for // resources, filter them and then process them. -func (n *Nuke) Run() error { +func (n *Nuke) Run(ctx context.Context) error { n.Version() if err := n.Validate(); err != nil { @@ -172,7 +173,7 @@ func (n *Nuke) Run() error { return err } - if err := n.Scan(); err != nil { + if err := n.Scan(ctx); err != nil { return err } @@ -190,7 +191,7 @@ func (n *Nuke) Run() error { return err } - if err := n.run(); err != nil { + if err := n.run(ctx); err != nil { return err } @@ -201,12 +202,12 @@ func (n *Nuke) Run() error { } // run handles the processing and loop of the queue of items -func (n *Nuke) run() error { +func (n *Nuke) run(ctx context.Context) error { failCount := 0 waitingCount := 0 for { - n.HandleQueue() + n.HandleQueue(ctx) if n.Queue.Count( queue.ItemStatePending, @@ -289,14 +290,14 @@ func (n *Nuke) Validate() error { // Scan is used to scan for resources. It will run the scanners that were registered with the library by the invoking // tool. It will also filter the resources based on the filters that were registered. It will also print the current // status of the resources. -func (n *Nuke) Scan() error { +func (n *Nuke) Scan(ctx context.Context) error { itemQueue := queue.Queue{ Items: make([]*queue.Item, 0), } for _, scanners := range n.Scanners { for _, scanner := range scanners { - err := scanner.Run() + err := scanner.Run(ctx) if err != nil { return err } @@ -408,27 +409,27 @@ func (n *Nuke) Filter(item *queue.Item) error { // HandleQueue is used to handle the queue of resources. It will iterate over the queue and trigger the appropriate // handlers based on the state of the resource. -func (n *Nuke) HandleQueue() { +func (n *Nuke) HandleQueue(ctx context.Context) { listCache := make(map[string]map[string][]resource.Resource) for _, item := range n.Queue.GetItems() { switch item.GetState() { case queue.ItemStateNew: - n.HandleRemove(item) + n.HandleRemove(ctx, item) item.Print() case queue.ItemStateNewDependency, queue.ItemStatePendingDependency: - n.HandleWaitDependency(item) + n.HandleWaitDependency(ctx, item) item.Print() case queue.ItemStateFailed: - n.HandleRemove(item) - n.HandleWait(item, listCache) + n.HandleRemove(ctx, item) + n.HandleWait(ctx, item, listCache) item.Print() case queue.ItemStatePending: - n.HandleWait(item, listCache) + n.HandleWait(ctx, item, listCache) item.State = queue.ItemStateWaiting item.Print() case queue.ItemStateWaiting: - n.HandleWait(item, listCache) + n.HandleWait(ctx, item, listCache) item.Print() } } @@ -450,7 +451,8 @@ func (n *Nuke) HandleQueue() { // HandleRemove is used to handle the removal of a resource. It will remove the resource and set the state of the // resource to pending if it was successful or failed if it was not. -func (n *Nuke) HandleRemove(item *queue.Item) { +func (n *Nuke) HandleRemove(_ context.Context, item *queue.Item) { + // TODO: pass context to remove for remove functions to use err := item.Resource.Remove() if err != nil { item.State = queue.ItemStateFailed @@ -464,7 +466,7 @@ func (n *Nuke) HandleRemove(item *queue.Item) { // HandleWaitDependency is used to handle the waiting of a resource. It will check if the resource has any dependencies // and if it does, it will check if the dependencies have been removed. If they have, it will trigger the remove handler. -func (n *Nuke) HandleWaitDependency(item *queue.Item) { +func (n *Nuke) HandleWaitDependency(ctx context.Context, item *queue.Item) { reg := resource.GetRegistration(item.Type) depCount := 0 for _, dep := range reg.DependsOn { @@ -476,7 +478,7 @@ func (n *Nuke) HandleWaitDependency(item *queue.Item) { } if depCount == 0 { - n.HandleRemove(item) + n.HandleRemove(ctx, item) return } @@ -486,7 +488,7 @@ func (n *Nuke) HandleWaitDependency(item *queue.Item) { // HandleWait is used to handle the waiting of a resource. It will check if the resource has been removed. If it has, // it will set the state of the resource to finished. If it has not, it will set the state of the resource to waiting. -func (n *Nuke) HandleWait(item *queue.Item, cache ListCache) { +func (n *Nuke) HandleWait(ctx context.Context, item *queue.Item, cache ListCache) { var err error ownerID := item.Owner @@ -497,7 +499,7 @@ func (n *Nuke) HandleWait(item *queue.Item, cache ListCache) { left, ok := cache[ownerID][item.Type] if !ok { - left, err = item.List(item.Opts) + left, err = item.List(ctx, item.Opts) if err != nil { item.State = queue.ItemStateFailed item.Reason = err.Error() diff --git a/pkg/nuke/nuke_test.go b/pkg/nuke/nuke_test.go index 5c77855..5779cbf 100644 --- a/pkg/nuke/nuke_test.go +++ b/pkg/nuke/nuke_test.go @@ -1,6 +1,7 @@ package nuke import ( + "context" "fmt" "io" "os" @@ -243,7 +244,7 @@ func Test_Nuke_Scan(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Scan() + err := n.Scan(context.TODO()) assert.NoError(t, err) assert.Equal(t, 2, n.Queue.Total()) @@ -280,7 +281,7 @@ func Test_Nuke_Filters_Match(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Scan() + err := n.Scan(context.TODO()) assert.NoError(t, err) assert.Equal(t, 1, n.Queue.Total()) assert.Equal(t, 1, n.Queue.Count(queue.ItemStateFiltered)) @@ -315,7 +316,7 @@ func Test_Nuke_Filters_NoMatch(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Scan() + err := n.Scan(context.TODO()) assert.NoError(t, err) assert.Equal(t, 1, n.Queue.Total()) assert.Equal(t, 0, n.Queue.Count(queue.ItemStateFiltered)) @@ -349,7 +350,7 @@ func Test_Nuke_Filters_ErrorCustomProps(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Scan() + err := n.Scan(context.TODO()) assert.Error(t, err) assert.Equal(t, "*nuke.TestResource does not support custom properties", err.Error()) } @@ -423,7 +424,7 @@ func Test_Nuke_HandleRemove(t *testing.T) { State: queue.ItemStateNew, } - n.HandleRemove(i) + n.HandleRemove(context.TODO(), i) assert.Equal(t, queue.ItemStatePending, i.State) } @@ -441,7 +442,7 @@ func Test_Nuke_HandleRemoveError(t *testing.T) { State: queue.ItemStateNew, } - n.HandleRemove(i) + n.HandleRemove(context.TODO(), i) assert.Equal(t, queue.ItemStateFailed, i.State) } @@ -471,7 +472,7 @@ func Test_Nuke_Run(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Run() + err := n.Run(context.TODO()) assert.NoError(t, err) } @@ -505,6 +506,6 @@ func Test_Nuke_Run_Error(t *testing.T) { sErr := n.RegisterScanner(testScope, scanner) assert.NoError(t, sErr) - err := n.Run() + err := n.Run(context.TODO()) assert.NoError(t, err) } diff --git a/pkg/nuke/scan.go b/pkg/nuke/scan.go index bc3ea6a..c02e588 100644 --- a/pkg/nuke/scan.go +++ b/pkg/nuke/scan.go @@ -62,9 +62,7 @@ func (s *Scanner) RegisterMutateOptsFunc(morph MutateOptsFunc) { } // Run starts the scanner and runs the lister for each resource type. -func (s *Scanner) Run() error { - ctx := context.Background() - +func (s *Scanner) Run(ctx context.Context) error { for _, resourceType := range s.resourceTypes { err := s.semaphore.Acquire(ctx, 1) if err != nil { @@ -75,7 +73,7 @@ func (s *Scanner) Run() error { opts = s.mutateOptsFunc(opts, resourceType) } - go s.list(s.owner, resourceType, opts) + go s.list(ctx, s.owner, resourceType, opts) } // Wait for all routines to finish. @@ -89,7 +87,10 @@ func (s *Scanner) Run() error { return nil } -func (s *Scanner) list(owner, resourceType string, opts interface{}) { +func (s *Scanner) list(ctx context.Context, owner, resourceType string, opts interface{}) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer func() { if r := recover(); r != nil { err := fmt.Errorf("%v\n\n%s", r.(error), string(debug.Stack())) @@ -97,12 +98,13 @@ func (s *Scanner) list(owner, resourceType string, opts interface{}) { logrus.Errorf("Listing %s failed:\n%s", resourceType, dump) } }() + defer s.semaphore.Release(1) lister := resource.GetLister(resourceType) var rs []resource.Resource - rs, err := lister.List(opts) + rs, err := lister.List(ctx, opts) if err != nil { var errSkipRequest sdkerrors.ErrSkipRequest ok := errors.As(err, &errSkipRequest) diff --git a/pkg/nuke/scan_test.go b/pkg/nuke/scan_test.go index 41a2a40..f4c8690 100644 --- a/pkg/nuke/scan_test.go +++ b/pkg/nuke/scan_test.go @@ -1,6 +1,7 @@ package nuke import ( + "context" "flag" "fmt" "io" @@ -102,7 +103,7 @@ type TestResourceLister struct { RemoveError bool } -func (l TestResourceLister) List(o interface{}) ([]resource.Resource, error) { +func (l TestResourceLister) List(_ context.Context, o interface{}) ([]resource.Resource, error) { opts := o.(TestOpts) if opts.ThrowError { @@ -182,7 +183,7 @@ func Test_NewScannerWithMorphOpts(t *testing.T) { scanner := NewScanner("owner", []string{testResourceType}, opts) scanner.RegisterMutateOptsFunc(morphOpts) - err := scanner.Run() + err := scanner.Run(context.TODO()) assert.NoError(t, err) assert.Len(t, scanner.Items, 1) @@ -215,7 +216,7 @@ func Test_NewScannerWithResourceListerError(t *testing.T) { } scanner := NewScanner("owner", []string{testResourceType}, opts) - err := scanner.Run() + err := scanner.Run(context.TODO()) assert.NoError(t, err) assert.Len(t, scanner.Items, 0) @@ -248,7 +249,7 @@ func Test_NewScannerWithResourceListerErrorSkip(t *testing.T) { } scanner := NewScanner("owner", []string{testResourceType}, opts) - err := scanner.Run() + err := scanner.Run(context.TODO()) assert.NoError(t, err) assert.Len(t, scanner.Items, 0) @@ -281,7 +282,7 @@ func Test_NewScannerWithResourceListerErrorUnknownEndpoint(t *testing.T) { } scanner := NewScanner("owner", []string{testResourceType}, opts) - err := scanner.Run() + err := scanner.Run(context.TODO()) assert.NoError(t, err) assert.Len(t, scanner.Items, 0) diff --git a/pkg/queue/item.go b/pkg/queue/item.go index 74fde2a..cc8a158 100644 --- a/pkg/queue/item.go +++ b/pkg/queue/item.go @@ -1,6 +1,7 @@ package queue import ( + "context" "fmt" "github.com/ekristen/libnuke/pkg/featureflag" @@ -53,8 +54,8 @@ func (i *Item) GetReason() string { // List calls the List method for the lister for the Type that belongs to the Item which returns // a list of resources or an error. This primarily is used for the HandleWait function. -func (i *Item) List(opts interface{}) ([]resource.Resource, error) { - return resource.GetLister(i.Type).List(opts) +func (i *Item) List(ctx context.Context, opts interface{}) ([]resource.Resource, error) { + return resource.GetLister(i.Type).List(ctx, opts) } // GetProperty retrieves the string value of a property on the Item's Resource if it exists. diff --git a/pkg/resource/registry.go b/pkg/resource/registry.go index f3a48a9..1f7d4e8 100644 --- a/pkg/resource/registry.go +++ b/pkg/resource/registry.go @@ -1,6 +1,7 @@ package resource import ( + "context" "fmt" "github.com/sirupsen/logrus" @@ -40,7 +41,7 @@ type Registration struct { // Lister is an interface that represents a resource that can be listed type Lister interface { - List(opts interface{}) ([]Resource, error) + List(ctx context.Context, opts interface{}) ([]Resource, error) } // RegisterOption is a function that can be used to manipulate the lister for a given resource type at diff --git a/pkg/resource/registry_test.go b/pkg/resource/registry_test.go index 7cebd36..15546dc 100644 --- a/pkg/resource/registry_test.go +++ b/pkg/resource/registry_test.go @@ -1,6 +1,7 @@ package resource import ( + "context" "fmt" "testing" @@ -9,7 +10,7 @@ import ( type TestLister struct{} -func (l TestLister) List(o interface{}) ([]Resource, error) { return nil, nil } +func (l TestLister) List(_ context.Context, o interface{}) ([]Resource, error) { return nil, nil } func Test_RegisterNoScope(t *testing.T) { ClearRegistry()