Skip to content

Commit 4637375

Browse files
authored
Support primary key columns ordering for standard pipeline table metadata loader (#32647)
1 parent d30d07f commit 4637375

File tree

6 files changed

+142
-11
lines changed

6 files changed

+142
-11
lines changed

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.sql.Array;
3131
import java.sql.SQLException;
3232
import java.sql.SQLXML;
33+
import java.util.Collection;
3334
import java.util.Iterator;
3435
import java.util.Map;
3536
import java.util.Map.Entry;
@@ -150,6 +151,33 @@ public static boolean isBigDecimalEquals(final BigDecimal one, final BigDecimal
150151
return 0 == decimalOne.compareTo(decimalTwo);
151152
}
152153

154+
/**
155+
* Compare lists.
156+
*
157+
* @param thisList this list
158+
* @param thatList that list
159+
* @return true if lists equals, otherwise false
160+
*/
161+
public static boolean compareLists(final @Nullable Collection<?> thisList, final @Nullable Collection<?> thatList) {
162+
if (null == thisList && null == thatList) {
163+
return true;
164+
}
165+
if (null == thisList || null == thatList) {
166+
return false;
167+
}
168+
if (thisList.size() != thatList.size()) {
169+
return false;
170+
}
171+
Iterator<?> thisIterator = thisList.iterator();
172+
Iterator<?> thatIterator = thatList.iterator();
173+
while (thisIterator.hasNext() && thatIterator.hasNext()) {
174+
if (!Objects.deepEquals(thisIterator.next(), thatIterator.next())) {
175+
return false;
176+
}
177+
}
178+
return true;
179+
}
180+
153181
/**
154182
* Get first unique key value.
155183
*

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import lombok.RequiredArgsConstructor;
2121
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
2223
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
2324
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
2425
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -33,11 +34,9 @@
3334
import java.sql.SQLException;
3435
import java.util.Collection;
3536
import java.util.LinkedHashMap;
36-
import java.util.LinkedHashSet;
3737
import java.util.LinkedList;
3838
import java.util.Map;
3939
import java.util.Map.Entry;
40-
import java.util.Set;
4140
import java.util.SortedMap;
4241
import java.util.TreeMap;
4342
import java.util.concurrent.ConcurrentHashMap;
@@ -91,7 +90,7 @@ private Map<CaseInsensitiveIdentifier, PipelineTableMetaData> loadTableMetaData0
9190
}
9291
Map<CaseInsensitiveIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>(tableNames.size(), 1F);
9392
for (String each : tableNames) {
94-
Set<CaseInsensitiveIdentifier> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
93+
Collection<CaseInsensitiveIdentifier> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
9594
Map<CaseInsensitiveIdentifier, Collection<CaseInsensitiveIdentifier>> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each);
9695
Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
9796
try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
@@ -111,7 +110,9 @@ private Map<CaseInsensitiveIdentifier, PipelineTableMetaData> loadTableMetaData0
111110
}
112111
}
113112
Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
114-
.map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
113+
.map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()),
114+
DataConsistencyCheckUtils.compareLists(primaryKeys, entry.getValue())))
115+
.collect(Collectors.toList());
115116
result.put(new CaseInsensitiveIdentifier(each), new PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
116117
}
117118
return result;
@@ -138,14 +139,13 @@ private Map<CaseInsensitiveIdentifier, Collection<CaseInsensitiveIdentifier>> lo
138139
return result;
139140
}
140141

141-
private Set<CaseInsensitiveIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
142-
Set<CaseInsensitiveIdentifier> result = new LinkedHashSet<>();
143-
// TODO order primary keys
142+
private Collection<CaseInsensitiveIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
143+
SortedMap<Short, CaseInsensitiveIdentifier> result = new TreeMap<>();
144144
try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
145145
while (resultSet.next()) {
146-
result.add(new CaseInsensitiveIdentifier(resultSet.getString("COLUMN_NAME")));
146+
result.put(resultSet.getShort("KEY_SEQ"), new CaseInsensitiveIdentifier(resultSet.getString("COLUMN_NAME")));
147147
}
148148
}
149-
return result;
149+
return result.values();
150150
}
151151
}

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ public final class PipelineIndexMetaData {
3535
private final CaseInsensitiveIdentifier name;
3636

3737
private final List<PipelineColumnMetaData> columns;
38+
39+
private final boolean primaryKey;
3840
}

kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Collections;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.stream.Collectors;
3536

3637
/**
@@ -62,8 +63,10 @@ public PipelineTableMetaData(final String name, final Map<CaseInsensitiveIdentif
6263
List<PipelineColumnMetaData> columnMetaDataList = new ArrayList<>(columnMetaDataMap.values());
6364
Collections.sort(columnMetaDataList);
6465
columnNames = Collections.unmodifiableList(columnMetaDataList.stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
65-
primaryKeyColumns = Collections.unmodifiableList(columnMetaDataList.stream().filter(PipelineColumnMetaData::isPrimaryKey)
66-
.map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
66+
Optional<PipelineIndexMetaData> primaryKeyMetaData = uniqueIndexes.stream().filter(PipelineIndexMetaData::isPrimaryKey).findFirst();
67+
primaryKeyColumns = primaryKeyMetaData.map(each -> each.getColumns().stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()))
68+
.orElseGet(() -> Collections.unmodifiableList(columnMetaDataList.stream().filter(PipelineColumnMetaData::isPrimaryKey)
69+
.map(PipelineColumnMetaData::getName).collect(Collectors.toList())));
6770
this.uniqueIndexes = Collections.unmodifiableCollection(uniqueIndexes);
6871
}
6972

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.data.pipeline.core.util;
19+
20+
import lombok.AccessLevel;
21+
import lombok.NoArgsConstructor;
22+
23+
import java.util.Collection;
24+
import java.util.Iterator;
25+
26+
/**
27+
* Pipeline string utils.
28+
*/
29+
@NoArgsConstructor(access = AccessLevel.NONE)
30+
public final class PipelineStringUtils {
31+
32+
/**
33+
* Equals ignore case.
34+
*
35+
* @param one one
36+
* @param another another
37+
* @return is equals ignore case
38+
*/
39+
public static boolean equalsIgnoreCase(final Collection<String> one, final Collection<String> another) {
40+
if (null == one && null == another) {
41+
return true;
42+
}
43+
if (null == one || null == another) {
44+
return false;
45+
}
46+
if (one.size() != another.size()) {
47+
return false;
48+
}
49+
Iterator<String> oneIterator = one.iterator();
50+
Iterator<String> anotherIterator = another.iterator();
51+
while (oneIterator.hasNext() && anotherIterator.hasNext()) {
52+
if (!oneIterator.next().equalsIgnoreCase(anotherIterator.next())) {
53+
return false;
54+
}
55+
}
56+
return true;
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.data.pipeline.core.util;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.util.Collections;
23+
24+
import static org.junit.jupiter.api.Assertions.assertFalse;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
27+
class PipelineStringUtilsTest {
28+
29+
@Test
30+
void assertEqualsIgnoreCase() {
31+
assertTrue(PipelineStringUtils.equalsIgnoreCase(null, null));
32+
assertFalse(PipelineStringUtils.equalsIgnoreCase(null, Collections.emptyList()));
33+
assertFalse(PipelineStringUtils.equalsIgnoreCase(Collections.emptyList(), null));
34+
assertFalse(PipelineStringUtils.equalsIgnoreCase(Collections.singletonList("test"), Collections.emptyList()));
35+
assertFalse(PipelineStringUtils.equalsIgnoreCase(Collections.emptyList(), Collections.singletonList("test")));
36+
assertTrue(PipelineStringUtils.equalsIgnoreCase(Collections.singletonList("test"), Collections.singletonList("test")));
37+
assertTrue(PipelineStringUtils.equalsIgnoreCase(Collections.singletonList("TEST"), Collections.singletonList("test")));
38+
assertTrue(PipelineStringUtils.equalsIgnoreCase(Collections.singletonList("test"), Collections.singletonList("TEST")));
39+
}
40+
}

0 commit comments

Comments
 (0)