Skip to content

Commit c2025c7

Browse files
authored
Add CreateIncrementalDumperParameter (#32509)
1 parent 4cf4ed2 commit c2025c7

File tree

9 files changed

+72
-50
lines changed

9 files changed

+72
-50
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
19+
20+
import lombok.Getter;
21+
import lombok.RequiredArgsConstructor;
22+
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
23+
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
24+
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
25+
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
26+
27+
/**
28+
* Create incremental dumper parameter.
29+
*/
30+
@RequiredArgsConstructor
31+
@Getter
32+
public final class CreateIncrementalDumperParameter {
33+
34+
private final IncrementalDumperContext context;
35+
36+
private final IngestPosition position;
37+
38+
private final PipelineChannel channel;
39+
40+
private final PipelineTableMetaDataLoader metaDataLoader;
41+
42+
private final PipelineDataSourceManager dataSourceManager;
43+
}

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
1919

20-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
21-
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
22-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
2320
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
2421
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
2522

@@ -32,11 +29,8 @@ public interface DialectIncrementalDumperCreator extends DatabaseTypedSPI {
3229
/**
3330
* Create incremental dumper.
3431
*
35-
* @param context incremental dumper context
36-
* @param position position
37-
* @param channel channel
38-
* @param metaDataLoader meta data loader
32+
* @param param create incremental dumper parameter
3933
* @return incremental dumper
4034
*/
41-
IncrementalDumper createIncrementalDumper(IncrementalDumperContext context, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
35+
IncrementalDumper createIncrementalDumper(CreateIncrementalDumperParameter param);
4236
}

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import lombok.AccessLevel;
2121
import lombok.NoArgsConstructor;
2222
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
23-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
24-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
2523
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
2624
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
2725
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
@@ -35,15 +33,12 @@ public final class IncrementalDumperCreator {
3533
/**
3634
* Create incremental dumper.
3735
*
38-
* @param dumperContext incremental dumper context
39-
* @param channel channel
40-
* @param metaDataLoader meta data loader
36+
* @param param create incremental dumper parameter
4137
* @return incremental dumper
4238
*/
43-
public static IncrementalDumper create(final IncrementalDumperContext dumperContext, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
44-
ShardingSpherePreconditions.checkState(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
39+
public static IncrementalDumper create(final CreateIncrementalDumperParameter param) {
40+
ShardingSpherePreconditions.checkState(param.getContext().getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
4541
() -> new UnsupportedSQLOperationException("Incremental dumper only support StandardPipelineDataSourceConfiguration"));
46-
return DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
47-
.createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
42+
return DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, param.getContext().getCommonContext().getDataSourceConfig().getDatabaseType()).createIncrementalDumper(param);
4843
}
4944
}

kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,19 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;
1919

20-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
21-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
20+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
21+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2222
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
23-
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
24-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
2523
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
26-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2724

2825
/**
2926
* MySQL incremental dumper creator.
3027
*/
3128
public final class MySQLIncrementalDumperCreator implements DialectIncrementalDumperCreator {
3229

3330
@Override
34-
public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
35-
final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
36-
return new MySQLIncrementalDumper(context, position, channel, metaDataLoader);
31+
public IncrementalDumper createIncrementalDumper(final CreateIncrementalDumperParameter param) {
32+
return new MySQLIncrementalDumper(param.getContext(), param.getPosition(), param.getChannel(), param.getMetaDataLoader());
3733
}
3834

3935
@Override

kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,19 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;
1919

20-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
21-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
20+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
21+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2222
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
23-
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
24-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
2523
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
26-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2724

2825
/**
2926
* OpenGauss incremental dumper creator.
3027
*/
3128
public final class OpenGaussIncrementalDumperCreator implements DialectIncrementalDumperCreator {
3229

3330
@Override
34-
public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
35-
final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
36-
return new OpenGaussWALDumper(context, position, channel, metaDataLoader);
31+
public IncrementalDumper createIncrementalDumper(final CreateIncrementalDumperParameter param) {
32+
return new OpenGaussWALDumper(param.getContext(), param.getPosition(), param.getChannel(), param.getMetaDataLoader());
3733
}
3834

3935
@Override

kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,19 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;
1919

20-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
21-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
20+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
21+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2222
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
23-
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
24-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
2523
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
26-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
2724

2825
/**
2926
* PostgreSQL incremental dumper creator.
3027
*/
3128
public final class PostgreSQLIncrementalDumperCreator implements DialectIncrementalDumperCreator {
3229

3330
@Override
34-
public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
35-
final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
36-
return new PostgreSQLWALDumper(context, position, channel, metaDataLoader);
31+
public IncrementalDumper createIncrementalDumper(final CreateIncrementalDumperParameter param) {
32+
return new PostgreSQLWALDumper(param.getContext(), param.getPosition(), param.getChannel(), param.getMetaDataLoader());
3733
}
3834

3935
@Override

kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
3232
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
3333
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
34+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
3435
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
3536
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
3637
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
@@ -139,7 +140,8 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A
139140
IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
140141
PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
141142
channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
142-
Dumper dumper = IncrementalDumperCreator.create(dumperContext, channel, jobItemContext.getSourceMetaDataLoader());
143+
Dumper dumper = IncrementalDumperCreator.create(new CreateIncrementalDumperParameter(
144+
dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager()));
143145
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
144146
Importer importer = importerUsed.get() ? null
145147
: new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());

kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
3232
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
3333
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
34+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
3435
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
3536
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
3637
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
@@ -43,6 +44,7 @@
4344
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
4445
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
4546
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
47+
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
4648
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
4749
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
4850
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -191,11 +193,13 @@ private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
191193

192194
private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
193195
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
196+
PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
194197
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
195198
ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
196199
IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
197200
PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
198-
Dumper dumper = IncrementalDumperCreator.create(dumperContext, channel, jobItemContext.getSourceMetaDataLoader());
201+
Dumper dumper = IncrementalDumperCreator.create(
202+
new CreateIncrementalDumperParameter(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader, jobItemContext.getDataSourceManager()));
199203
Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
200204
PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
201205
jobItemContext.getIncrementalTasks().add(incrementalTask);

test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717

1818
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.dumper;
1919

20-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
21-
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
22-
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
23-
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
24-
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
20+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
2521
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
22+
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
2623

2724
import static org.mockito.Mockito.mock;
2825

@@ -32,8 +29,7 @@
3229
public final class H2IncrementalDumperCreator implements DialectIncrementalDumperCreator {
3330

3431
@Override
35-
public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
36-
final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
32+
public IncrementalDumper createIncrementalDumper(final CreateIncrementalDumperParameter param) {
3733
return mock(IncrementalDumper.class);
3834
}
3935

0 commit comments

Comments
 (0)