diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index dba540c9f7075..0a986c4768aea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -21,7 +21,7 @@
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 import java.util.Collection;
@@ -61,8 +61,8 @@ public final class ConsistencyCheckJobItemProgressContext implements PipelineJob
     private final String sourceDatabaseType;
 
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
-        checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+        checkedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index ddf99a8eea1ba..9a98e67a92f20 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -26,7 +26,7 @@
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -113,7 +113,7 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
         }
         if (sourceCalculatedResults.hasNext()) {
             checkResult.setMatched(false);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 09e74e55d80f8..b2249bc377f81 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -24,7 +24,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.List;
@@ -52,9 +52,9 @@ protected void runBlocking() {
             if (records.isEmpty()) {
                 continue;
             }
-            PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
+            PipelineJobUpdateProgress updateProgress = sink.write("", records);
             channel.ack(records);
-            jobProgressListener.onProgressUpdated(updatedParam);
+            jobProgressListener.onProgressUpdated(updateProgress);
             if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
                 break;
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
index 9f356cf35b5e2..9704abffe9b3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.importer.sink;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -35,5 +35,5 @@ public interface PipelineSink extends Closeable {
     * @param records records
     * @return job progress updated parameter
     */
-    PipelineJobProgressUpdatedParameter write(String ackId, Collection records);
+    PipelineJobUpdateProgress write(String ackId, Collection records);
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d6e180dfb0626..d9f1d81f0ab6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -30,7 +30,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -73,17 +73,17 @@ public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final
     }
 
     @Override
-    public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection records) {
+    public PipelineJobUpdateProgress write(final String ackId, final Collection records) {
         List dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
         if (dataRecords.isEmpty()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
             batchWrite(each.getDeleteDataRecords());
             batchWrite(each.getInsertDataRecords());
             batchWrite(each.getUpdateDataRecords());
         }
-        return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
+        return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
     }
 
     @SuppressWarnings("BusyWait")
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
index cc41810ab2ae2..0d4e434869651 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
@@ -23,9 +23,9 @@
 public interface PipelineJobProgressListener {
 
     /**
-     * Emit on progress updated.
+     * Emit on pipeline job progress updated.
     *
-     * @param param process update parameter
+     * @param updateProgress pipeline job update progress
     */
-    void onProgressUpdated(PipelineJobProgressUpdatedParameter param);
+    void onProgressUpdated(PipelineJobUpdateProgress updateProgress);
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
similarity index 91%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
index 79d36967e7450..f1a802a19cd9e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
@@ -21,11 +21,11 @@
 import lombok.RequiredArgsConstructor;
 
 /**
- * Pipeline job process update parameter.
+ * Pipeline job update progress.
 */
 @RequiredArgsConstructor
 @Getter
-public final class PipelineJobProgressUpdatedParameter {
+public final class PipelineJobUpdateProgress {
 
     private final int processedRecordsCount;
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index f1d189aa5afe2..ecca5660a2e27 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -30,7 +30,7 @@
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -117,8 +117,8 @@ public String getDataSourceName() {
     }
 
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
-        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+        processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
     }
 
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 2357fb41384e2..c8640dd71abcf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -35,7 +35,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 
 import java.util.ArrayList;
@@ -77,7 +77,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     protected void runBlocking() {
         CDCImporterManager.putImporter(this);
         for (CDCChannelProgressPair each : channelProgressPairs) {
-            each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            each.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
         }
         while (isRunning()) {
             if (needSorting) {
@@ -223,7 +223,7 @@ private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair)
         Record lastRecord = records.get(records.size() - 1);
         if (records.stream().noneMatch(DataRecord.class::isInstance)) {
             channel.ack(records);
-            channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
             if (lastRecord instanceof FinishedRecord) {
                 channelProgressPairs.remove(channelProgressPair);
             }
@@ -255,7 +255,7 @@ public void ack(final String ackId) {
             if (lastRecord instanceof FinishedRecord) {
                 channelProgressPairs.remove(each.getKey());
             }
-            each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+            each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(ackPosition.getDataRecordCount()));
         }
         ackCache.invalidate(ackId);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
index c175e38812a4b..700a23fed4b9e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
@@ -27,7 +27,7 @@
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
@@ -70,20 +70,20 @@ public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase
     }
 
     @Override
-    public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection records) {
+    public PipelineJobUpdateProgress write(final String ackId, final Collection records) {
         if (records.isEmpty()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         while (!channel.isWritable() && channel.isActive()) {
             doAwait();
         }
         if (!channel.isActive()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         Collection resultRecords = getResultRecords(records);
         DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
         channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
-        return new PipelineJobProgressUpdatedParameter(resultRecords.size());
+        return new PipelineJobUpdateProgress(resultRecords.size());
     }
 
     @SneakyThrows(InterruptedException.class)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
index b52b3494ea2fc..414c2adfa8e63 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -22,7 +22,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.junit.jupiter.api.Test;
 
@@ -44,7 +44,7 @@ void assertWrite() throws IOException {
         ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
         when(mockDatabase.getName()).thenReturn("test");
         try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mockChannel, mockDatabase, Collections.singletonList("test.t_order"))) {
-            PipelineJobProgressUpdatedParameter actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
+            PipelineJobUpdateProgress actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
             assertThat(actual.getProcessedRecordsCount(), is(0));
             actual = sink.write("ack", Collections.singletonList(new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1)));
             assertThat(actual.getProcessedRecordsCount(), is(1));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 1f5d81375726b..0b3d3aa86a3bb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -33,7 +33,7 @@
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -89,7 +89,7 @@ public Map check(final String algorithm
                 .forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
-        progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+        progressContext.onProgressUpdated(new PipelineJobUpdateProgress(0));
         Map result = new LinkedHashMap<>();
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index f2d5c695e72ca..5cfbc0193eb7c 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -28,7 +28,7 @@
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
@@ -140,8 +140,8 @@ public boolean isSourceTargetDatabaseTheSame() {
     }
 
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
-        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+        processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
 
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
index bea381310b39c..654f45291d9d9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
@@ -23,7 +23,7 @@
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
@@ -32,7 +32,7 @@
 public final class FixtureTransmissionJobItemContext implements TransmissionJobItemContext {
 
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
+    public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
     }
 
     @Override
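
For reference, a minimal sketch of how the renamed contract is consumed after this change. The RecordCountingProgressListener class below is hypothetical and exists only for illustration; the real implementers are the job item contexts touched in the patch, which it mirrors by accumulating the processed records count carried by PipelineJobUpdateProgress.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;

// Hypothetical listener, for illustration only (not part of this patch).
public final class RecordCountingProgressListener implements PipelineJobProgressListener {

    private final AtomicLong processedRecordsCount = new AtomicLong(0L);

    @Override
    public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
        // Accumulate the count carried by the renamed progress object, as the job item contexts above do.
        processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
    }

    public long getProcessedRecordsCount() {
        return processedRecordsCount.get();
    }
}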