Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add immediate option to ReplicationSource #812

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1alpha1/replicationsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type ReplicationSourceSpec struct {
// paused can be used to temporarily stop replication. Defaults to "false".
//+optional
Paused bool `json:"paused,omitempty"`
// immediate can be used to stop replication when a new ReplicationSource is created. Defaults to "true".
//+optional
Immediate *bool `json:"immediate,omitempty"`
}

type ReplicationSourceRsyncStatus struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions bundle/manifests/volsync.backube_replicationsources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ spec:
provider. The name should be of the form: domain.com/provider.'
type: string
type: object
immediate:
description: immediate can be used to stop replication when a new
ReplicationSource is created. Defaults to "true".
type: boolean
paused:
description: paused can be used to temporarily stop replication. Defaults
to "false".
Expand Down
2 changes: 1 addition & 1 deletion bundle/manifests/volsync.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ metadata:
}
]
capabilities: Basic Install
createdAt: "2023-07-13T13:28:33Z"
createdAt: "2023-07-27T21:09:50Z"
olm.skipRange: '>=0.4.0 <0.8.0'
operators.operatorframework.io/builder: operator-sdk-v1.26.0
operators.operatorframework.io/project_layout: go.kubebuilder.io/v3
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/volsync.backube_replicationsources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ spec:
provider. The name should be of the form: domain.com/provider.'
type: string
type: object
immediate:
description: immediate can be used to stop replication when a new
ReplicationSource is created. Defaults to "true".
type: boolean
paused:
description: paused can be used to temporarily stop replication. Defaults
to "false".
Expand Down
7 changes: 7 additions & 0 deletions controllers/mover/rclone/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
return rb.viper.GetString(rcloneContainerImageFlag)
}

