Skip to content

Commit

Permalink
Moving locks to util and some refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Annaraya Narasagond <annarayanarasagond@gmail.com>
Signed-off-by: Annaraya Narasagond <annarayanarasagond@gmail.com> Annaraya Narasagond <annarayanarasagond@gmail.com>
  • Loading branch information
asn1809 committed Sep 5, 2024
1 parent ed18630 commit e127357
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 54 deletions.
45 changes: 45 additions & 0 deletions internal/controller/util/nslock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-FileCopyrightText: The RamenDR authors
// SPDX-License-Identifier: Apache-2.0

package util

import (
"sync"
)

// NamespaceLock implements atomic operation for namespace. It will have the namespace
// having multiple vrgs in which VRGs are being processed.
type NamespaceLock struct {
namespace string
mux sync.Mutex
}

// NewNamespaceLock returns new NamespaceLock
func NewNamespaceLock() *NamespaceLock {
return &NamespaceLock{}
}

// TryToAcquireLock tries to acquire the lock for processing VRG in a namespace having
// multiple VRGs and returns true if successful.
// If processing has already begun in the namespace, returns false.
func (nl *NamespaceLock) TryToAcquireLock(namespace string) bool {
nl.mux.Lock()
defer nl.mux.Unlock()

if nl.namespace == namespace {
return false
}

if nl.namespace == "" {
nl.namespace = namespace
}

return true
}

// Release removes lock on the namespace
func (nl *NamespaceLock) Release(namespace string) {
nl.mux.Lock()
defer nl.mux.Unlock()
nl.namespace = ""
}
18 changes: 18 additions & 0 deletions internal/controller/util/nslock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-FileCopyrightText: The RamenDR authors
// SPDX-License-Identifier: Apache-2.0

package util_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/ramendr/ramen/internal/controller/util"
)

var _ = Describe("Testing Locks", func() {
nsLock := util.NewNamespaceLock()
Expect(nsLock.TryToAcquireLock("test")).To(BeTrue())
Expect(nsLock.TryToAcquireLock("test")).To(BeFalse())
nsLock.Release("test")
Expect(nsLock.TryToAcquireLock("test")).To(BeTrue())
})
83 changes: 29 additions & 54 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -28,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -57,14 +55,7 @@ type VolumeReplicationGroupReconciler struct {
kubeObjects kubeobjects.RequestsManager
RateLimiter *workqueue.RateLimiter
veleroCRsAreWatched bool
locks *NamespaceLock
}

// NamespaceLock implements atomic operation for namespace. It will have the namespace
// having multiple vrgs in which VRGs are being processed.
type NamespaceLock struct {
namespaces sets.String
mux sync.Mutex
locks *rmnutil.NamespaceLock
}

// SetupWithManager sets up the controller with the Manager.
Expand Down Expand Up @@ -125,7 +116,7 @@ func (r *VolumeReplicationGroupReconciler) SetupWithManager(
r.Log.Info("Kube object protection disabled; don't watch kube objects requests")
}

r.locks = NewNamespaceLock()
r.locks = rmnutil.NewNamespaceLock()

return ctrlBuilder.Complete(r)
}
Expand Down Expand Up @@ -449,27 +440,10 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
"Please install velero/oadp and restart the operator", v.instance.Namespace, v.instance.Name)
}

/*
Acquire lock on the namespace if:
1. It is non-admin ns.
2. More than 1 vrg is found within the ns having same label selector
*/
if !adminNamespaceVRG {
vrgList := &ramendrv1alpha1.VolumeReplicationGroupList{}
listOps := &client.ListOptions{
Namespace: req.Namespace,
}
err = r.APIReader.List(context.Background(), vrgList, listOps)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list VRGs in the namespace: %w", err)
}
if len(vrgList.Items) > 1 {
if isLockAcquired := r.locks.TryToAcquireLock(req.Namespace); !isLockAcquired {
// Acquiring lock failed, VRG should be errored
}
defer r.locks.Release(req.Namespace)
}
err = v.vrgParallelProcessingCheck(adminNamespaceVRG)

if err != nil {
return ctrl.Result{Requeue: true}, err
}

v.volSyncHandler = volsync.NewVSHandler(ctx, r.Client, log, v.instance,
Expand Down Expand Up @@ -1662,30 +1636,31 @@ func (r *VolumeReplicationGroupReconciler) addKubeObjectsOwnsAndWatches(ctrlBuil
return ctrlBuilder
}

// NewNamespaceLock returns new NamespaceLock
func NewNamespaceLock() *NamespaceLock {
return &NamespaceLock{
namespaces: sets.NewString(),
}
}
func (v *VRGInstance) vrgParallelProcessingCheck(adminNamespaceVRG bool) error {
ns := v.instance.Namespace

// TryToAcquireLock tries to acquire the lock for processing VRG in a namespace having
// multiple VRGs and returns true if successful.
// If processing has already begun in the namespace, returns false.
func (nl *NamespaceLock) TryToAcquireLock(namespace string) bool {
nl.mux.Lock()
defer nl.mux.Unlock()
if nl.namespaces.Has(namespace) {
return false
}
nl.namespaces.Insert(namespace)
if !adminNamespaceVRG {
vrgList := &ramendrv1alpha1.VolumeReplicationGroupList{}
listOps := &client.ListOptions{
Namespace: ns,
}
err := v.reconciler.APIReader.List(context.Background(), vrgList, listOps)

return false
}
if err != nil {
v.log.Error(err, "Unable to list the VRGs in the", " namespace ", ns)

// Release removes lock on the namespace
func (nl *NamespaceLock) Release(namespace string) {
nl.mux.Lock()
defer nl.mux.Unlock()
nl.namespaces.Delete(namespace)
return err
}

// if the number of vrgs in the ns is more than 1, lock is needed.
if len(vrgList.Items) > 1 {
if isLockAcquired := v.reconciler.locks.TryToAcquireLock(ns); !isLockAcquired {
// Acquiring lock failed, VRG reconcile should be requeued
return err
}
defer v.reconciler.locks.Release(ns)
}
}

return nil
}

0 comments on commit e127357

Please sign in to comment.