Skip to content

Commit

Permalink
Merge branch 'master' into issue-8456
Browse files Browse the repository at this point in the history
  • Loading branch information
innobead committed May 2, 2024
2 parents 7d803c7 + 4e7f281 commit ac947eb
Show file tree
Hide file tree
Showing 953 changed files with 80,334 additions and 30,712 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}

# Build binaries
- name: Run ci
Expand Down
12 changes: 7 additions & 5 deletions controller/backup_target_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ func getBackupTarget(controllerID string, backupTarget *longhorn.BackupTarget, d
return nil, nil, errors.Wrap(err, "failed to get available data engine for getting backup target")
}

instanceManager, err := ds.GetDefaultInstanceManagerByNodeRO(controllerID, dataEngine)
instanceManager, err := ds.GetRunningInstanceManagerByNodeRO(controllerID, dataEngine)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get default engine instance manager for proxy client")
return nil, nil, errors.Wrap(err, "failed to get running instance manager for proxy client")
}

engineClientProxy, err = engineapi.NewEngineClientProxy(instanceManager, log, proxyConnCounter)
Expand Down Expand Up @@ -635,10 +635,12 @@ func (btc *BackupTargetController) isResponsibleFor(bt *longhorn.BackupTarget, d
return false, err
}

if instanceManager, err := btc.ds.GetDefaultInstanceManagerByNodeRO(btc.controllerID, ""); err != nil {
instanceManager, err := btc.ds.GetRunningInstanceManagerByNodeRO(btc.controllerID, "")
if err != nil {
return false, err
} else if instanceManager == nil || instanceManager.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return false, errors.New("failed to get default running instance manager")
}
if instanceManager == nil {
return false, errors.New("failed to get running instance manager")
}

isPreferredOwner := currentNodeEngineAvailable && isResponsible
Expand Down
10 changes: 0 additions & 10 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand Down Expand Up @@ -355,15 +354,6 @@ func newDeployment(name string, spec appsv1.DeploymentSpec) *appsv1.Deployment {
}
}

func newPodSecurityPolicy(spec policyv1beta1.PodSecurityPolicySpec) *policyv1beta1.PodSecurityPolicy {
return &policyv1beta1.PodSecurityPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: TestPodSecurityPolicyName,
},
Spec: spec,
}
}