// nolint: funlen
func (rb *Builder) FromSource(client client.Client, logger logr.Logger,
eventRecorder events.EventRecorder,
source *volsyncv1alpha1.ReplicationSource, privileged bool) (mover.Mover, error) {
Expand Down Expand Up @@ -114,6 +115,11 @@
saHandler := utils.NewSAHandler(client, source, isSource, privileged,
source.Spec.Rclone.MoverServiceAccount)

isImmediate := true
if source.Spec.Immediate != nil {
isImmediate = *source.Spec.Immediate
}

Check warning on line 121 in controllers/mover/rclone/builder.go

View check run for this annotation

Codecov / codecov/patch

controllers/mover/rclone/builder.go#L120-L121

Added lines #L120 - L121 were not covered by tests

return &Mover{
client: client,
logger: logger.WithValues("method", "Rclone"),
Expand All @@ -132,6 +138,7 @@
privileged: privileged,
moverSecurityContext: source.Spec.Rclone.MoverSecurityContext,
latestMoverStatus: source.Status.LatestMoverStatus,
isImmediate: isImmediate,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/mover/rclone/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Mover struct {
privileged bool // true if the mover should have elevated privileges
moverSecurityContext *corev1.PodSecurityContext
latestMoverStatus *volsyncv1alpha1.MoverStatus
isImmediate bool
}

var _ mover.Mover = &Mover{}
Expand Down Expand Up @@ -231,6 +232,9 @@ func (m *Mover) ensureJob(ctx context.Context, dataPVC *corev1.PersistentVolumeC
if m.paused {
parallelism = int32(0)
}
if !m.isImmediate && m.latestMoverStatus.Result == "" {
parallelism = int32(0)
}
job.Spec.Parallelism = &parallelism

envVars := []corev1.EnvVar{
Expand Down
7 changes: 7 additions & 0 deletions controllers/mover/restic/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
return rb.viper.GetString(resticContainerImageFlag)
}

// nolint: funlen
func (rb *Builder) FromSource(client client.Client, logger logr.Logger,
eventRecorder events.EventRecorder,
source *volsyncv1alpha1.ReplicationSource, privileged bool) (mover.Mover, error) {
Expand Down Expand Up @@ -119,6 +120,11 @@
saHandler := utils.NewSAHandler(client, source, isSource, privileged,
source.Spec.Restic.MoverServiceAccount)

isImmediate := true
if source.Spec.Immediate != nil {
isImmediate = *source.Spec.Immediate
}

Check warning on line 126 in controllers/mover/restic/builder.go

View check run for this annotation

Codecov / codecov/patch

controllers/mover/restic/builder.go#L125-L126

Added lines #L125 - L126 were not covered by tests

return &Mover{
client: client,
logger: logger.WithValues("method", "Restic"),
Expand All @@ -142,6 +148,7 @@
unlock: source.Spec.Restic.Unlock,
sourceStatus: source.Status.Restic,
latestMoverStatus: source.Status.LatestMoverStatus,
isImmediate: isImmediate,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/mover/restic/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Mover struct {
unlock string
retainPolicy *volsyncv1alpha1.ResticRetainPolicy
sourceStatus *volsyncv1alpha1.ReplicationSourceResticStatus
isImmediate bool
// Destination-only fields
previous *int32
restoreAsOf *string
Expand Down Expand Up @@ -293,6 +294,9 @@ func (m *Mover) ensureJob(ctx context.Context, cachePVC *corev1.PersistentVolume
if m.paused {
parallelism = int32(0)
}
if !m.isImmediate && m.latestMoverStatus.Result == "" {
parallelism = int32(0)
}
job.Spec.Parallelism = &parallelism
forgetOptions := generateForgetOptions(m.retainPolicy)
// set default values
Expand Down
7 changes: 7 additions & 0 deletions controllers/mover/rsync/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
return rb.viper.GetString(rsyncContainerImageFlag)
}

// nolint: funlen
func (rb *Builder) FromSource(client client.Client, logger logr.Logger,
eventRecorder events.EventRecorder,
source *volsyncv1alpha1.ReplicationSource, privileged bool) (mover.Mover, error) {
Expand Down Expand Up @@ -119,6 +120,11 @@
saHandler := utils.NewSAHandler(client, source, isSource, true, /*Rsync runs privileged only*/
source.Spec.Rsync.MoverServiceAccount)

isImmediate := true
if source.Spec.Immediate != nil {
isImmediate = *source.Spec.Immediate
}

Check warning on line 126 in controllers/mover/rsync/builder.go

View check run for this annotation

Codecov / codecov/patch

controllers/mover/rsync/builder.go#L125-L126

Added lines #L125 - L126 were not covered by tests

return &Mover{
client: client,
logger: logger.WithValues("method", "Rsync"),
Expand All @@ -137,6 +143,7 @@
mainPVCName: &source.Spec.SourcePVC,
sourceStatus: source.Status.Rsync,
latestMoverStatus: source.Status.LatestMoverStatus,
isImmediate: isImmediate,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/mover/rsync/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Mover struct {
sourceStatus *volsyncv1alpha1.ReplicationSourceRsyncStatus
destStatus *volsyncv1alpha1.ReplicationDestinationRsyncStatus
latestMoverStatus *volsyncv1alpha1.MoverStatus
isImmediate bool
}

var _ mover.Mover = &Mover{}
Expand Down Expand Up @@ -356,6 +357,9 @@ func (m *Mover) ensureJob(ctx context.Context, dataPVC *corev1.PersistentVolumeC
if m.paused {
parallelism = int32(0)
}
if !m.isImmediate && m.latestMoverStatus.Result == "" {
parallelism = int32(0)
}
job.Spec.Parallelism = &parallelism

readOnlyVolume := false
Expand Down
7 changes: 7 additions & 0 deletions controllers/mover/rsynctls/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
return rb.viper.GetString(rsyncTLSContainerImageFlag)
}

// nolint: funlen
func (rb *Builder) FromSource(client client.Client, logger logr.Logger,
eventRecorder events.EventRecorder,
source *volsyncv1alpha1.ReplicationSource, privileged bool) (mover.Mover, error) {
Expand Down Expand Up @@ -119,6 +120,11 @@
saHandler := utils.NewSAHandler(client, source, isSource, privileged,
source.Spec.RsyncTLS.MoverServiceAccount)

isImmediate := true
if source.Spec.Immediate != nil {
isImmediate = *source.Spec.Immediate
}

Check warning on line 126 in controllers/mover/rsynctls/builder.go

View check run for this annotation

Codecov / codecov/patch

controllers/mover/rsynctls/builder.go#L125-L126

Added lines #L125 - L126 were not covered by tests

return &Mover{
client: client,
logger: logger.WithValues("method", "RsyncTLS"),
Expand All @@ -139,6 +145,7 @@
moverSecurityContext: source.Spec.RsyncTLS.MoverSecurityContext,
sourceStatus: source.Status.RsyncTLS,
latestMoverStatus: source.Status.LatestMoverStatus,
isImmediate: isImmediate,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/mover/rsynctls/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Mover struct {
sourceStatus *volsyncv1alpha1.ReplicationSourceRsyncTLSStatus
destStatus *volsyncv1alpha1.ReplicationDestinationRsyncTLSStatus
latestMoverStatus *volsyncv1alpha1.MoverStatus
isImmediate bool
}

var _ mover.Mover = &Mover{}
Expand Down Expand Up @@ -363,6 +364,9 @@ func (m *Mover) ensureJob(ctx context.Context, dataPVC *corev1.PersistentVolumeC
if m.paused {
parallelism = int32(0)
}
if !m.isImmediate && m.latestMoverStatus.Result == "" {
parallelism = int32(0)
}
job.Spec.Parallelism = &parallelism

readOnlyVolume := false
Expand Down
1 change: 1 addition & 0 deletions controllers/mover/syncthing/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (rb *Builder) FromSource(client client.Client, logger logr.Logger,
apiConfig: api.APIConfig{},
privileged: privileged,
moverSecurityContext: source.Spec.Syncthing.MoverSecurityContext,
isImmediate: true,
// defer setting the VolumeHandler
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions controllers/mover/syncthing/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Mover struct {
apiConfig api.APIConfig
privileged bool
moverSecurityContext *corev1.PodSecurityContext
isImmediate bool
}

var _ mover.Mover = &Mover{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
description: 'provider is the name of the external replication provider. The name should be of the form: domain.com/provider.'
type: string
type: object
immediate:
description: immediate can be used to stop replication when a new ReplicationSource is created. Defaults to "true".
type: boolean
paused:
description: paused can be used to temporarily stop replication. Defaults to "false".
type: boolean
Expand Down
Loading