Skip to content

Commit

Permalink
fixup! support gRPC communication between primary and secondary mantl…
Browse files Browse the repository at this point in the history
…e controllers
  • Loading branch information
ushitora-anqou committed Aug 30, 2024
1 parent 6cbd772 commit ffe0e5a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust
cat config/rbac/role.yaml | yq '.metadata.name = "mantle-controller"' > charts/mantle-cluster-wide/templates/clusterrole.yaml

.PHONY: generate
generate: $(PROTOBUF_GEN) controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
generate: $(PROTOBUF_GEN) controller-gen ## Generate boilerplate code.
$(CONTROLLER_GEN) object paths="./..."

.PHONY: fmt
Expand Down
52 changes: 26 additions & 26 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ var (
setupLog = ctrl.Log.WithName("setup")
)

const (
RoleStandalone = "standalone"
RolePrimary = "primary"
RoleSecondary = "secondary"
)

func init() {
flags := ControllerCmd.Flags()
flags.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
Expand Down Expand Up @@ -97,15 +91,15 @@ func init() {

func checkCommandlineArgs() error {
switch role {
case RoleStandalone:
case controller.RoleStandalone:
// nothing to do
case RolePrimary:
case controller.RolePrimary:
if mantleServiceEndpoint == "" {
return errors.New("--mantle-service-endpoint should be specified if --role is 'primary'")
return errors.New("--mantle-service-endpoint must be specified if --role is 'primary'")
}
case RoleSecondary:
case controller.RoleSecondary:
if mantleServiceEndpoint == "" {
return errors.New("--mantle-service-endpoint should be specified if --role is 'secondary'")
return errors.New("--mantle-service-endpoint must be specified if --role is 'secondary'")
}
default:
return fmt.Errorf("role should be one of 'standalone', 'primary', or 'secondary': %s", role)
Expand All @@ -120,12 +114,16 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
return errors.New("POD_NAMESPACE is empty")
}

backupReconciler := controller.NewMantleBackupReconciler(
backupReconciler, err := controller.NewMantleBackupReconciler(
mgr.GetClient(),
mgr.GetScheme(),
managedCephClusterID,
role,
primarySettings,
)
if err != nil {
return err
}
if err := backupReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MantleBackup")
return err
Expand All @@ -135,6 +133,7 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
mgr.GetClient(),
mgr.GetScheme(),
managedCephClusterID,
role,
)
if err := restoreReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MantleRestore")
Expand All @@ -160,12 +159,6 @@ func setupStandalone(mgr manager.Manager) error {
}

func setupPrimary(ctx context.Context, mgr manager.Manager, wg *sync.WaitGroup) error {
managedCephClusterID := os.Getenv("POD_NAMESPACE")
if managedCephClusterID == "" {
setupLog.Error(errors.New("POD_NAMESPACE is empty"), "POD_NAMESPACE is empty")
return errors.New("POD_NAMESPACE is empty")
}

conn, err := grpc.NewClient(
mantleServiceEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -177,6 +170,8 @@ func setupPrimary(ctx context.Context, mgr manager.Manager, wg *sync.WaitGroup)
setupLog.Error(err, "failed to create a new client for the secondary mantle")
return err
}

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
Expand All @@ -192,7 +187,7 @@ func setupPrimary(ctx context.Context, mgr manager.Manager, wg *sync.WaitGroup)
return setupReconcilers(mgr, primarySettings)
}

func setupSecondary(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) error {
func setupSecondary(ctx context.Context, mgr manager.Manager, wg *sync.WaitGroup, cancel context.CancelFunc) error {
logger := ctrl.Log.WithName("grpc")

serv := grpc.NewServer(grpc.ChainUnaryInterceptor(
Expand All @@ -208,22 +203,27 @@ func setupSecondary(ctx context.Context, wg *sync.WaitGroup, cancel context.Canc

l, err := net.Listen("tcp", mantleServiceEndpoint)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
return fmt.Errorf("failed to listen %s: %w", mantleServiceEndpoint, err)
}

wg.Add(1)
go func() {
defer wg.Done()
_ = serv.Serve(l)
err := serv.Serve(l)
if err != nil {
logger.Error(err, "gRPC server failed")
}
cancel()
}()

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
serv.GracefulStop()
}()

return nil
return setupReconcilers(mgr, nil)
}

func subMain() error {
Expand Down Expand Up @@ -264,16 +264,16 @@ func subMain() error {
defer cancel()

switch role {
case RoleStandalone:
case controller.RoleStandalone:
if err := setupStandalone(mgr); err != nil {
return err
}
case RolePrimary:
case controller.RolePrimary:
if err := setupPrimary(ctx, mgr, &wg); err != nil {
return err
}
case RoleSecondary:
if err := setupSecondary(ctx, &wg, cancel); err != nil {
case controller.RoleSecondary:
if err := setupSecondary(ctx, mgr, &wg, cancel); err != nil {
return err
}
}
Expand Down
12 changes: 9 additions & 3 deletions internal/controller/mantlebackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type MantleBackupReconciler struct {
client.Client
Scheme *runtime.Scheme
managedCephClusterID string
primarySettings *PrimarySettings
role string
primarySettings *PrimarySettings // This should be non-nil if and only if role equals 'primary'.
}

type Snapshot struct {
Expand All @@ -44,13 +45,14 @@ const (
)

// NewMantleBackupReconciler returns a new MantleBackupReconciler.
func NewMantleBackupReconciler(client client.Client, scheme *runtime.Scheme, managedCephClusterID string, primarySettings *PrimarySettings) *MantleBackupReconciler {
func NewMantleBackupReconciler(client client.Client, scheme *runtime.Scheme, managedCephClusterID, role string, primarySettings *PrimarySettings) (*MantleBackupReconciler, error) {
return &MantleBackupReconciler{
Client: client,
Scheme: scheme,
managedCephClusterID: managedCephClusterID,
role: role,
primarySettings: primarySettings,
}
}, nil
}

func executeCommandImpl(command []string, input io.Reader) ([]byte, error) {
Expand Down Expand Up @@ -198,6 +200,10 @@ func (r *MantleBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request
var backup mantlev1.MantleBackup
logger := logger.With("MantleBackup", req.NamespacedName)

if r.role == RoleSecondary {
return ctrl.Result{}, nil
}

err := r.Get(ctx, req.NamespacedName, &backup)
if errors.IsNotFound(err) {
logger.Info("MantleBackup is not found", "name", backup.Name, "error", err)
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/mantlebackup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ func mockExecuteCommand(command []string, input io.Reader) ([]byte, error) {
var _ = Describe("MantleBackup controller", func() {
ctx := context.Background()
var mgrUtil util.ManagerUtil
var reconciler *MantleBackupReconciler

storageClassClusterID := dummyStorageClassClusterID
storageClassName := dummyStorageClassName

BeforeEach(func() {
mgrUtil = util.NewManagerUtil(ctx, cfg, scheme.Scheme)

reconciler = NewMantleBackupReconciler(k8sClient, mgrUtil.GetScheme(), storageClassClusterID, nil)
err := reconciler.SetupWithManager(mgrUtil.GetManager())
reconciler, err := NewMantleBackupReconciler(k8sClient, mgrUtil.GetScheme(), storageClassClusterID, RoleStandalone, nil)
Expect(err).NotTo(HaveOccurred())
err = reconciler.SetupWithManager(mgrUtil.GetManager())
Expect(err).NotTo(HaveOccurred())

executeCommand = mockExecuteCommand
Expand Down
11 changes: 8 additions & 3 deletions internal/controller/mantlerestore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type MantleRestoreReconciler struct {
Scheme *runtime.Scheme
managedCephClusterID string
ceph ceph.CephCmd
role string
}

const (
Expand All @@ -40,24 +41,28 @@ const (
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch;create;update;patch;delete

func NewMantleRestoreReconciler(cli client.Client, scheme *runtime.Scheme, managedCephClusterID string) *MantleRestoreReconciler {
// NewMantleRestoreReconciler returns a MantleRestoreReconciler populated with
// the given client, scheme, managed Ceph cluster ID and role. The reconciler's
// Ceph command runner is obtained from ceph.NewCephCmd.
func NewMantleRestoreReconciler(cli client.Client, scheme *runtime.Scheme, managedCephClusterID, role string) *MantleRestoreReconciler {
	r := new(MantleRestoreReconciler)

	// Dependencies handed in by the caller.
	r.Client = cli
	r.Scheme = scheme

	// Static configuration for this reconciler instance.
	r.managedCephClusterID = managedCephClusterID
	r.role = role

	// Default Ceph command implementation.
	r.ceph = ceph.NewCephCmd()

	return r
}

func (r *MantleRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logger.With("MantleRestore", req.NamespacedName)

if r.role == RoleSecondary {
return ctrl.Result{}, nil
}

var restore mantlev1.MantleRestore
err := r.Get(ctx, req.NamespacedName, &restore)
if errors.IsNotFound(err) {
logger.Info("MantleRestore resource not found", "name", req.Name, "error", err)
return ctrl.
Result{}, nil
return ctrl.Result{}, nil
}
if err != nil {
logger.Error("failed to get MantleRestore", "name", req.NamespacedName, "error", err)
Expand Down
7 changes: 4 additions & 3 deletions internal/controller/mantlerestore_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ func (test *mantleRestoreControllerUnitTest) setupEnv() {
By("prepare MantleBackup reconciler")
executeCommand = mockExecuteCommand
test.mgrUtil = testutil.NewManagerUtil(ctx, cfg, scheme.Scheme)
backupReconciler := NewMantleBackupReconciler(k8sClient, test.mgrUtil.GetScheme(), test.cephClusterID, nil)
err := backupReconciler.SetupWithManager(test.mgrUtil.GetManager())
backupReconciler, err := NewMantleBackupReconciler(k8sClient, test.mgrUtil.GetScheme(), test.cephClusterID, RoleStandalone, nil)
Expect(err).NotTo(HaveOccurred())
err = backupReconciler.SetupWithManager(test.mgrUtil.GetManager())
Expect(err).NotTo(HaveOccurred())

By("prepare MantleRestore reconciler")
// Just allocate the reconciler; do not start it.
test.reconciler = NewMantleRestoreReconciler(k8sClient, test.mgrUtil.GetScheme(), test.cephClusterID)
test.reconciler = NewMantleRestoreReconciler(k8sClient, test.mgrUtil.GetScheme(), test.cephClusterID, RoleStandalone)

test.mgrUtil.Start()
time.Sleep(100 * time.Millisecond)
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (
"google.golang.org/grpc"
)

// Valid values for the controller's role, as accepted by the --role flag.
const (
// RoleStandalone is the role for which the command-line check requires
// no mantle service endpoint.
RoleStandalone = "standalone"
// RolePrimary is the role that requires --mantle-service-endpoint and
// creates a gRPC client for the secondary mantle.
RolePrimary = "primary"
// RoleSecondary is the role that requires --mantle-service-endpoint and
// serves gRPC on it. The MantleBackup and MantleRestore reconcilers
// return immediately without doing any work when running in this role.
RoleSecondary = "secondary"
)

type PrimarySettings struct {
ServiceEndpoint string
Conn *grpc.ClientConn
Expand Down

0 comments on commit ffe0e5a

Please sign in to comment.