func newRecurringJob(name string, spec longhorn.RecurringJobSpec) *longhorn.RecurringJob {
return &longhorn.RecurringJob{
ObjectMeta: metav1.ObjectMeta{
Expand Down
11 changes: 8 additions & 3 deletions controller/instance_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,20 @@ func (imc *InstanceManagerController) syncInstanceStatus(im *longhorn.InstanceMa
return nil
}

func (imc *InstanceManagerController) syncLogSettingsToIMPod(im *longhorn.InstanceManager) error {
func (imc *InstanceManagerController) syncLogSettingsToInstanceManagerPod(im *longhorn.InstanceManager) error {
if types.IsDataEngineV1(im.Spec.DataEngine) {
return nil
}

if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return nil
}

client, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return errors.Wrapf(err, "failed to create instance manager client for %v", im.Name)
}
defer client.Close()

settingNames := []types.SettingName{
types.SettingNameV2DataEngineLogLevel,
Expand Down Expand Up @@ -501,9 +506,9 @@ func (imc *InstanceManagerController) handlePod(im *longhorn.InstanceManager) er
return err
}

err = imc.syncLogSettingsToIMPod(im)
err = imc.syncLogSettingsToInstanceManagerPod(im)
if err != nil {
log.WithError(err).Warnf("Failed to sync log settings to instance manager pod")
log.WithError(err).Warnf("Failed to sync log settings to instance manager pod %v", im.Name)
}

isSettingSynced, isPodDeletedOrNotRunning, areInstancesRunningInPod, err := imc.areDangerZoneSettingsSyncedToIMPod(im)
Expand Down
2 changes: 1 addition & 1 deletion controller/kubernetes_pv_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newPVC() *corev1.PersistentVolumeClaim {
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
},
Expand Down
8 changes: 4 additions & 4 deletions controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,25 +1533,25 @@ func (nc *NodeController) alignDiskSpecAndStatus(node *longhorn.Node) {
if diskInstanceName == "" {
diskInstanceName = diskName
}
if err := nc.deleteDisk(node, diskStatus.Type, diskInstanceName, diskStatus.DiskUUID, diskStatus.DiskPath, string(diskStatus.DiskDriver)); err != nil {
if err := nc.deleteDisk(diskStatus.Type, diskInstanceName, diskStatus.DiskUUID, diskStatus.DiskPath, string(diskStatus.DiskDriver)); err != nil {
nc.logger.WithError(err).Warnf("Failed to delete disk %v", diskInstanceName)
}
delete(node.Status.DiskStatus, diskName)
}
}
}

func (nc *NodeController) deleteDisk(node *longhorn.Node, diskType longhorn.DiskType, diskName, diskUUID, diskPath, diskDriver string) error {
func (nc *NodeController) deleteDisk(diskType longhorn.DiskType, diskName, diskUUID, diskPath, diskDriver string) error {
if diskUUID == "" {
log.Infof("Disk %v has no diskUUID, skip deleting", diskName)
return nil
}

dataEngine := util.GetDataEngineForDiskType(diskType)

im, err := nc.ds.GetDefaultInstanceManagerByNodeRO(nc.controllerID, dataEngine)
im, err := nc.ds.GetRunningInstanceManagerByNodeRO(nc.controllerID, dataEngine)
if err != nil {
return errors.Wrapf(err, "failed to get default instance manager")
return errors.Wrapf(err, "failed to get running instance manager for data engine %v", dataEngine)
}

diskServiceClient, err := engineapi.NewDiskServiceClient(im, nc.logger)
Expand Down
10 changes: 5 additions & 5 deletions controller/orphan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,22 +327,22 @@ func (oc *OrphanController) deleteOrphanedReplica(orphan *longhorn.Orphan) error
err := lhns.DeletePath(filepath.Join(diskPath, "replicas", replicaDirectoryName))
return errors.Wrapf(err, "failed to delete orphan replica directory %v in disk %v", replicaDirectoryName, diskPath)
case longhorn.DiskTypeBlock:
return oc.DeleteSpdkReplicaInstance(orphan.Spec.Parameters[longhorn.OrphanDiskName], orphan.Spec.Parameters[longhorn.OrphanDiskUUID], "", orphan.Spec.Parameters[longhorn.OrphanDataName])
return oc.DeleteV2ReplicaInstance(orphan.Spec.Parameters[longhorn.OrphanDiskName], orphan.Spec.Parameters[longhorn.OrphanDiskUUID], "", orphan.Spec.Parameters[longhorn.OrphanDataName])
default:
return fmt.Errorf("unknown disk type %v for orphan %v", diskType, orphan.Name)
}
}

func (oc *OrphanController) DeleteSpdkReplicaInstance(diskName, diskUUID, diskDriver, replicaInstanceName string) (err error) {
func (oc *OrphanController) DeleteV2ReplicaInstance(diskName, diskUUID, diskDriver, replicaInstanceName string) (err error) {
logrus.Infof("Deleting SPDK replica instance %v on disk %v on node %v", replicaInstanceName, diskUUID, oc.controllerID)

defer func() {
err = errors.Wrapf(err, "cannot delete SPDK replica instance %v", replicaInstanceName)
err = errors.Wrapf(err, "cannot delete v2 replica instance %v", replicaInstanceName)
}()

im, err := oc.ds.GetDefaultInstanceManagerByNodeRO(oc.controllerID, longhorn.DataEngineTypeV2)
im, err := oc.ds.GetRunningInstanceManagerByNodeRO(oc.controllerID, longhorn.DataEngineTypeV2)
if err != nil {
return errors.Wrapf(err, "failed to get instance manager for node %v for deleting SPDK replica instance %v", oc.controllerID, replicaInstanceName)
return errors.Wrapf(err, "failed to get running instance manager for node %v for deleting v2 replica instance %v", oc.controllerID, replicaInstanceName)
}

c, err := engineapi.NewDiskServiceClient(im, oc.logger)
Expand Down
54 changes: 2 additions & 52 deletions controller/system_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"k8s.io/kubernetes/pkg/controller"

corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -907,7 +906,7 @@ func (c *SystemBackupController) generateSystemBackupYAMLsForKubernetes(dir stri
return
}

err = c.generateSystemBackupYAMLsForPodSecurityPolicy(dir, "roles", "rolebindings", "podsecuritypolicies", scheme)
err = c.generateSystemBackupYAMLsForRoles(dir, "roles", "rolebindings", scheme)
if err != nil {
return
}
Expand Down Expand Up @@ -981,9 +980,7 @@ func (c *SystemBackupController) generateSystemBackupYAMLsForServices(dir, name
}, scheme)
}

func (c *SystemBackupController) generateSystemBackupYAMLsForPodSecurityPolicy(dir,
roleName, roleBindingName, podSecurityPolicyName string,
scheme *runtime.Scheme) (err error) {
func (c *SystemBackupController) generateSystemBackupYAMLsForRoles(dir, roleName, roleBindingName string, scheme *runtime.Scheme) (err error) {
// Generate Role YAML
roleObj, err := c.ds.GetAllRoleList()
if err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -1002,55 +999,8 @@ func (c *SystemBackupController) generateSystemBackupYAMLsForPodSecurityPolicy(d
return
}

// Generate PodSecurityPolicy YAML
err = c.generateSystemBackupYAMLsForPodSecurityPoliciesByRoles(roleList, dir, podSecurityPolicyName, scheme)
if err != nil {
return
}

// Generate RoleBinding YAML
return getObjectsAndPrintToYAML(dir, roleBindingName, c.ds.GetAllRoleBindingList, scheme)

}

func (c *SystemBackupController) generateSystemBackupYAMLsForPodSecurityPoliciesByRoles(
roleList *rbacv1.RoleList,
dir, name string, scheme *runtime.Scheme) (err error) {
pspObj, err := c.ds.GetAllPodSecurityPolicyList()
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "failed to get all podSecurityPolicies")
}

pspList, ok := pspObj.(*policyv1beta1.PodSecurityPolicyList)
if !ok {
return errors.Wrap(err, "failed to convert to podSecurityPolicyList object")
}

filtered := []policyv1beta1.PodSecurityPolicy{}
for _, psp := range pspList.Items {
shouldBackup := false
for _, role := range roleList.Items {
for _, rule := range role.Rules {
if util.Contains(rule.ResourceNames, psp.Name) {
shouldBackup = true
break
}
}

if shouldBackup {
break
}
}

if shouldBackup {
filtered = append(filtered, psp)
}
}
pspList.Items = filtered

return getObjectsAndPrintToYAML(dir, name, func() (runtime.Object, error) {
return pspList, nil
}, scheme)
}

func (c *SystemBackupController) generateSystemBackupYAMLsForServiceAccount(dir,
Expand Down
34 changes: 0 additions & 34 deletions controller/system_restore_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand Down Expand Up @@ -551,39 +550,6 @@ func fakeSystemRolloutPersistentVolumeClaims(fakeObjs map[SystemRolloutCRName]*c
}
}

func fakeSystemRolloutPodSecurityPolicies(fakeObjs map[SystemRolloutCRName]*policyv1beta1.PodSecurityPolicy, c *C, informerFactory informers.SharedInformerFactory, client *fake.Clientset) {
indexer := informerFactory.Policy().V1beta1().PodSecurityPolicies().Informer().GetIndexer()

clientInterface := client.PolicyV1beta1().PodSecurityPolicies()

exists, err := clientInterface.List(context.TODO(), metav1.ListOptions{})
c.Assert(err, IsNil)

for _, exist := range exists.Items {
exist, err := clientInterface.Get(context.TODO(), exist.Name, metav1.GetOptions{})
c.Assert(err, IsNil)

err = clientInterface.Delete(context.TODO(), exist.Name, metav1.DeleteOptions{})
c.Assert(err, IsNil)

err = indexer.Delete(exist)
c.Assert(err, IsNil)
}

for k, fakeObj := range fakeObjs {
name := string(k)
if strings.HasSuffix(name, TestIgnoreSuffix) {
continue
}

exist, err := clientInterface.Create(context.TODO(), newPodSecurityPolicy(fakeObj.Spec), metav1.CreateOptions{})
c.Assert(err, IsNil)

err = indexer.Add(exist)
c.Assert(err, IsNil)
}
}

func fakeSystemRolloutRecurringJobs(fakeObjs map[SystemRolloutCRName]*longhorn.RecurringJob, c *C, informerFactory lhinformers.SharedInformerFactory, client *lhfake.Clientset) {
indexer := informerFactory.Longhorn().V1beta2().RecurringJobs().Informer().GetIndexer()

Expand Down
62 changes: 0 additions & 62 deletions controller/system_rollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand Down Expand Up @@ -112,8 +111,6 @@ type extractedResources struct {

storageClassList *storagev1.StorageClassList

podSecurityPolicyList *policyv1beta1.PodSecurityPolicyList

engineImageList *longhorn.EngineImageList
recurringJobList *longhorn.RecurringJobList
settingList *longhorn.SettingList
Expand Down Expand Up @@ -439,7 +436,6 @@ func (c *SystemRolloutController) systemRollout() error {
types.KubernetesKindServiceAccountList: c.restoreServiceAccounts,
types.KubernetesKindClusterRoleList: c.restoreClusterRoles,
types.KubernetesKindClusterRoleBindingList: c.restoreClusterRoleBindings,
types.KubernetesKindPodSecurityPolicyList: c.restorePodSecurityPolicies,
types.KubernetesKindRoleList: c.restoreRoles,
types.KubernetesKindRoleBindingList: c.restoreRoleBindings,
types.KubernetesKindStorageClassList: c.restoreStorageClasses,
Expand Down Expand Up @@ -681,9 +677,6 @@ func (c *SystemRolloutController) cacheResourcesFromDirectory(name string, schem
// Kubernetes Storage
case types.KubernetesKindStorageClassList:
c.storageClassList = obj.(*storagev1.StorageClassList)
// Kubernetes Policy
case types.KubernetesKindPodSecurityPolicyList:
c.podSecurityPolicyList = obj.(*policyv1beta1.PodSecurityPolicyList)
// Longhorn
case types.LonghornKindEngineImageList:
c.engineImageList = obj.(*longhorn.EngineImageList)
Expand Down Expand Up @@ -1393,61 +1386,6 @@ func (c *SystemRolloutController) restorePersistentVolumeClaims() (err error) {
return nil
}

func (c *SystemRolloutController) restorePodSecurityPolicies() (err error) {
if c.podSecurityPolicyList == nil {
return nil
}

for _, restore := range c.podSecurityPolicyList.Items {
log := c.logger.WithField(types.KubernetesKindPodSecurityPolicy, restore.Name)

exist, err := c.ds.GetPodSecurityPolicy(restore.Name)
if err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}

restore.ResourceVersion = ""

log.Info(SystemRolloutMsgCreating)

fnCreate := func(restore runtime.Object) (runtime.Object, error) {
obj, ok := restore.(*policyv1beta1.PodSecurityPolicy)
if !ok {
return nil, fmt.Errorf(SystemRolloutErrFailedConvertToObjectFmt, restore.GetObjectKind(), types.KubernetesKindPodSecurityPolicy)
}
return c.ds.CreatePodSecurityPolicy(obj)
}
_, err := c.rolloutResource(&restore, fnCreate, false, log, SystemRolloutMsgRestoredItem)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
continue
}

isSkipped := true
if !reflect.DeepEqual(exist.Spec, restore.Spec) {
log.Info(SystemRolloutMsgUpdating)
exist.Spec = restore.Spec

isSkipped = false
}
fnUpdate := func(exist runtime.Object) (runtime.Object, error) {
obj, ok := exist.(*policyv1beta1.PodSecurityPolicy)
if !ok {
return nil, fmt.Errorf(SystemRolloutErrFailedConvertToObjectFmt, exist.GetObjectKind(), types.KubernetesKindPodSecurityPolicy)
}
return c.ds.UpdatePodSecurityPolicy(obj)
}
_, err = c.rolloutResource(exist, fnUpdate, isSkipped, log, SystemRolloutMsgSkipIdentical)
if err != nil {
return err
}
}

return nil
}

func (c *SystemRolloutController) restoreRecurringJobs() (err error) {
if c.recurringJobList == nil {
return nil
Expand Down
Loading

0 comments on commit ac947eb

Please sign in to comment.