Share manager HA (backport #2811) #2994

Merged (21 commits, Jul 23, 2024)

Commits
ccc53c5  Share manager HA (james-munson, May 4, 2024)
4bdb74a  Removed unused cleanupShareManagerService function (PhanLe1010, Jul 17, 2024)
70b3b76  Fix recovery backend failure (PhanLe1010, Jul 18, 2024)
594d57b  Code refinement: refactor the isResponsibleFor (PhanLe1010, Jul 19, 2024)
47ed540  Code refinement: remove the logic to remember and add node affinity (PhanLe1010, Jul 19, 2024)
7590d3f  Code refinement: refactor ownership transfer for volume/engine/replica (PhanLe1010, Jul 20, 2024)
91eb0e6  Fix spelling typo. Add a debug output. (james-munson, Jul 17, 2024)
0ec3bc7  Add environment variables to set config params in share-manager pod. (james-munson, Jul 20, 2024)
e1c8c94  Code refinement: refactor engine/replica controller to try to delete … (PhanLe1010, Jul 20, 2024)
7f9fdac  Update k8s/pkg/apis/longhorn/v1beta2/node.go (james-munson, Jul 20, 2024)
139d9a0  Code refinement: refactor engine/replica controller to try to delete … (PhanLe1010, Jul 20, 2024)
5460f0c  Get rid of babbling debug log line. (james-munson, Jul 20, 2024)
6df97c7  Fix up the recovery backend service selector. (james-munson, Jul 21, 2024)
29432bc  Reenable the RWX service deletion logic from ChinYa (PhanLe1010, Jul 22, 2024)
3576af4  Code refinement: remove debugging comments (PhanLe1010, Jul 22, 2024)
ccddd14  Update controller/engine_controller.go (james-munson, Jul 22, 2024)
0ed72bc  Remove pod informer in engine controller and fix some nits for Eric's (PhanLe1010, Jul 22, 2024)
8e38ad2  Remove enqueueShareManagerPodChange (PhanLe1010, Jul 22, 2024)
790ff49  Revert share manager pod informer back to share manager CR informer (PhanLe1010, Jul 22, 2024)
a99a20f  Rename RWX volume fast failover setting. (james-munson, Jul 22, 2024)
cdd2a96  Fix Derek's review comments (PhanLe1010, Jul 23, 2024)
42 changes: 38 additions & 4 deletions app/daemon.go
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // for runtime profiling
@@ -11,6 +12,7 @@ import (
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/longhorn/go-iscsi-helper/iscsi"

@@ -144,21 +146,46 @@ func startManager(c *cli.Context) error {
return fmt.Errorf("failed to detect the node IP")
}

podName, err := util.GetRequiredEnv(types.EnvPodName)
if err != nil {
return fmt.Errorf("failed to detect the manager pod name")
}

podNamespace, err := util.GetRequiredEnv(types.EnvPodNamespace)
if err != nil {
return fmt.Errorf("failed to detect the manager pod namespace")
}

ctx := signals.SetupSignalContext()

logger := logrus.StandardLogger().WithField("node", currentNodeID)

// Conversion webhook needs to be started first since we use its port 9501 as readiness port.
// longhorn-manager pod becomes ready only when conversion webhook is running.
// The services in the longhorn-manager can then start to receive the requests.
// Conversion webhook does not longhorn datastore.
// Conversion webhook does not use datastore, since it is a prerequisite for
// datastore operation.
clientsWithoutDatastore, err := client.NewClients(kubeconfigPath, false, ctx.Done())
if err != nil {
return err
}
if err := webhook.StartWebhook(ctx, types.WebhookTypeConversion, clientsWithoutDatastore); err != nil {
return err
}

// This adds the label for the conversion webhook's selector. We do it the hard way without datastore to avoid chicken-and-egg.
pod, err := clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Get(context.Background(), podName, v1.GetOptions{})
if err != nil {
return err
}
labels := types.GetConversionWebhookLabel()
for key, value := range labels {
pod.Labels[key] = value
}
_, err = clientsWithoutDatastore.Clients.K8s.CoreV1().Pods(podNamespace).Update(context.Background(), pod, v1.UpdateOptions{})
if err != nil {
return err
}
if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeConversion); err != nil {
return err
}
@@ -167,9 +194,13 @@ func startManager(c *cli.Context) error {
if err != nil {
return err
}

if err := webhook.StartWebhook(ctx, types.WebhookTypeAdmission, clients); err != nil {
return err
}
if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetAdmissionWebhookLabel()); err != nil {
return err
}
if err := webhook.CheckWebhookServiceAvailability(types.WebhookTypeAdmission); err != nil {
return err
}
@@ -178,6 +209,10 @@ func startManager(c *cli.Context) error {
return err
}

if err := clients.Datastore.AddLabelToManagerPod(currentNodeID, types.GetRecoveryBackendLabel()); err != nil {
return err
}

if err := upgrade.Upgrade(kubeconfigPath, currentNodeID, managerImage, c.Bool(FlagUpgradeVersionCheck)); err != nil {
return err
}
@@ -209,7 +244,7 @@ func startManager(c *cli.Context) error {
return err
}

if err := initDaemonNode(clients.Datastore); err != nil {
if err := initDaemonNode(clients.Datastore, currentNodeID); err != nil {
return err
}

@@ -286,8 +321,7 @@ func updateRegistrySecretName(m *manager.VolumeManager) error {
return nil
}

func initDaemonNode(ds *datastore.DataStore) error {
nodeName := os.Getenv("NODE_NAME")
func initDaemonNode(ds *datastore.DataStore, nodeName string) error {
if _, err := ds.GetNode(nodeName); err != nil {
// init default disk on node when starting longhorn-manager
if datastore.ErrorIsNotFound(err) {
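A note on the daemon change above: the conversion webhook Service selects the manager pod by label, but at this point in startup the Longhorn datastore does not exist yet, so the label is applied through the raw client-go clientset. A minimal sketch of that pattern follows; addWebhookLabels is an invented helper name, and the real code inlines this logic in startManager.

```go
package app

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// addWebhookLabels mirrors the inline logic in startManager: fetch the
// manager pod (its name and namespace come from downward-API env vars),
// merge in the webhook selector labels, and push the update through the
// plain clientset, with no Longhorn datastore involved.
func addWebhookLabels(ctx context.Context, k8s kubernetes.Interface, namespace, podName string, labels map[string]string) error {
	pod, err := k8s.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get manager pod %s/%s: %w", namespace, podName, err)
	}
	if pod.Labels == nil { // defensive; manager pods normally carry labels already
		pod.Labels = map[string]string{}
	}
	for key, value := range labels {
		pod.Labels[key] = value
	}
	_, err = k8s.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{})
	return err
}
```

Later in startup, once the datastore exists, the diff uses clients.Datastore.AddLabelToManagerPod for the admission webhook and recovery backend labels instead of this raw path.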
45 changes: 32 additions & 13 deletions controller/engine_controller.go
@@ -292,6 +292,7 @@ func (ec *EngineController) syncEngine(key string) (err error) {
if !isResponsible {
return nil
}

if engine.Status.OwnerID != ec.controllerID {
engine.Status.OwnerID = ec.controllerID
engine, err = ec.ds.UpdateEngineStatus(engine)
@@ -455,7 +456,6 @@ func (ec *EngineController) enqueueInstanceManagerChange(obj interface{}) {
for _, e := range engineMap {
ec.enqueueEngine(e)
}

}

func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceProcess, error) {
@@ -556,16 +556,9 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
}
}

v, err := ec.ds.GetVolumeRO(e.Spec.VolumeName)
isRWXVolume, err := ec.ds.IsRegularRWXVolume(e.Spec.VolumeName)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}

isRWXVolume := false
if v != nil && v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && !v.Spec.Migratable {
isRWXVolume = true
return err
}

// For a RWX volume, the node down, for example, caused by kubelet restart, leads to share-manager pod deletion/recreation
@@ -580,16 +573,21 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
}
}

isDelinquent, err := ec.ds.IsNodeDelinquent(im.Spec.NodeID, e.Spec.VolumeName)
if err != nil {
return err
}

log.Info("Deleting engine instance")

defer func() {
if err != nil {
log.WithError(err).Warnf("Failed to delete engine %v", e.Name)
}
if isRWXVolume && im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
if isRWXVolume && (im.Status.CurrentState != longhorn.InstanceManagerStateRunning || isDelinquent) {
// Try the best to delete engine instance.
// To prevent that the volume is stuck at detaching state, ignore the error when volume is
// a RWX volume and the instance manager is not running.
// a RWX volume and the instance manager is not running or the RWX volume is currently delinquent.
//
// If the engine instance of a RWX volume is not deleted successfully:
// If a RWX volume is on node A and the network of this node is partitioned,
@@ -599,7 +597,13 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
// After shifting to node A, the first reattachment fail due to the IO error resulting from the
// orphaned engine instance and block device. Then, the detachment will trigger the teardown of the
// problematic engine instance and block device. The next reattachment then will succeed.
log.Warnf("Ignored the failure of deleting engine %v", e.Name)
if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
log.Warnf("Ignored the failure of deleting engine %v because im.Status.CurrentState is %v", e.Name, im.Status.CurrentState)
}
if isDelinquent {
log.Warnf("Ignored the failure of deleting engine %v because the RWX volume is currently delinquent", e.Name)
}

err = nil
}
}()
@@ -2228,6 +2232,21 @@ func (ec *EngineController) isResponsibleFor(e *longhorn.Engine, defaultEngineIm
err = errors.Wrap(err, "error while checking isResponsibleFor")
}()

// If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod
isDelinquent, err := ec.ds.IsNodeDelinquent(e.Status.OwnerID, e.Spec.VolumeName)
if err != nil {
return false, err
}
if isDelinquent {
pod, err := ec.ds.GetPodRO(ec.namespace, types.GetShareManagerPodNameFromShareManagerName(e.Spec.VolumeName))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
if pod != nil && ec.controllerID == pod.Spec.NodeName {
return true, nil
}
}

isResponsible := isControllerResponsibleFor(ec.controllerID, ec.ds, e.Name, e.Spec.NodeID, e.Status.OwnerID)

// The engine is not running, the owner node doesn't need to have e.Status.CurrentImage
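The isResponsibleFor addition above (duplicated for replicas in controller/replica_controller.go below) is the heart of the fast failover: when the current owner is delinquent, ownership should jump to the node hosting the recreated share-manager pod without waiting for the usual ownership churn. A distilled sketch of the predicate, under the assumption that callers fall back to isControllerResponsibleFor when it returns false; the store interface and takeOverForDelinquentOwner are names invented for this sketch:

```go
package controller

import (
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// store is a hypothetical subset of *datastore.DataStore, just enough for
// this sketch.
type store interface {
	IsNodeDelinquent(nodeID, volumeName string) (bool, error)
	GetPodRO(namespace, name string) (*corev1.Pod, error)
}

// takeOverForDelinquentOwner reports whether this controller should claim
// ownership right away: the current owner is delinquent for the RWX volume,
// and the share-manager pod has been rescheduled onto this controller's node.
// When it returns false, the normal isControllerResponsibleFor logic decides.
func takeOverForDelinquentOwner(ds store, controllerID, namespace, smPodName, volumeName, ownerID string) (bool, error) {
	isDelinquent, err := ds.IsNodeDelinquent(ownerID, volumeName)
	if err != nil || !isDelinquent {
		return false, err
	}
	pod, err := ds.GetPodRO(namespace, smPodName)
	if err != nil && !apierrors.IsNotFound(err) {
		return false, err
	}
	return pod != nil && controllerID == pod.Spec.NodeName, nil
}
```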
9 changes: 7 additions & 2 deletions controller/instance_handler.go
@@ -56,7 +56,12 @@ func (h *InstanceHandler) syncStatusWithInstanceManager(im *longhorn.InstanceMan
}
}()

if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown {
isDelinquent := false
if im != nil {
isDelinquent, _ = h.ds.IsNodeDelinquent(im.Spec.NodeID, spec.VolumeName)
}

if im == nil || im.Status.CurrentState == longhorn.InstanceManagerStateUnknown || isDelinquent {
if status.Started {
if status.CurrentState != longhorn.InstanceStateUnknown {
logrus.Warnf("Marking the instance as state UNKNOWN since the related node %v of instance %v is down or deleted", spec.NodeID, instanceName)
@@ -281,7 +286,7 @@ func (h *InstanceHandler) ReconcileInstanceState(obj interface{}, spec *longhorn
return nil
}
// The related node maybe cleaned up then there is no available instance manager for this instance (typically it's replica).
isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeleted(spec.NodeID)
isNodeDownOrDeleted, err := h.ds.IsNodeDownOrDeletedOrDelinquent(spec.NodeID, spec.VolumeName)
if err != nil {
return err
}
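The switch from IsNodeDownOrDeleted to IsNodeDownOrDeletedOrDelinquent reads as a widening of "node unavailable" to include delinquency for the specific volume. A plausible composition is sketched below; the actual datastore method may be implemented differently:

```go
package datastore

// nodeChecker captures the two existing checks this sketch composes; in
// Longhorn they are methods on *datastore.DataStore.
type nodeChecker interface {
	IsNodeDownOrDeleted(nodeID string) (bool, error)
	IsNodeDelinquent(nodeID, volumeName string) (bool, error)
}

// isNodeDownOrDeletedOrDelinquent is a plausible reading of the renamed
// check: the node is unavailable for this volume if it is down or deleted,
// or if it is delinquent for the volume's share manager.
func isNodeDownOrDeletedOrDelinquent(ds nodeChecker, nodeID, volumeName string) (bool, error) {
	downOrDeleted, err := ds.IsNodeDownOrDeleted(nodeID)
	if err != nil || downOrDeleted {
		return downOrDeleted, err
	}
	return ds.IsNodeDelinquent(nodeID, volumeName)
}
```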
10 changes: 9 additions & 1 deletion controller/node_controller.go
@@ -383,7 +383,7 @@ func (nc *NodeController) syncNode(key string) (err error) {

existingNode := node.DeepCopy()
defer func() {
// we're going to update volume assume things changes
// we're going to update node assume things changes
if err == nil && !reflect.DeepEqual(existingNode.Status, node.Status) {
_, err = nc.ds.UpdateNodeStatus(node)
}
@@ -519,6 +519,14 @@ func (nc *NodeController) syncNode(key string) (err error) {
return nil
}

// Getting here is enough proof of life to turn on the services that might
// have been turned off for RWX failover.
labels := types.MergeStringMaps(types.GetAdmissionWebhookLabel(), types.GetRecoveryBackendLabel())
if err := nc.ds.AddLabelToManagerPod(node.Name, labels); err != nil {
log.WithError(err).Error("Failed to restore its admission webhook and recovery backend")
return err
}

// Create a monitor for collecting disk information
if _, err := nc.createDiskMonitor(); err != nil {
return err
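The "proof of life" comment above implies a counterpart elsewhere in this PR that turns these services off for a delinquent node by stripping the same labels, so the admission-webhook and recovery-backend Services stop selecting that node's manager pod. A hypothetical sketch of the round trip; labelStore and RemoveLabelFromManagerPod are invented for illustration, while AddLabelToManagerPod appears in the diff:

```go
package controller

// labelStore is a hypothetical subset of the datastore; AddLabelToManagerPod
// is called in the diff, RemoveLabelFromManagerPod is an invented counterpart.
type labelStore interface {
	AddLabelToManagerPod(nodeName string, labels map[string]string) error
	RemoveLabelFromManagerPod(nodeName string, labels map[string]string) error
}

// setServiceParticipation sketches the label round trip that drives RWX
// failover: stripping the selector labels takes a node's manager pod out of
// the admission-webhook and recovery-backend Service endpoints; re-adding
// them (as syncNode does above on proof of life) restores the services.
func setServiceParticipation(ds labelStore, nodeName string, labels map[string]string, healthy bool) error {
	if healthy {
		return ds.AddLabelToManagerPod(nodeName, labels)
	}
	return ds.RemoveLabelFromManagerPod(nodeName, labels)
}
```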
43 changes: 38 additions & 5 deletions controller/replica_controller.go
@@ -231,7 +231,11 @@ func (rc *ReplicaController) syncReplica(key string) (err error) {

log := getLoggerForReplica(rc.logger, replica)

if !rc.isResponsibleFor(replica) {
isResponsible, err := rc.isResponsibleFor(replica)
if err != nil {
return err
}
if !isResponsible {
return nil
}
if replica.Status.OwnerID != rc.controllerID {
@@ -510,7 +514,7 @@ func (rc *ReplicaController) CanStartRebuildingReplica(r *longhorn.Replica) (boo
return true, nil
}

func (rc *ReplicaController) DeleteInstance(obj interface{}) error {
func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) {
r, ok := obj.(*longhorn.Replica)
if !ok {
return fmt.Errorf("invalid object for replica instance deletion: %v", obj)
@@ -524,7 +528,6 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error {
}

var im *longhorn.InstanceManager
var err error
// Not assigned or not updated, try best to delete
if r.Status.InstanceManagerName == "" {
if r.Spec.NodeID == "" {
@@ -553,6 +556,21 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) (err error) {
return nil
}

isDelinquent, err := rc.ds.IsNodeDelinquent(im.Spec.NodeID, r.Spec.VolumeName)
if err != nil {
return err
}

defer func() {
if err != nil {
log.WithError(err).Warnf("Failed to delete replica process %v", r.Name)
if isDelinquent {
log.Warnf("Ignored the failure of deleting replica process %v because the RWX volume is currently delinquent", r.Name)
err = nil
}
}
}()

c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return err
@@ -900,8 +918,23 @@ func (rc *ReplicaController) enqueueAllRebuildingReplicaOnCurrentNode() {
}
}

func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) bool {
return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID)
func (rc *ReplicaController) isResponsibleFor(r *longhorn.Replica) (bool, error) {
// If a regular RWX is delinquent, try to switch ownership quickly to the node of the newly created share-manager pod
isDelinquent, err := rc.ds.IsNodeDelinquent(r.Status.OwnerID, r.Spec.VolumeName)
if err != nil {
return false, err
}
if isDelinquent {
pod, err := rc.ds.GetPodRO(rc.namespace, types.GetShareManagerPodNameFromShareManagerName(r.Spec.VolumeName))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
if pod != nil && rc.controllerID == pod.Spec.NodeName {
return true, nil
}
}

return isControllerResponsibleFor(rc.controllerID, rc.ds, r.Name, r.Spec.NodeID, r.Status.OwnerID), nil
}

func hasMatchingReplica(replica *longhorn.Replica, replicas map[string]*longhorn.Replica) bool {
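Both DeleteInstance implementations in this PR converge on the same best-effort shape: compute delinquency up front, then use a deferred handler to swallow a deletion failure when the RWX volume is delinquent, so detach cannot get stuck behind an unreachable instance manager. A distilled sketch; deleteBestEffort is an invented wrapper, not code from the PR:

```go
package controller

import "github.com/sirupsen/logrus"

// deleteBestEffort runs del and, when the volume is delinquent, logs and
// discards a failure instead of propagating it, mirroring the deferred
// blocks in the engine and replica controllers.
func deleteBestEffort(log *logrus.Entry, name string, isDelinquent bool, del func() error) (err error) {
	defer func() {
		if err != nil && isDelinquent {
			log.Warnf("Ignored the failure of deleting %v because the RWX volume is currently delinquent", name)
			err = nil
		}
	}()
	return del()
}
```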