diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 69d3690cb97..2994da2d181 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -255,6 +255,35 @@ const ( DataProcessType OperationType = "DataProcess" ) +// AffinityPolicy the strategy for the affinity between Data Operation Pods. +type AffinityPolicy string + +const ( + DefaultAffinityStrategy AffinityPolicy = "" + RequireAffinityStrategy AffinityPolicy = "Require" + PreferAffinityStrategy AffinityPolicy = "Prefer" +) + +type AffinityStrategy struct { + // Policy one of: "", "Require", "Prefer" + // +optional + Policy AffinityPolicy `json:"policy,omitempty"` + + Prefer []Prefer `json:"prefer,omitempty"` + Require []Require `json:"require,omitempty"` +} + +// Prefer defines the label key and weight for generating a PreferredSchedulingTerm. +type Prefer struct { + Name string `json:"name"` + Weight int32 `json:"weight"` +} + +// Require defines the label key for generating a NodeSelectorTerm. +type Require struct { + Name string `json:"name"` +} + type OperationRef struct { // API version of the referent operation // +optional @@ -272,6 +301,10 @@ type OperationRef struct { // Namespace specifies the namespace of the referent operation. // +optional Namespace string `json:"namespace,omitempty"` + + // AffinityStrategy specifies the pod affinity strategy with the referent operation. 
+ // +optional + AffinityStrategy AffinityStrategy `json:"affinityStrategy,omitempty"` } type WaitingStatus struct { diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 610e7e23622..50df025c3f0 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -29,6 +29,7 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ "github.com/fluid-cloudnative/fluid/api/v1alpha1.APIGatewayStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_APIGatewayStatus(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.AffinityStrategy": schema_fluid_cloudnative_fluid_api_v1alpha1_AffinityStrategy(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.AlluxioCompTemplateSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_AlluxioCompTemplateSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.AlluxioFuseSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_AlluxioFuseSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.AlluxioRuntime": schema_fluid_cloudnative_fluid_api_v1alpha1_AlluxioRuntime(ref), @@ -95,7 +96,9 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata": schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Prefer": schema_fluid_cloudnative_fluid_api_v1alpha1_Prefer(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor": schema_fluid_cloudnative_fluid_api_v1alpha1_Processor(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Require": 
schema_fluid_cloudnative_fluid_api_v1alpha1_Require(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Runtime": schema_fluid_cloudnative_fluid_api_v1alpha1_Runtime(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeCondition": schema_fluid_cloudnative_fluid_api_v1alpha1_RuntimeCondition(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeManagement": schema_fluid_cloudnative_fluid_api_v1alpha1_RuntimeManagement(ref), @@ -147,6 +150,53 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_APIGatewayStatus(ref common.Ref } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_AffinityStrategy(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "policy": { + SchemaProps: spec.SchemaProps{ + Description: "Policy one of: \"\", \"Require\", \"Prefer\"", + Type: []string{"string"}, + Format: "", + }, + }, + "prefer": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.Prefer"), + }, + }, + }, + }, + }, + "require": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.Require"), + }, + }, + }, + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Prefer", "github.com/fluid-cloudnative/fluid/api/v1alpha1.Require"}, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_AlluxioCompTemplateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -4817,10 +4867,19 @@ func 
schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref common.Referen Format: "", }, }, + "affinityStrategy": { + SchemaProps: spec.SchemaProps{ + Description: "AffinityStrategy specifies the pod affinity strategy with the referent operation.", + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.AffinityStrategy"), + }, + }, }, Required: []string{"kind", "name"}, }, }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.AffinityStrategy"}, } } @@ -4896,12 +4955,18 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref common.Refe Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus"), }, }, + "nodeAffinity": { + SchemaProps: spec.SchemaProps{ + Description: "NodeAffinity records the node affinity for operation pods", + Ref: ref("k8s.io/api/core/v1.NodeAffinity"), + }, + }, }, Required: []string{"phase", "duration", "conditions"}, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.Condition", "github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Condition", "github.com/fluid-cloudnative/fluid/api/v1alpha1.WaitingStatus", "k8s.io/api/core/v1.NodeAffinity", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } @@ -4950,6 +5015,34 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref common.Referenc } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_Prefer(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "Prefer defines the label key and weight for generating a PreferredSchedulingTerm.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "weight": { + SchemaProps: 
spec.SchemaProps{ + Default: 0, + Type: []string{"integer"}, + Format: "int32", + }, + }, + }, + Required: []string{"name", "weight"}, + }, + }, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_Processor(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -4991,6 +5084,27 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_Processor(ref common.ReferenceC } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_Require(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "Require defines the label key for generating a NodeSelectorTerm.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"name"}, + }, + }, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_Runtime(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index 3839d2e709a..57bf44e81fb 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -151,6 +151,9 @@ type OperationStatus struct { LastSuccessfulTime *metav1.Time `json:"lastSuccessfulTime,omitempty"` // WaitingStatus stores information about waiting operation. 
WaitingFor WaitingStatus `json:"waitingFor,omitempty"` + + // NodeAffinity records the node affinity for operation pods + NodeAffinity *corev1.NodeAffinity `json:"nodeAffinity,omitempty"` } type RuntimePhase string diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 9c1b39472e9..b7fd509f98b 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -41,6 +41,31 @@ func (in *APIGatewayStatus) DeepCopy() *APIGatewayStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AffinityStrategy) DeepCopyInto(out *AffinityStrategy) { + *out = *in + if in.Prefer != nil { + in, out := &in.Prefer, &out.Prefer + *out = make([]Prefer, len(*in)) + copy(*out, *in) + } + if in.Require != nil { + in, out := &in.Require, &out.Require + *out = make([]Require, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AffinityStrategy. +func (in *AffinityStrategy) DeepCopy() *AffinityStrategy { + if in == nil { + return nil + } + out := new(AffinityStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *AlluxioCompTemplateSpec) DeepCopyInto(out *AlluxioCompTemplateSpec) { *out = *in @@ -410,7 +435,7 @@ func (in *DataBackupSpec) DeepCopyInto(out *DataBackupSpec) { if in.RunAfter != nil { in, out := &in.RunAfter, &out.RunAfter *out = new(OperationRef) - **out = **in + (*in).DeepCopyInto(*out) } if in.TTLSecondsAfterFinished != nil { in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished @@ -527,7 +552,7 @@ func (in *DataLoadSpec) DeepCopyInto(out *DataLoadSpec) { if in.RunAfter != nil { in, out := &in.RunAfter, &out.RunAfter *out = new(OperationRef) - **out = **in + (*in).DeepCopyInto(*out) } if in.TTLSecondsAfterFinished != nil { in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished @@ -642,7 +667,7 @@ func (in *DataMigrateSpec) DeepCopyInto(out *DataMigrateSpec) { if in.RunAfter != nil { in, out := &in.RunAfter, &out.RunAfter *out = new(OperationRef) - **out = **in + (*in).DeepCopyInto(*out) } if in.TTLSecondsAfterFinished != nil { in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished @@ -736,7 +761,7 @@ func (in *DataProcessSpec) DeepCopyInto(out *DataProcessSpec) { if in.RunAfter != nil { in, out := &in.RunAfter, &out.RunAfter *out = new(OperationRef) - **out = **in + (*in).DeepCopyInto(*out) } if in.TTLSecondsAfterFinished != nil { in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished @@ -2091,6 +2116,7 @@ func (in *OSAdvise) DeepCopy() *OSAdvise { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OperationRef) DeepCopyInto(out *OperationRef) { *out = *in + in.AffinityStrategy.DeepCopyInto(&out.AffinityStrategy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationRef. 
@@ -2129,6 +2155,11 @@ func (in *OperationStatus) DeepCopyInto(out *OperationStatus) { *out = (*in).DeepCopy() } in.WaitingFor.DeepCopyInto(&out.WaitingFor) + if in.NodeAffinity != nil { + in, out := &in.NodeAffinity, &out.NodeAffinity + *out = new(v1.NodeAffinity) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationStatus. @@ -2170,6 +2201,21 @@ func (in *PodMetadata) DeepCopy() *PodMetadata { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Prefer) DeepCopyInto(out *Prefer) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prefer. +func (in *Prefer) DeepCopy() *Prefer { + if in == nil { + return nil + } + out := new(Prefer) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Processor) DeepCopyInto(out *Processor) { *out = *in @@ -2196,6 +2242,21 @@ func (in *Processor) DeepCopy() *Processor { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Require) DeepCopyInto(out *Require) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Require. +func (in *Require) DeepCopy() *Require { + if in == nil { + return nil + } + out := new(Require) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Runtime) DeepCopyInto(out *Runtime) { *out = *in diff --git a/charts/fluid-databackup/alluxio/templates/databackup.yaml b/charts/fluid-databackup/alluxio/templates/databackup.yaml index c4aeb645f43..3a712211b3c 100644 --- a/charts/fluid-databackup/alluxio/templates/databackup.yaml +++ b/charts/fluid-databackup/alluxio/templates/databackup.yaml @@ -89,6 +89,11 @@ spec: mountPath: /etc/group readOnly: true {{- end}} + {{- if .Values.dataBackup.affinity }} + affinity: +{{ toYaml .Values.dataBackup.affinity | indent 4 }} + {{- end }} + restartPolicy: Never volumes: {{- if .Values.dataBackup.workdir }} diff --git a/charts/fluid-databackup/alluxio/values.yaml b/charts/fluid-databackup/alluxio/values.yaml index 09567c31696..fb6a0b4a028 100644 --- a/charts/fluid-databackup/alluxio/values.yaml +++ b/charts/fluid-databackup/alluxio/values.yaml @@ -45,6 +45,8 @@ dataBackup: # Description: optional image pull secrets on DataLoad pods imagePullSecrets: [] + affinity: + initUsers: enabled: false image: registry.cn-hangzhou.aliyuncs.com/fluid/init-users diff --git a/charts/fluid-dataprocess/common/templates/job.yaml b/charts/fluid-dataprocess/common/templates/job.yaml index 51444f3eef8..a78d2b10d89 100644 --- a/charts/fluid-dataprocess/common/templates/job.yaml +++ b/charts/fluid-dataprocess/common/templates/job.yaml @@ -69,6 +69,10 @@ spec: {{- if .Values.dataProcess.scriptProcessor.volumeMounts }} {{- toYaml .Values.dataProcess.scriptProcessor.volumeMounts | nindent 12 }} {{- end }} + {{- if .Values.dataProcess.scriptProcessor.affinity }} + affinity: +{{ toYaml .Values.dataProcess.scriptProcessor.affinity | indent 8 }} + {{- end }} volumes: - name: script-cm-vol configMap: diff --git a/charts/fluid-dataprocess/common/values.yaml b/charts/fluid-dataprocess/common/values.yaml index cb9962bdaa1..057f1d547f7 100644 --- a/charts/fluid-dataprocess/common/values.yaml +++ b/charts/fluid-dataprocess/common/values.yaml @@ -25,5 +25,6 @@ dataProcess: volumes: [] volumeMounts: [] 
resources: {} + affinity: {} jobProcessor: podSpec: {} diff --git a/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml b/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml index f2e88040b61..4e3e2412d62 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_databackups.yaml @@ -67,6 +67,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. + properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -174,6 +208,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. 
for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. + due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml b/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml index 626fbc52323..a6217661944 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_dataloads.yaml @@ -979,6 +979,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. + properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -1127,6 +1161,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. 
for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. + due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml b/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml index 6f3ed5b135b..2b66bd9ec96 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_datamigrates.yaml @@ -1045,6 +1045,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. + properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -1242,6 +1276,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. 
for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. + due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml index a60ba0e3281..5f2cab223d5 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml @@ -9821,6 +9821,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. + properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -9908,6 +9942,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. 
for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. + due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. 
+ type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. 
+ items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/charts/fluid/fluid/templates/role/dataset/rbac.yaml b/charts/fluid/fluid/templates/role/dataset/rbac.yaml index 3d12ec5d23f..38374e2a10a 100644 --- a/charts/fluid/fluid/templates/role/dataset/rbac.yaml +++ b/charts/fluid/fluid/templates/role/dataset/rbac.yaml @@ -13,6 +13,14 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/config/crd/bases/data.fluid.io_databackups.yaml b/config/crd/bases/data.fluid.io_databackups.yaml index f2e88040b61..4e3e2412d62 100644 --- a/config/crd/bases/data.fluid.io_databackups.yaml +++ b/config/crd/bases/data.fluid.io_databackups.yaml @@ -67,6 +67,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. 
+ properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -174,6 +208,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. 
+ due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/config/crd/bases/data.fluid.io_dataloads.yaml b/config/crd/bases/data.fluid.io_dataloads.yaml index 626fbc52323..a6217661944 100644 --- a/config/crd/bases/data.fluid.io_dataloads.yaml +++ b/config/crd/bases/data.fluid.io_dataloads.yaml @@ -979,6 +979,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. 
+ properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -1127,6 +1161,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. 
+ due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/config/crd/bases/data.fluid.io_datamigrates.yaml b/config/crd/bases/data.fluid.io_datamigrates.yaml index 6f3ed5b135b..2b66bd9ec96 100644 --- a/config/crd/bases/data.fluid.io_datamigrates.yaml +++ b/config/crd/bases/data.fluid.io_datamigrates.yaml @@ -1045,6 +1045,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. 
+ properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -1242,6 +1276,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. 
+ due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/config/crd/bases/data.fluid.io_dataprocesses.yaml b/config/crd/bases/data.fluid.io_dataprocesses.yaml index a60ba0e3281..5f2cab223d5 100644 --- a/config/crd/bases/data.fluid.io_dataprocesses.yaml +++ b/config/crd/bases/data.fluid.io_dataprocesses.yaml @@ -9821,6 +9821,40 @@ spec: runAfter: description: Specifies that the preceding operation in a workflow properties: + affinityStrategy: + description: AffinityStrategy specifies the pod affinity strategy + with the referent operation. + properties: + policy: + description: 'Policy one of: "", "Require", "Prefer"' + type: string + prefer: + items: + description: Prefer defines the label key and weight for + generating a PreferredSchedulingTerm. + properties: + name: + type: string + weight: + format: int32 + type: integer + required: + - name + - weight + type: object + type: array + require: + items: + description: Require defines the label key for generating + a NodeSelectorTerm. 
+ properties: + name: + type: string + required: + - name + type: object + type: array + type: object apiVersion: description: API version of the referent operation type: string @@ -9908,6 +9942,200 @@ spec: successfully completed format: date-time type: string + nodeAffinity: + description: NodeAffinity records the node affinity for operation + pods + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: The scheduler will prefer to schedule pods to nodes + that satisfy the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest + sum of weights, i.e. for each node that meets all of the scheduling + requirements (resource request, requiredDuringScheduling affinity + expressions, etc.), compute a sum by iterating through the elements + of this field and adding "weight" to the sum if the node matches + the corresponding matchExpressions; the node(s) with the highest + sum are the most preferred. + items: + description: An empty preferred scheduling term matches all + objects with implicit weight 0 (i.e. it's a no-op). A null + preferred scheduling term matches no objects (i.e. is also + a no-op). + properties: + preference: + description: A node selector term, associated with the corresponding + weight. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + description: If the affinity requirements specified by this field + are not met at scheduling time, the pod will not be scheduled + onto the node. If the affinity requirements specified by this + field cease to be met at some point during pod execution (e.g. 
+ due to an update), the system may or may not try to eventually + evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. The + terms are ORed. + items: + description: A null or empty node selector term matches + no objects. The requirements of them are ANDed. The TopologySelectorTerm + type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements by + node's labels. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + description: A list of node selector requirements by + node's fields. + items: + description: A node selector requirement is a selector + that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: The label key that the selector applies + to. + type: string + operator: + description: Represents a key's relationship to + a set of values. Valid operators are In, NotIn, + Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: An array of string values. 
If the + operator is In or NotIn, the values array must + be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator + is Gt or Lt, the values array must have a single + element, which will be interpreted as an integer. + This array is replaced during a strategic merge + patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object phase: description: Phase describes current phase of operation type: string diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 0659d5566cb..f33fc2f034f 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -209,3 +209,9 @@ const ( const ( RuntimeFuseHostPIDKey = "runtime.fluid.io/fuse.hostpid" ) + +const ( + K8sNodeNameLabelKey = "kubernetes.io/hostname" + K8sZoneLabelKey = "topology.kubernetes.io/zone" + K8sRegionLabelKey = "topology.kubernetes.io/region" +) diff --git a/pkg/controllers/v1alpha1/databackup/status_handler.go b/pkg/controllers/v1alpha1/databackup/status_handler.go index bc89b2e55c8..5a720015ebb 100644 --- a/pkg/controllers/v1alpha1/databackup/status_handler.go +++ b/pkg/controllers/v1alpha1/databackup/status_handler.go @@ -17,6 +17,8 @@ package databackup import ( + "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "time" "github.com/fluid-cloudnative/fluid/api/v1alpha1" @@ -51,6 +53,14 @@ func (o *OnceHandler) GetOperationStatus(ctx runtime.ReconcileRequestContext, op if !kubeclient.IsFinishedPod(backupPod) { return } + // set the node labels in status when job finished + if result.NodeAffinity == nil { + // generate the node labels + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(ctx.Client, backupPod) + if err != nil { + return nil, fmt.Errorf("error to generate the node labels: %v", err) + } + } var finishTime time.Time if len(backupPod.Status.Conditions) != 0 { diff --git 
a/pkg/controllers/v1alpha1/dataload/status_handler.go b/pkg/controllers/v1alpha1/dataload/status_handler.go index a1df0c1ba3b..cbd0f8cd95f 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler.go @@ -17,6 +17,8 @@ package dataload import ( + "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/types" @@ -71,36 +73,49 @@ func (r *OnceStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont return } - if len(job.Status.Conditions) != 0 { - if job.Status.Conditions[0].Type == batchv1.JobFailed || - job.Status.Conditions[0].Type == batchv1.JobComplete { - // job either failed or complete, update DataLoad's phase status - jobCondition := job.Status.Conditions[0] + isJobFinished := len(job.Status.Conditions) != 0 && + (job.Status.Conditions[0].Type == batchv1.JobFailed || job.Status.Conditions[0].Type == batchv1.JobComplete) + if !isJobFinished { + ctx.Log.V(1).Info("DataLoad job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } - result.Conditions = []datav1alpha1.Condition{ - { - Type: common.ConditionType(jobCondition.Type), - Status: jobCondition.Status, - Reason: jobCondition.Reason, - Message: jobCondition.Message, - LastProbeTime: jobCondition.LastProbeTime, - LastTransitionTime: jobCondition.LastTransitionTime, - }, - } - if jobCondition.Type == batchv1.JobFailed { - result.Phase = common.PhaseFailed - } else { - result.Phase = common.PhaseComplete - } - result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) + // set the node labels in status when job finished + if result.NodeAffinity == nil { + jobPod, err := kubeclient.GetSucceedPodForJob(r.Client, job) + if err != nil { + ctx.Log.Error(err, "can't get pod for job", "namespace", ctx.Namespace, "jobName", jobName) + return nil, err + } - 
return + // generate the node labels + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(r.Client, jobPod) + if err != nil { + return nil, fmt.Errorf("error to generate the node labels: %v", err) } } - ctx.Log.V(1).Info("DataLoad job still running", "namespace", ctx.Namespace, "jobName", jobName) + + // job either failed or complete, update DataLoad's phase status + jobCondition := job.Status.Conditions[0] + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(jobCondition.Type), + Status: jobCondition.Status, + Reason: jobCondition.Reason, + Message: jobCondition.Message, + LastProbeTime: jobCondition.LastProbeTime, + LastTransitionTime: jobCondition.LastTransitionTime, + }, + } + if jobCondition.Type == batchv1.JobFailed { + result.Phase = common.PhaseFailed + } else { + result.Phase = common.PhaseComplete + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) + return } - func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { result = opStatus.DeepCopy() @@ -135,7 +150,6 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont return } - // get the newest job var currentJob *batchv1.Job for _, job := range jobs { if job.CreationTimestamp == *cronjobStatus.LastScheduleTime || job.CreationTimestamp.After(cronjobStatus.LastScheduleTime.Time) { diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler.go b/pkg/controllers/v1alpha1/datamigrate/status_handler.go index 082e1a8c6d6..c501d6096fd 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler.go @@ -17,6 +17,8 @@ package datamigrate import ( + "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/go-logr/logr" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/types" @@ -78,32 
+80,49 @@ func (m *OnceStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont return } - if len(job.Status.Conditions) != 0 { - if job.Status.Conditions[0].Type == batchv1.JobFailed || - job.Status.Conditions[0].Type == batchv1.JobComplete { - jobCondition := job.Status.Conditions[0] - // job either failed or complete, update DataMigrate's phase status - result.Conditions = []datav1alpha1.Condition{ - { - Type: common.ConditionType(jobCondition.Type), - Status: jobCondition.Status, - Reason: jobCondition.Reason, - Message: jobCondition.Message, - LastProbeTime: jobCondition.LastProbeTime, - LastTransitionTime: jobCondition.LastTransitionTime, - }, + isJobFinished := len(job.Status.Conditions) != 0 && + (job.Status.Conditions[0].Type == batchv1.JobFailed || job.Status.Conditions[0].Type == batchv1.JobComplete) + if !isJobFinished { + ctx.Log.V(1).Info("DataMigrate job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + // set the node labels in status when job finished + // for parallel migrate, there are multiple pods, so can not set the node labels. 
+ if m.dataMigrate.Spec.Parallelism == 1 { + // set the node labels in status + if result.NodeAffinity == nil { + jobPod, err := kubeclient.GetSucceedPodForJob(m.Client, job) + if err != nil { + ctx.Log.Error(err, "can't get pod for job", "namespace", ctx.Namespace, "jobName", jobName) + return nil, err } - if jobCondition.Type == batchv1.JobFailed { - result.Phase = common.PhaseFailed - } else { - result.Phase = common.PhaseComplete + + // generate the node labels + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(m.Client, jobPod) + if err != nil { + return nil, fmt.Errorf("error to generate the node labels: %v", err) } - result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) - return } } - - ctx.Log.V(1).Info("DataMigrate job still running", "namespace", ctx.Namespace, "jobName", jobName) + // job either failed or complete, update DataMigrate's phase status + jobCondition := job.Status.Conditions[0] + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(jobCondition.Type), + Status: jobCondition.Status, + Reason: jobCondition.Reason, + Message: jobCondition.Message, + LastProbeTime: jobCondition.LastProbeTime, + LastTransitionTime: jobCondition.LastTransitionTime, + }, + } + if jobCondition.Type == batchv1.JobFailed { + result.Phase = common.PhaseFailed + } else { + result.Phase = common.PhaseComplete + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) return } diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler.go b/pkg/controllers/v1alpha1/dataprocess/status_handler.go index a4a5157d018..d2d00ce521f 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler.go @@ -17,8 +17,10 @@ package dataprocess import ( + "fmt" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + 
"github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -60,32 +62,48 @@ func (handler *OnceStatusHandler) GetOperationStatus(ctx runtime.ReconcileReques return } - if len(job.Status.Conditions) != 0 { - if job.Status.Conditions[0].Type == batchv1.JobFailed || - job.Status.Conditions[0].Type == batchv1.JobComplete { - // job either failed or complete, update DataLoad's phase status - jobCondition := job.Status.Conditions[0] - - result.Conditions = []datav1alpha1.Condition{ - { - Type: common.ConditionType(jobCondition.Type), - Status: jobCondition.Status, - Reason: jobCondition.Reason, - Message: jobCondition.Message, - LastProbeTime: jobCondition.LastProbeTime, - LastTransitionTime: jobCondition.LastTransitionTime, - }, - } - if jobCondition.Type == batchv1.JobFailed { - result.Phase = common.PhaseFailed - } else { - result.Phase = common.PhaseComplete - } - result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) + isJobFinished := len(job.Status.Conditions) != 0 && + (job.Status.Conditions[0].Type == batchv1.JobFailed || job.Status.Conditions[0].Type == batchv1.JobComplete) + if !isJobFinished { + ctx.Log.V(1).Info("DataProcess job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + // set the node labels in status when job finished + if result.NodeAffinity == nil { + jobPod, err := kubeclient.GetSucceedPodForJob(handler.Client, job) + if err != nil { + ctx.Log.Error(err, "can't get pod for job", "namespace", ctx.Namespace, "jobName", jobName) + return nil, err + } - return + // generate the node labels + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(handler.Client, jobPod) + if err != nil { + return nil, fmt.Errorf("error to generate the node labels: %v", err) } } - ctx.Log.V(1).Info("DataProcess job still running", 
"namespace", ctx.Namespace, "jobName", jobName) + + // job either failed or complete, update DataLoad's phase status + jobCondition := job.Status.Conditions[0] + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(jobCondition.Type), + Status: jobCondition.Status, + Reason: jobCondition.Reason, + Message: jobCondition.Message, + LastProbeTime: jobCondition.LastProbeTime, + LastTransitionTime: jobCondition.LastTransitionTime, + }, + } + + if jobCondition.Type == batchv1.JobFailed { + result.Phase = common.PhaseFailed + } else { + result.Phase = common.PhaseComplete + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, jobCondition.LastTransitionTime.Time) + return } diff --git a/pkg/databackup/value.go b/pkg/databackup/value.go index be9012c3770..c007da755b0 100644 --- a/pkg/databackup/value.go +++ b/pkg/databackup/value.go @@ -43,4 +43,5 @@ type DataBackup struct { RuntimeType string `yaml:"runtimeType,omitempty"` // image pull secrets ImagePullSecrets []corev1.LocalObjectReference `yaml:"imagePullSecrets,omitempty"` + Affinity *corev1.Affinity `yaml:"affinity,omitempty"` } diff --git a/pkg/dataflow/affinity.go b/pkg/dataflow/affinity.go new file mode 100644 index 00000000000..481fdb85df0 --- /dev/null +++ b/pkg/dataflow/affinity.go @@ -0,0 +1,135 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package dataflow + +import ( + "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// InjectAffinityByRunAfterOp inject the affinity based on preceding operation +func InjectAffinityByRunAfterOp(c client.Client, runAfter *datav1alpha1.OperationRef, opNamespace string, currentAffinity *v1.Affinity) (*v1.Affinity, error) { + // no previous operation or use default affinity strategy, no need to generate node affinity + if runAfter == nil || runAfter.AffinityStrategy.Policy == datav1alpha1.DefaultAffinityStrategy { + return currentAffinity, nil + } + precedingOpNamespace := opNamespace + if len(runAfter.Namespace) != 0 { + precedingOpNamespace = runAfter.Namespace + } + + precedingOpStatus, err := utils.GetPrecedingOperationStatus(c, runAfter, precedingOpNamespace) + if err != nil { + return currentAffinity, err + } + + // require policy + if runAfter.AffinityStrategy.Policy == datav1alpha1.RequireAffinityStrategy { + return injectRequiredAffinity(runAfter, precedingOpStatus.NodeAffinity, currentAffinity) + } + + // prefer policy + if runAfter.AffinityStrategy.Policy == datav1alpha1.PreferAffinityStrategy { + return injectPreferredAffinity(runAfter, precedingOpStatus.NodeAffinity, currentAffinity) + } + + return currentAffinity, fmt.Errorf("unknown policy for affinity strategy: %s", runAfter.AffinityStrategy.Policy) +} + +func injectPreferredAffinity(runAfter *datav1alpha1.OperationRef, prevOpNodeAffinity *v1.NodeAffinity, currentAffinity *v1.Affinity) (*v1.Affinity, error) { + var preferTerms []v1.PreferredSchedulingTerm + prefer := runAfter.AffinityStrategy.Prefer + if len(prefer) == 0 { + prefer = []datav1alpha1.Prefer{ + { + Name: common.K8sNodeNameLabelKey, + Weight: 100, + }, + } + } + + if 
len(prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + return currentAffinity, fmt.Errorf("no node selector terms in the preceding operation") + } + + // currently, only has one element. + podNodeSelectorTerm := prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0] + + for _, item := range prefer { + key := item.Name + for _, expression := range podNodeSelectorTerm.MatchExpressions { + if expression.Key == key { + preferTerms = append(preferTerms, v1.PreferredSchedulingTerm{ + Weight: item.Weight, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: expression.Key, + Operator: expression.Operator, + Values: expression.Values, + }, + }, + }, + }) + } + } + } + return utils.InjectPreferredSchedulingTermsToAffinity(preferTerms, currentAffinity), nil +} + +func injectRequiredAffinity(runAfter *datav1alpha1.OperationRef, prevOpNodeAffinity *v1.NodeAffinity, currentAffinity *v1.Affinity) (*v1.Affinity, error) { + if prevOpNodeAffinity == nil || prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil || + prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms == nil { + return currentAffinity, nil + } + + var matchExpressions []v1.NodeSelectorRequirement + require := runAfter.AffinityStrategy.Require + if len(require) == 0 { + require = []datav1alpha1.Require{ + { + Name: common.K8sNodeNameLabelKey, + }, + } + } + + if len(prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + return currentAffinity, fmt.Errorf("no node selector terms in the preceding operation") + } + + // currently, only has one element. 
+ podNodeSelectorTerm := prevOpNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0] + + for _, item := range require { + for _, expression := range podNodeSelectorTerm.MatchExpressions { + if expression.Key == item.Name { + matchExpressions = append(matchExpressions, + v1.NodeSelectorRequirement{ + Key: expression.Key, + Operator: expression.Operator, + Values: expression.Values, + }, + ) + } + } + } + return utils.InjectNodeSelectorRequirements(matchExpressions, currentAffinity), nil +} diff --git a/pkg/dataflow/affinity_test.go b/pkg/dataflow/affinity_test.go new file mode 100644 index 00000000000..a105bcf3520 --- /dev/null +++ b/pkg/dataflow/affinity_test.go @@ -0,0 +1,305 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
/*
Copyright 2024 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dataflow

import (
	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
	"github.com/fluid-cloudnative/fluid/pkg/common"
	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"reflect"
	"testing"
)

// TestInjectAffinityByRunAfterOp is a table-driven test for
// InjectAffinityByRunAfterOp. It exercises the default (""), Require and
// Prefer affinity policies against a fake client pre-loaded with the
// preceding operation's status.
func TestInjectAffinityByRunAfterOp(t *testing.T) {

	type args struct {
		runAfter        *datav1alpha1.OperationRef // reference to the preceding operation (incl. affinity strategy)
		opNamespace     string                     // namespace of the current operation
		objects         []runtime.Object           // objects seeded into the fake client
		currentAffinity *v1.Affinity               // affinity to merge into (nil in all cases below)
	}
	tests := []struct {
		name    string
		args    args
		want    *v1.Affinity // expected merged affinity
		wantErr bool         // whether an error is expected
	}{
		{
			// Default ("") policy: the referent op exists but no affinity is
			// generated, so the input affinity (nil) is returned unchanged.
			name: "default policy",
			args: args{
				runAfter: &datav1alpha1.OperationRef{
					Kind: "DataLoad",
					Name: "test-op",
				},
				objects: []runtime.Object{
					&datav1alpha1.DataLoad{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "test-op",
							Namespace: "default",
						},
						Status: datav1alpha1.OperationStatus{},
					},
				},
				opNamespace:     "default",
				currentAffinity: nil,
			},
			want:    nil,
			wantErr: false,
		},
		{
			// The referent operation name does not match any object in the
			// cluster, so fetching the preceding status must fail.
			name: "no preceding op, error",
			args: args{
				runAfter: &datav1alpha1.OperationRef{
					Kind: "DataLoad",
					Name: "test-op",
					AffinityStrategy: datav1alpha1.AffinityStrategy{
						Policy: datav1alpha1.PreferAffinityStrategy,
					},
				},
				objects: []runtime.Object{
					&datav1alpha1.DataLoad{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "test-op2",
							Namespace: "default",
						},
						Status: datav1alpha1.OperationStatus{},
					},
				},
				opNamespace:     "default",
				currentAffinity: nil,
			},
			want:    nil,
			wantErr: true,
		},
		{
			// Require policy with no explicit Require entries: defaults to the
			// node-name label, copying the recorded node01 requirement verbatim.
			name: "require policy, use node",
			args: args{
				runAfter: &datav1alpha1.OperationRef{
					Kind: "DataLoad",
					Name: "test-op",
					AffinityStrategy: datav1alpha1.AffinityStrategy{
						Policy: datav1alpha1.RequireAffinityStrategy,
					},
				},
				objects: []runtime.Object{
					&datav1alpha1.DataLoad{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "test-op",
							Namespace: "default",
						},
						Status: datav1alpha1.OperationStatus{
							NodeAffinity: &v1.NodeAffinity{
								RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
									NodeSelectorTerms: []v1.NodeSelectorTerm{
										{
											MatchExpressions: []v1.NodeSelectorRequirement{
												{
													Key:      common.K8sNodeNameLabelKey,
													Operator: v1.NodeSelectorOpIn,
													Values:   []string{"node01"},
												},
											},
										},
									},
								},
							},
						},
					},
				},
				opNamespace:     "default",
				currentAffinity: nil,
			},
			want: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      common.K8sNodeNameLabelKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   []string{"node01"},
									},
								},
							},
						},
					},
				},
			},
			wantErr: false,
		},
		{
			// Require policy with a user-supplied label key ("k8s.rack"): only
			// the matching recorded expression is carried over.
			name: "require policy, customized",
			args: args{
				runAfter: &datav1alpha1.OperationRef{
					Kind: "DataLoad",
					Name: "test-op",
					AffinityStrategy: datav1alpha1.AffinityStrategy{
						Policy: datav1alpha1.RequireAffinityStrategy,
						Require: []datav1alpha1.Require{
							{
								Name: "k8s.rack",
							},
						},
					},
				},
				objects: []runtime.Object{
					&datav1alpha1.DataLoad{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "test-op",
							Namespace: "default",
						},
						Status: datav1alpha1.OperationStatus{
							NodeAffinity: &v1.NodeAffinity{
								RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
									NodeSelectorTerms: []v1.NodeSelectorTerm{
										{
											MatchExpressions: []v1.NodeSelectorRequirement{
												{
													Key:      "k8s.rack",
													Operator: v1.NodeSelectorOpIn,
													Values:   []string{"rack01"},
												},
											},
										},
									},
								},
							},
						},
					},
				},
				opNamespace:     "default",
				currentAffinity: nil,
			},
			want: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      "k8s.rack",
										Operator: v1.NodeSelectorOpIn,
										Values:   []string{"rack01"},
									},
								},
							},
						},
					},
				},
			},
			wantErr: false,
		},
		{
			// Prefer policy selecting only the zone label (weight 10): of the
			// three recorded expressions (node/zone/region) only the zone one
			// becomes a PreferredSchedulingTerm. Also exercises the cross-
			// namespace lookup via runAfter.Namespace ("test").
			name: "prefer policy, use zone",
			args: args{
				runAfter: &datav1alpha1.OperationRef{
					Kind:      "DataLoad",
					Name:      "test-op",
					Namespace: "test",
					AffinityStrategy: datav1alpha1.AffinityStrategy{
						Policy: datav1alpha1.PreferAffinityStrategy,
						Prefer: []datav1alpha1.Prefer{
							{
								Weight: 10,
								Name:   common.K8sZoneLabelKey,
							},
						},
					},
				},
				objects: []runtime.Object{
					&datav1alpha1.DataLoad{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "test-op",
							Namespace: "test",
						},
						Status: datav1alpha1.OperationStatus{
							NodeAffinity: &v1.NodeAffinity{
								RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
									NodeSelectorTerms: []v1.NodeSelectorTerm{
										{
											MatchExpressions: []v1.NodeSelectorRequirement{
												{
													Key:      common.K8sNodeNameLabelKey,
													Operator: v1.NodeSelectorOpIn,
													Values:   []string{"node01"},
												},
												{
													Key:      common.K8sZoneLabelKey,
													Operator: v1.NodeSelectorOpIn,
													Values:   []string{"zone01"},
												},
												{
													Key:      common.K8sRegionLabelKey,
													Operator: v1.NodeSelectorOpIn,
													Values:   []string{"region01"},
												},
											},
										},
									},
								},
							},
						},
					},
				},
				opNamespace:     "default",
				currentAffinity: nil,
			},
			want: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
						{
							Weight: 10,
							Preference: v1.NodeSelectorTerm{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      common.K8sZoneLabelKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   []string{"zone01"},
									},
								},
							},
						},
					},
				},
			},
			wantErr: false,
		},
	}
	// Register all API types the fake client must recognize.
	testScheme := runtime.NewScheme()
	_ = v1.AddToScheme(testScheme)
	_ = datav1alpha1.AddToScheme(testScheme)
	_ = appsv1.AddToScheme(testScheme)

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c := fake.NewFakeClientWithScheme(testScheme, tt.args.objects...)
			got, err := InjectAffinityByRunAfterOp(c, tt.args.runAfter, tt.args.opNamespace, tt.args.currentAffinity)
			if (err != nil) != tt.wantErr {
				t.Errorf("InjectAffinityByRunAfterOp() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("InjectAffinityByRunAfterOp() got = %v, want %v", got, tt.want)
			}
		})
	}
}
+*/ + +package dataflow + +import ( + "fmt" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func GenerateNodeAffinity(c client.Client, pod *corev1.Pod) (*corev1.NodeAffinity, error) { + if pod == nil { + return nil, nil + } + nodeName := pod.Spec.NodeName + if len(nodeName) == 0 { + return nil, nil + } + + node, err := kubeclient.GetNode(c, nodeName) + if err != nil { + return nil, fmt.Errorf("error to get node %s: %v", nodeName, err) + } + + // node name + nodeAffinity := &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: common.K8sNodeNameLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{nodeName}, + }, + }, + }, + }, + }, + } + + // region + region, exist := node.Labels[common.K8sRegionLabelKey] + if exist { + nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = + append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, + corev1.NodeSelectorRequirement{ + Key: common.K8sRegionLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{region}, + }) + } + // zone + zone, exist := node.Labels[common.K8sZoneLabelKey] + if exist { + nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = + append(nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, + corev1.NodeSelectorRequirement{ + Key: common.K8sZoneLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{zone}, + }) + } + + // customized labels + if pod.Spec.Affinity != nil && pod.Spec.Affinity.NodeAffinity != nil { + fillCustomizedNodeAffinity(pod.Spec.Affinity.NodeAffinity, nodeAffinity, node) + } + 
+ return nodeAffinity, nil +} + +func fillCustomizedNodeAffinity(podNodeAffinity *corev1.NodeAffinity, dstNodeAffinity *corev1.NodeAffinity, node *corev1.Node) { + // prefer + for _, term := range podNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + for _, expression := range term.Preference.MatchExpressions { + // use the actual value in the node. Transform In, NotIn, Exists, DoesNotExist, Gt, and Lt to In. + value, exist := node.Labels[expression.Key] + if exist { + dstNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = + append(dstNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, + corev1.NodeSelectorRequirement{ + Key: expression.Key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{value}, + }) + } + } + } + + if podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + return + } + + // require + for _, term := range podNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + for _, expression := range term.MatchExpressions { + // use the actual value in the node. Transform In, NotIn, Exists, DoesNotExist, Gt, and Lt to In. 
+ value, exist := node.Labels[expression.Key] + if exist { + dstNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = + append(dstNodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, + corev1.NodeSelectorRequirement{ + Key: expression.Key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{value}, + }) + } + } + } +} diff --git a/pkg/dataflow/helper_test.go b/pkg/dataflow/helper_test.go new file mode 100644 index 00000000000..1f6745058e8 --- /dev/null +++ b/pkg/dataflow/helper_test.go @@ -0,0 +1,195 @@ +package dataflow + +import ( + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" + "testing" +) + +func TestGenerateNodeLabels(t *testing.T) { + type args struct { + pod *v1.Pod + node *v1.Node + } + tests := []struct { + name string + args args + want *v1.NodeAffinity + wantErr bool + }{ + { + name: "default labels", + args: args{ + pod: &v1.Pod{ + Spec: v1.PodSpec{ + NodeName: "node01", + }, + }, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node01", + Labels: map[string]string{ + common.K8sNodeNameLabelKey: "node01", + common.K8sRegionLabelKey: "region01", + common.K8sZoneLabelKey: "zone01", + }, + }, + }, + }, + want: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: common.K8sNodeNameLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"node01"}, + }, + { + Key: common.K8sRegionLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"region01"}, + }, + { + Key: common.K8sZoneLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"zone01"}, + }, + }, + }, + }, + }, + }, + 
wantErr: false, + }, + { + name: "nil pod", + args: args{ + pod: nil, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node01", + Labels: map[string]string{ + common.K8sNodeNameLabelKey: "node01", + common.K8sRegionLabelKey: "region01", + common.K8sZoneLabelKey: "zone01", + }, + }, + }, + }, + want: nil, + wantErr: false, + }, + { + name: "customized labels", + args: args{ + pod: &v1.Pod{ + Spec: v1.PodSpec{ + NodeName: "node01", + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "k8s.gpu", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + Weight: 10, + }, + }, + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "k8s.rack", + Operator: v1.NodeSelectorOpIn, + Values: []string{"rack01"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node01", + Labels: map[string]string{ + common.K8sNodeNameLabelKey: "node01", + common.K8sZoneLabelKey: "zone01", + "k8s.rack": "rack01", + "k8s.gpu": "false", + }, + }, + }, + }, + want: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: common.K8sNodeNameLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"node01"}, + }, + { + Key: common.K8sZoneLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{"zone01"}, + }, + { + Key: "k8s.gpu", + Operator: v1.NodeSelectorOpIn, + Values: []string{"false"}, + }, + { + Key: "k8s.rack", + Operator: v1.NodeSelectorOpIn, + Values: []string{"rack01"}, + }, + }, + }, + }, + }, + }, + wantErr: false, + }, + } + testScheme := 
runtime.NewScheme() + _ = v1.AddToScheme(testScheme) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var c client.Client + if tt.args.pod == nil { + c = fake.NewFakeClientWithScheme(testScheme, tt.args.node) + } else { + c = fake.NewFakeClientWithScheme(testScheme, tt.args.node, tt.args.pod) + } + + got, err := GenerateNodeAffinity(c, tt.args.pod) + if (err != nil) != tt.wantErr { + t.Errorf("GenerateNodeAffinity() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GenerateNodeAffinity() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/dataprocess/generate_values.go b/pkg/dataprocess/generate_values.go index 102370e1770..c59a8f498ac 100644 --- a/pkg/dataprocess/generate_values.go +++ b/pkg/dataprocess/generate_values.go @@ -18,7 +18,9 @@ package dataprocess import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "os" + "sigs.k8s.io/controller-runtime/pkg/client" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" @@ -27,9 +29,28 @@ import ( "sigs.k8s.io/yaml" ) -func GenDataProcessValueFile(dataset *datav1alpha1.Dataset, dataProcess *datav1alpha1.DataProcess) (valueFileName string, err error) { +func GenDataProcessValueFile(client client.Client, dataset *datav1alpha1.Dataset, dataProcess *datav1alpha1.DataProcess) (valueFileName string, err error) { dataProcessValue := GenDataProcessValue(dataset, dataProcess) + // generate the node affinity by previous operation pod. 
+ if dataProcess.Spec.Processor.Job != nil { + affinity, err := dataflow.InjectAffinityByRunAfterOp(client, dataProcess.Spec.RunAfter, dataProcess.Namespace, dataProcess.Spec.Processor.Job.PodSpec.Affinity) + if err != nil { + return "", errors.Wrapf(err, "failed to inject affinity by runAfterOp") + } + dataProcessValue.DataProcessInfo.JobProcessor.PodSpec.Affinity = affinity + } else { + affinity, err := dataflow.InjectAffinityByRunAfterOp(client, dataProcess.Spec.RunAfter, dataProcess.Namespace, nil) + if err != nil { + return "", errors.Wrapf(err, "failed to inject affinity by runAfterOp") + } + dataProcessValue.DataProcessInfo.ScriptProcessor.Affinity = affinity + } + + if err != nil { + return "", errors.Wrapf(err, "failed to generate dataProcessValue of DataProcess %s/%s", dataProcess.GetNamespace(), dataProcess.GetName()) + } + data, err := yaml.Marshal(dataProcessValue) if err != nil { return "", errors.Wrapf(err, "failed to marshal dataProcessValue of DataProcess %s/%s", dataProcess.GetNamespace(), dataProcess.GetName()) diff --git a/pkg/dataprocess/processor_types.go b/pkg/dataprocess/processor_types.go index e9a8536bee0..e59d3a39ee7 100644 --- a/pkg/dataprocess/processor_types.go +++ b/pkg/dataprocess/processor_types.go @@ -18,7 +18,6 @@ package dataprocess import ( "fmt" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" corev1 "k8s.io/api/core/v1" ) @@ -105,4 +104,5 @@ func (p *ScriptProcessorImpl) TransformDataProcessValues(value *DataProcessValue Command: p.ScriptProcessor.Command, Source: p.ScriptProcessor.Source, } + } diff --git a/pkg/dataprocess/value.go b/pkg/dataprocess/value.go index 2bd501ed0a0..a3e8fd63445 100644 --- a/pkg/dataprocess/value.go +++ b/pkg/dataprocess/value.go @@ -61,6 +61,8 @@ type ScriptProcessor struct { VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` Resources corev1.ResourceRequirements `json:"resources,omitempty"` + + Affinity *corev1.Affinity `json:"affinity,omitempty"` } type JobProcessor 
struct { diff --git a/pkg/ddc/alluxio/backup_data.go b/pkg/ddc/alluxio/backup_data.go index 5cbdfb63e82..8e2f660ee90 100644 --- a/pkg/ddc/alluxio/backup_data.go +++ b/pkg/ddc/alluxio/backup_data.go @@ -18,6 +18,7 @@ package alluxio import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "os" "strconv" "strings" @@ -132,6 +133,12 @@ func (e *AlluxioEngine) generateDataBackupValueFile(ctx cruntime.ReconcileReques dataBackup.PVCName = pvcName dataBackup.Path = path + // inject the node affinity by previous operation pod. + dataBackup.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, databackup.Spec.RunAfter, databackup.Namespace, nil) + if err != nil { + return "", err + } + dataBackupValue := cdatabackup.DataBackupValue{DataBackup: dataBackup} dataBackupValue.InitUsers = common.InitUsers{ diff --git a/pkg/ddc/alluxio/load_data.go b/pkg/ddc/alluxio/load_data.go index 9d6e233ddb9..807507ed39d 100644 --- a/pkg/ddc/alluxio/load_data.go +++ b/pkg/ddc/alluxio/load_data.go @@ -18,6 +18,7 @@ package alluxio import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" "os" "sigs.k8s.io/controller-runtime/pkg/client" @@ -112,7 +113,10 @@ func (e *AlluxioEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon image := fmt.Sprintf("%s:%s", imageName, imageTag) - dataLoadValue := e.genDataLoadValue(image, targetDataset, dataload) + dataLoadValue, err := e.genDataLoadValue(image, targetDataset, dataload) + if err != nil { + return + } data, err := yaml.Marshal(dataLoadValue) if err != nil { @@ -130,7 +134,7 @@ func (e *AlluxioEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon return valueFile.Name(), nil } -func (e *AlluxioEngine) genDataLoadValue(image string, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (e *AlluxioEngine) genDataLoadValue(image string, targetDataset *datav1alpha1.Dataset, dataload 
*datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { // image pull secrets // if the environment variable is not set, it is still an empty slice imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey) @@ -153,6 +157,13 @@ func (e *AlluxioEngine) genDataLoadValue(image string, targetDataset *datav1alph dataloadInfo.Affinity = dataload.Spec.Affinity } + // inject the node affinity by previous operation pod. + var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -190,7 +201,7 @@ func (e *AlluxioEngine) genDataLoadValue(image string, targetDataset *datav1alph Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (e *AlluxioEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/alluxio/load_data_test.go b/pkg/ddc/alluxio/load_data_test.go index 88260108805..e8ab5be920c 100644 --- a/pkg/ddc/alluxio/load_data_test.go +++ b/pkg/ddc/alluxio/load_data_test.go @@ -552,7 +552,7 @@ func Test_genDataLoadValue(t *testing.T) { Log: fake.NullLogger(), } for k, item := range testCases { - got := engine.genDataLoadValue(item.image, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git a/pkg/ddc/alluxio/process_data.go b/pkg/ddc/alluxio/process_data.go index 63dbb8cd443..549489efdde 100644 --- a/pkg/ddc/alluxio/process_data.go +++ b/pkg/ddc/alluxio/process_data.go @@ -39,5 +39,5 @@ func (e *AlluxioEngine) generateDataProcessValueFile(ctx cruntime.ReconcileReque return "", errors.Wrap(err, "failed to get dataset") } - return 
dataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(e.Client, targetDataset, dataProcess) } diff --git a/pkg/ddc/efc/data_process.go b/pkg/ddc/efc/data_process.go index 06974be49bc..8570d99e2ee 100644 --- a/pkg/ddc/efc/data_process.go +++ b/pkg/ddc/efc/data_process.go @@ -23,5 +23,5 @@ func (e *EFCEngine) generateDataProcessValueFile(ctx cruntime.ReconcileRequestCo return "", errors.Wrap(err, "failed to get dataset") } - return dataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(e.Client, targetDataset, dataProcess) } diff --git a/pkg/ddc/goosefs/backup_data.go b/pkg/ddc/goosefs/backup_data.go index 21c7c3b3c0a..22053884bd6 100644 --- a/pkg/ddc/goosefs/backup_data.go +++ b/pkg/ddc/goosefs/backup_data.go @@ -21,6 +21,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cdatabackup "github.com/fluid-cloudnative/fluid/pkg/databackup" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/ddc/goosefs/operations" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -132,6 +133,12 @@ func (e *GooseFSEngine) generateDataBackupValueFile(ctx cruntime.ReconcileReques dataBackup.PVCName = pvcName dataBackup.Path = path + // inject the node affinity by previous operation pod. 
+ dataBackup.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, databackup.Spec.RunAfter, databackup.Namespace, nil) + if err != nil { + return "", err + } + dataBackupValue := cdatabackup.DataBackupValue{DataBackup: dataBackup} dataBackupValue.InitUsers = common.InitUsers{ diff --git a/pkg/ddc/goosefs/load_data.go b/pkg/ddc/goosefs/load_data.go index aff05e47871..3f70c3907d5 100644 --- a/pkg/ddc/goosefs/load_data.go +++ b/pkg/ddc/goosefs/load_data.go @@ -16,6 +16,7 @@ package goosefs import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" "os" "sigs.k8s.io/controller-runtime/pkg/client" @@ -109,7 +110,10 @@ func (e *GooseFSEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon } image := fmt.Sprintf("%s:%s", imageName, imageTag) - dataLoadValue := e.genDataLoadValue(image, targetDataset, dataload) + dataLoadValue, err := e.genDataLoadValue(image, targetDataset, dataload) + if err != nil { + return + } data, err := yaml.Marshal(dataLoadValue) if err != nil { @@ -127,7 +131,7 @@ func (e *GooseFSEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon return valueFile.Name(), nil } -func (e *GooseFSEngine) genDataLoadValue(image string, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (e *GooseFSEngine) genDataLoadValue(image string, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey) dataloadInfo := cdataload.DataLoadInfo{ @@ -146,6 +150,13 @@ func (e *GooseFSEngine) genDataLoadValue(image string, targetDataset *datav1alph dataloadInfo.Affinity = dataload.Spec.Affinity } + // inject the node affinity by previous operation pod. 
+ var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -183,7 +194,7 @@ func (e *GooseFSEngine) genDataLoadValue(image string, targetDataset *datav1alph Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (e *GooseFSEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/goosefs/load_data_test.go b/pkg/ddc/goosefs/load_data_test.go index ceda8027159..1f38895845c 100644 --- a/pkg/ddc/goosefs/load_data_test.go +++ b/pkg/ddc/goosefs/load_data_test.go @@ -604,7 +604,7 @@ func Test_genDataLoadValue(t *testing.T) { Log: fake.NullLogger(), } for k, item := range testCases { - got := engine.genDataLoadValue(item.image, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git a/pkg/ddc/jindo/load_data.go b/pkg/ddc/jindo/load_data.go index 5dc65ccdb1e..b2793f6f6a0 100644 --- a/pkg/ddc/jindo/load_data.go +++ b/pkg/ddc/jindo/load_data.go @@ -18,6 +18,7 @@ package jindo import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" "os" "sigs.k8s.io/controller-runtime/pkg/client" @@ -117,7 +118,11 @@ func (e *JindoEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestConte return } - dataLoadValue := e.genDataLoadValue(image, runtime, targetDataset, dataload) + dataLoadValue, err := e.genDataLoadValue(image, runtime, targetDataset, dataload) + if err != nil { + return + } + data, err := yaml.Marshal(dataLoadValue) if err != nil { return @@ -134,7 +139,8 @@ func (e *JindoEngine) generateDataLoadValueFile(r 
cruntime.ReconcileRequestConte return valueFile.Name(), nil } -func (e *JindoEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (e *JindoEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, + dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey) hadoopConfig := runtime.Spec.HadoopConfig @@ -159,6 +165,13 @@ func (e *JindoEngine) genDataLoadValue(image string, runtime *datav1alpha1.Jindo dataloadInfo.Affinity = dataload.Spec.Affinity } + // inject the node affinity by previous operation pod. + var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -212,7 +225,7 @@ func (e *JindoEngine) genDataLoadValue(image string, runtime *datav1alpha1.Jindo DataLoadInfo: dataloadInfo, Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (e *JindoEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/jindo/load_data_test.go b/pkg/ddc/jindo/load_data_test.go index 5335651dfee..a8b44b69a95 100644 --- a/pkg/ddc/jindo/load_data_test.go +++ b/pkg/ddc/jindo/load_data_test.go @@ -697,7 +697,7 @@ func Test_genDataLoadValue(t *testing.T) { Log: fake.NullLogger(), } for k, item := range testCases { - got := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git 
a/pkg/ddc/jindocache/load_data.go b/pkg/ddc/jindocache/load_data.go index 012882e1c07..d2a5fd86a9c 100644 --- a/pkg/ddc/jindocache/load_data.go +++ b/pkg/ddc/jindocache/load_data.go @@ -18,6 +18,7 @@ package jindocache import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" "os" "sigs.k8s.io/controller-runtime/pkg/client" @@ -115,7 +116,10 @@ func (e *JindoCacheEngine) generateDataLoadValueFile(r cruntime.ReconcileRequest return } - dataLoadValue := e.genDataLoadValue(image, runtime, targetDataset, dataload) + dataLoadValue, err := e.genDataLoadValue(image, runtime, targetDataset, dataload) + if err != nil { + return + } data, err := yaml.Marshal(dataLoadValue) if err != nil { @@ -133,7 +137,8 @@ func (e *JindoCacheEngine) generateDataLoadValueFile(r cruntime.ReconcileRequest return valueFile.Name(), nil } -func (e *JindoCacheEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (e *JindoCacheEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, + dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { hadoopConfig := runtime.Spec.HadoopConfig loadMemorydata := false if len(runtime.Spec.TieredStore.Levels) > 0 && runtime.Spec.TieredStore.Levels[0].MediumType == "MEM" { @@ -159,6 +164,13 @@ func (e *JindoCacheEngine) genDataLoadValue(image string, runtime *datav1alpha1. dataloadInfo.Affinity = dataload.Spec.Affinity } + // inject the node affinity by previous operation pod. 
+ var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -213,7 +225,7 @@ func (e *JindoCacheEngine) genDataLoadValue(image string, runtime *datav1alpha1. Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (e *JindoCacheEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/jindocache/load_data_test.go b/pkg/ddc/jindocache/load_data_test.go index 84ed19f272a..0c0eea6c9d6 100644 --- a/pkg/ddc/jindocache/load_data_test.go +++ b/pkg/ddc/jindocache/load_data_test.go @@ -697,7 +697,7 @@ func Test_genDataLoadValue(t *testing.T) { Log: fake.NullLogger(), } for k, item := range testCases { - got := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git a/pkg/ddc/jindofsx/load_data.go b/pkg/ddc/jindofsx/load_data.go index 9c033963979..5f0e8cd1f7a 100644 --- a/pkg/ddc/jindofsx/load_data.go +++ b/pkg/ddc/jindofsx/load_data.go @@ -18,6 +18,7 @@ package jindofsx import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" "os" "sigs.k8s.io/controller-runtime/pkg/client" @@ -115,7 +116,10 @@ func (e *JindoFSxEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCo return } - dataLoadValue := e.genDataLoadValue(image, runtime, targetDataset, dataload) + dataLoadValue, err := e.genDataLoadValue(image, runtime, targetDataset, dataload) + if err != nil { + return + } data, err := yaml.Marshal(dataLoadValue) if err != nil { @@ -133,7 +137,8 @@ func (e 
*JindoFSxEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCo return valueFile.Name(), nil } -func (e *JindoFSxEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (e *JindoFSxEngine) genDataLoadValue(image string, runtime *datav1alpha1.JindoRuntime, targetDataset *datav1alpha1.Dataset, + dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { hadoopConfig := runtime.Spec.HadoopConfig loadMemorydata := false if len(runtime.Spec.TieredStore.Levels) > 0 && runtime.Spec.TieredStore.Levels[0].MediumType == "MEM" { @@ -159,6 +164,13 @@ func (e *JindoFSxEngine) genDataLoadValue(image string, runtime *datav1alpha1.Ji dataloadInfo.Affinity = dataload.Spec.Affinity } + // inject the node affinity by previous operation pod. + var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -213,7 +225,7 @@ func (e *JindoFSxEngine) genDataLoadValue(image string, runtime *datav1alpha1.Ji Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (e *JindoFSxEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/jindofsx/load_data_test.go b/pkg/ddc/jindofsx/load_data_test.go index 8762b4a2ca7..38db68eec44 100644 --- a/pkg/ddc/jindofsx/load_data_test.go +++ b/pkg/ddc/jindofsx/load_data_test.go @@ -697,7 +697,7 @@ func Test_genDataLoadValue(t *testing.T) { Log: fake.NullLogger(), } for k, item := range testCases { - got := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.runtime, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, 
item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git a/pkg/ddc/jindofsx/process_data.go b/pkg/ddc/jindofsx/process_data.go index fa6dd8599fa..d242a4bec4f 100644 --- a/pkg/ddc/jindofsx/process_data.go +++ b/pkg/ddc/jindofsx/process_data.go @@ -39,5 +39,5 @@ func (e *JindoFSxEngine) generateDataProcessValueFile(ctx cruntime.ReconcileRequ return "", errors.Wrap(err, "failed to get dataset") } - return dataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(e.Client, targetDataset, dataProcess) } diff --git a/pkg/ddc/juicefs/data_load.go b/pkg/ddc/juicefs/data_load.go index 435312b68b5..c5f27748a95 100644 --- a/pkg/ddc/juicefs/data_load.go +++ b/pkg/ddc/juicefs/data_load.go @@ -18,6 +18,7 @@ package juicefs import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "os" "strings" @@ -129,7 +130,10 @@ func (j *JuiceFSEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon } image := fmt.Sprintf("%s:%s", imageName, imageTag) - dataLoadValue := j.genDataLoadValue(image, cacheinfo, pods, targetDataset, dataload) + dataLoadValue, err := j.genDataLoadValue(image, cacheinfo, pods, targetDataset, dataload) + if err != nil { + return + } data, err := yaml.Marshal(dataLoadValue) if err != nil { return @@ -147,7 +151,7 @@ func (j *JuiceFSEngine) generateDataLoadValueFile(r cruntime.ReconcileRequestCon return valueFile.Name(), nil } -func (j *JuiceFSEngine) genDataLoadValue(image string, cacheinfo map[string]string, pods []v1.Pod, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) *cdataload.DataLoadValue { +func (j *JuiceFSEngine) genDataLoadValue(image string, cacheinfo map[string]string, pods []v1.Pod, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) { imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey) dataloadInfo := cdataload.DataLoadInfo{ @@ -168,6 +172,13 @@ func (j 
*JuiceFSEngine) genDataLoadValue(image string, cacheinfo map[string]stri dataloadInfo.Affinity = dataload.Spec.Affinity } + // generate the node affinity by previous operation pod. + var err error + dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(j.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity) + if err != nil { + return nil, err + } + // node selector if dataload.Spec.NodeSelector != nil { if dataloadInfo.NodeSelector == nil { @@ -236,7 +247,7 @@ func (j *JuiceFSEngine) genDataLoadValue(image string, cacheinfo map[string]stri Owner: transfromer.GenerateOwnerReferenceFromObject(dataload), } - return dataLoadValue + return dataLoadValue, nil } func (j *JuiceFSEngine) CheckRuntimeReady() (ready bool) { diff --git a/pkg/ddc/juicefs/data_load_test.go b/pkg/ddc/juicefs/data_load_test.go index c104176bdc3..551ba2b496f 100644 --- a/pkg/ddc/juicefs/data_load_test.go +++ b/pkg/ddc/juicefs/data_load_test.go @@ -1097,7 +1097,7 @@ func TestJuiceFSEngine_genDataLoadValue(t *testing.T) { name: item.runtimeName, Log: fake.NullLogger(), } - got := engine.genDataLoadValue(item.image, item.cacheInfo, item.pods, item.targetDataset, item.dataload) + got, _ := engine.genDataLoadValue(item.image, item.cacheInfo, item.pods, item.targetDataset, item.dataload) if !reflect.DeepEqual(got, item.want) { t.Errorf("case %s, got %v,want:%v", k, got, item.want) } diff --git a/pkg/ddc/juicefs/data_migrate.go b/pkg/ddc/juicefs/data_migrate.go index 0a4ed713002..e659f58998a 100644 --- a/pkg/ddc/juicefs/data_migrate.go +++ b/pkg/ddc/juicefs/data_migrate.go @@ -18,6 +18,7 @@ package juicefs import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataflow" "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -143,7 +144,6 @@ func (j *JuiceFSEngine) generateDataMigrateValueFile(r cruntime.ReconcileRequest if dataMigrate.Spec.Affinity != nil { dataMigrateInfo.Affinity = 
dataMigrate.Spec.Affinity } - // generate ssh config for parallel tasks when using parallel tasks if dataMigrateInfo.Parallelism > 1 { err = j.setParallelMigrateOptions(&dataMigrateInfo, dataMigrate) @@ -151,7 +151,13 @@ func (j *JuiceFSEngine) generateDataMigrateValueFile(r cruntime.ReconcileRequest return "", err } // the launcher prefers to run on different host with the workers - addWorkerPodAntiAffinity(&dataMigrateInfo, dataMigrate) + addWorkerPodPreferredAntiAffinity(&dataMigrateInfo, dataMigrate) + } + + // inject the node affinity by previous operation pod. + dataMigrateInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(j.Client, dataMigrate.Spec.RunAfter, dataMigrate.Namespace, dataMigrateInfo.Affinity) + if err != nil { + return "", err } if dataMigrate.Spec.NodeSelector != nil { @@ -220,7 +226,7 @@ func (j *JuiceFSEngine) generateDataMigrateValueFile(r cruntime.ReconcileRequest return valueFile.Name(), nil } -func addWorkerPodAntiAffinity(dataMigrateInfo *cdatamigrate.DataMigrateInfo, dataMigrate *datav1alpha1.DataMigrate) { +func addWorkerPodPreferredAntiAffinity(dataMigrateInfo *cdatamigrate.DataMigrateInfo, dataMigrate *datav1alpha1.DataMigrate) { releaseName := utils.GetDataMigrateReleaseName(dataMigrate.Name) podAffinityTerm := corev1.WeightedPodAffinityTerm{ @@ -231,7 +237,7 @@ func addWorkerPodAntiAffinity(dataMigrateInfo *cdatamigrate.DataMigrateInfo, dat dataoperation.OperationLabel: fmt.Sprintf("migrate-%s-%s", dataMigrate.Namespace, releaseName), }, }, - TopologyKey: "kubernetes.io/hostname", + TopologyKey: common.K8sNodeNameLabelKey, }, } diff --git a/pkg/ddc/juicefs/data_migrate_test.go b/pkg/ddc/juicefs/data_migrate_test.go index eaaf062e86e..c90fa18ebc9 100644 --- a/pkg/ddc/juicefs/data_migrate_test.go +++ b/pkg/ddc/juicefs/data_migrate_test.go @@ -996,9 +996,9 @@ func Test_addWorkerPodAntiAffinity(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - addWorkerPodAntiAffinity(tt.args.dataMigrateInfo, 
tt.args.dataMigrate) + addWorkerPodPreferredAntiAffinity(tt.args.dataMigrateInfo, tt.args.dataMigrate) if !reflect.DeepEqual(tt.args.dataMigrateInfo, tt.want) { - t.Errorf("addWorkerPodAntiAffinity() got = %v, want %v", tt.args.dataMigrateInfo, tt.want) + t.Errorf("addWorkerPodPreferredAntiAffinity() got = %v, want %v", tt.args.dataMigrateInfo, tt.want) } }) } diff --git a/pkg/ddc/juicefs/data_process.go b/pkg/ddc/juicefs/data_process.go index 825e3e1e62b..dc0b484f834 100644 --- a/pkg/ddc/juicefs/data_process.go +++ b/pkg/ddc/juicefs/data_process.go @@ -18,9 +18,9 @@ package juicefs import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/dataprocess" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - cdataprocess "github.com/fluid-cloudnative/fluid/pkg/dataprocess" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/pkg/errors" @@ -39,5 +39,5 @@ func (j *JuiceFSEngine) generateDataProcessValueFile(ctx cruntime.ReconcileReque return "", errors.Wrap(err, "failed to get dataset") } - return cdataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(j.Client, targetDataset, dataProcess) } diff --git a/pkg/ddc/thin/data_process.go b/pkg/ddc/thin/data_process.go index bfb4ff52440..f4598c80ccb 100644 --- a/pkg/ddc/thin/data_process.go +++ b/pkg/ddc/thin/data_process.go @@ -39,5 +39,5 @@ func (t *ThinEngine) generateDataProcessValueFile(ctx cruntime.ReconcileRequestC return "", errors.Wrap(err, "failed to get dataset") } - return dataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(t.Client, targetDataset, dataProcess) } diff --git a/pkg/ddc/vineyard/process_data.go b/pkg/ddc/vineyard/process_data.go index 1820279adeb..250f4cd6db5 100644 --- a/pkg/ddc/vineyard/process_data.go +++ b/pkg/ddc/vineyard/process_data.go @@ -36,5 +36,5 @@ func (e *VineyardEngine) generateDataProcessValueFile(ctx 
cruntime.ReconcileRequ return "", errors.Wrap(err, "failed to get dataset") } - return dataprocess.GenDataProcessValueFile(targetDataset, dataProcess) + return dataprocess.GenDataProcessValueFile(e.Client, targetDataset, dataProcess) } diff --git a/pkg/utils/affinity.go b/pkg/utils/affinity.go new file mode 100644 index 00000000000..8890b013ac8 --- /dev/null +++ b/pkg/utils/affinity.go @@ -0,0 +1,65 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import v1 "k8s.io/api/core/v1" + +// InjectNodeSelectorRequirements injects(not append) a node selector term to affinity‘s nodeAffinity. 
+func InjectNodeSelectorRequirements(matchExpressions []v1.NodeSelectorRequirement, affinity *v1.Affinity) *v1.Affinity { + result := affinity + if affinity == nil { + result = &v1.Affinity{} + } + + if result.NodeAffinity == nil { + result.NodeAffinity = &v1.NodeAffinity{} + } + if result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + // no element + if result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms == nil { + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{ + { + MatchExpressions: matchExpressions, + }, + } + return result + } + // has element, inject term's match expressions to each element + for _, nodeSelectorTerm := range result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + nodeSelectorTerm.MatchExpressions = append(nodeSelectorTerm.MatchExpressions, matchExpressions...) + } + + return result +} + +func InjectPreferredSchedulingTermsToAffinity(terms []v1.PreferredSchedulingTerm, affinity *v1.Affinity) *v1.Affinity { + result := affinity + if affinity == nil { + result = &v1.Affinity{} + } + + if result.NodeAffinity == nil { + result.NodeAffinity = &v1.NodeAffinity{} + } + + result.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = + append(result.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, terms...) + + return result +} diff --git a/pkg/utils/affinity_test.go b/pkg/utils/affinity_test.go new file mode 100644 index 00000000000..b6b120cdfed --- /dev/null +++ b/pkg/utils/affinity_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + v1 "k8s.io/api/core/v1" + "reflect" + "testing" +) + +func TestInjectNodeSelectorTermsToAffinity(t *testing.T) { + type args struct { + expressions []v1.NodeSelectorRequirement + affinity *v1.Affinity + } + tests := []struct { + name string + args args + want *v1.Affinity + }{ + { + name: "test1", + args: args{ + expressions: []v1.NodeSelectorRequirement{ + { + Key: "test", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test"}, + }, + }, + affinity: &v1.Affinity{}, + }, + want: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := InjectNodeSelectorRequirements(tt.args.expressions, tt.args.affinity); !reflect.DeepEqual(got, tt.want) { + t.Errorf("InjectNodeSelectorRequirements() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/utils/fluid.go b/pkg/utils/fluid.go index 4bd11b536f1..7c76d876d5d 100644 --- a/pkg/utils/fluid.go +++ b/pkg/utils/fluid.go @@ -24,14 +24,16 @@ import ( // IsPodManagedByFluid checks if the given Pod is managed by Fluid. func IsPodManagedByFluid(pod *corev1.Pod) bool { fluidPodLabels := []string{common.AlluxioRuntime, + // For jindo, Data Operation / Runtime pods use jindofs, jindofsx, jindocache as the app label value. 
common.JindoChartName, + common.JindoFSxEngineImpl, + common.JindoCacheEngineImpl, common.GooseFSRuntime, common.JuiceFSRuntime, common.ThinRuntime, common.EFCRuntime} - if _, ok := pod.Labels[common.PodRoleType]; ok && pod.Labels[common.PodRoleType] == common.DataloadPod { - return true - } + + // Runtime Pod and DataOperation Pod both have the App label. for _, label := range fluidPodLabels { if pod.Labels[common.App] == label { return true diff --git a/pkg/utils/kubeclient/job.go b/pkg/utils/kubeclient/job.go index 96657c0ced5..50bb67a26a0 100644 --- a/pkg/utils/kubeclient/job.go +++ b/pkg/utils/kubeclient/job.go @@ -18,7 +18,10 @@ package kubeclient import ( "context" + "fmt" "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -39,3 +42,27 @@ func GetJob(client client.Client, name, namespace string) (*v1.Job, error) { func UpdateJob(client client.Client, job *v1.Job) error { return client.Update(context.TODO(), job) } + +// GetSucceedPodForJob get the first finished pod for the job, if no succeed pod, return nil with no error. +func GetSucceedPodForJob(c client.Client, job *v1.Job) (*corev1.Pod, error) { + var podList corev1.PodList + selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("error converting Job %s in namespace %s selector: %v", job.Name, job.Namespace, err) + } + err = c.List(context.TODO(), &podList, &client.ListOptions{ + Namespace: job.Namespace, + LabelSelector: selector, + }) + if err != nil { + return nil, fmt.Errorf("error listing pods for Job %s in namespace %s: %v", job.Name, job.Namespace, err) + } + + for _, pod := range podList.Items { + if IsSucceededPod(&pod) { + return &pod, nil + } + } + // no succeed job, return nil with no error. + return nil, nil +}