diff --git a/docs/docs/en/guide/upgrade/incompatible.md b/docs/docs/en/guide/upgrade/incompatible.md index 0f60fcb9986a..afeca6ce6c47 100644 --- a/docs/docs/en/guide/upgrade/incompatible.md +++ b/docs/docs/en/guide/upgrade/incompatible.md @@ -43,4 +43,5 @@ This document records the incompatible updates between each version. You need to ## 3.4.1 * Remove import and export of workflow definition. ([#17940])(https://github.com/apache/dolphinscheduler/issues/17940) +* Persist `default` worker group at database. ([#17929])(https://github.com/apache/dolphinscheduler/issues/17929) diff --git a/docs/docs/zh/guide/upgrade/incompatible.md b/docs/docs/zh/guide/upgrade/incompatible.md index 2f0c4c044ff8..3c9242f80ac7 100644 --- a/docs/docs/zh/guide/upgrade/incompatible.md +++ b/docs/docs/zh/guide/upgrade/incompatible.md @@ -47,4 +47,5 @@ ## 3.4.1 * 移除导入导出工作流([#17940])(https://github.com/apache/dolphinscheduler/issues/17940) +* 在数据库中持久化 `default` 工作组 ([#17929])(https://github.com/apache/dolphinscheduler/issues/17929) diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index b86a1707e85e..6f0c2149d2eb 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1051,6 +1051,8 @@ CREATE TABLE t_ds_worker_group -- Records of t_ds_worker_group -- ---------------------------- +INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group'); + -- ---------------------------- -- Table structure for t_ds_relation_project_worker_group -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index ea80068e49d4..b73517917364 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1049,6 +1049,7 @@ CREATE TABLE `t_ds_worker_group` ( -- ---------------------------- -- Records of t_ds_worker_group -- ---------------------------- +INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group'); -- ---------------------------- -- Table structure for t_ds_version diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 36261d7a8c2d..11a73b5494ea 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1098,6 +1098,9 @@ VALUES (NULL, 1, 'default admin warning group', 'default admin warning group', ' INSERT INTO t_ds_queue(queue_name, queue, create_time, update_time) VALUES ('default', 'default', '2018-11-29 10:22:33', '2018-11-29 10:22:33'); +-- Records of t_ds_worker_group, default worker name : default +INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, now(), now(), 'default worker group'); + -- Records of t_ds_queue,default queue name : default INSERT INTO t_ds_version(version) VALUES ('3.3.0'); diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql index 4a14f326b985..b0c3f5bd45ae 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql @@ -14,3 +14,5 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +INSERT IGNORE INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, NOW(), NOW(), 'default worker group'); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql index 4a14f326b985..779d2e48d03d 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql @@ -14,3 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +-- Insert default worker group if not exists +INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) +VALUES ('default', NULL, now(), now(), 'default worker group') +ON CONFLICT (name) DO NOTHING; \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java index c94d8861defb..a720a15655f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java @@ -35,6 +35,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class WorkerClusters extends AbstractClusterSubscribeListener implements IClusters, @@ -43,11 +46,8 @@ public class WorkerClusters extends AbstractClusterSubscribeListener worker private final Map workerMapping = new ConcurrentHashMap<>(); - // WorkerGroup from db -> WorkerIdentifier(workerAddress) - private final Map> dbWorkerGroupMapping = new ConcurrentHashMap<>(); - - // WorkerGroup from config -> WorkerIdentifier(workerAddress) - private final Map> configWorkerGroupMapping = new ConcurrentHashMap<>(); + // WorkerGroup -> WorkerIdentifier(workerAddress) + private final Map> workerGroupMapping = new ConcurrentHashMap<>(); private final List> workerClusterChangeListeners = new CopyOnWriteArrayList<>(); @@ -62,44 +62,23 @@ public Optional getServer(final String address) { return Optional.ofNullable(workerMapping.get(address)); } - public List getDbWorkerServerAddressByGroup(String workerGroup) { - if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { - return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); - } - return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); - } - - public List getConfigWorkerServerAddressByGroup(String workerGroup) { - if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { - return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); - } - return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); + public List getWorkerServerAddressByGroup(String workerGroup) { + return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); } public List getNormalWorkerServerAddressByGroup(String workerGroup) { - List dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup) - .stream() - .map(workerMapping::get) - .filter(Objects::nonNull) - .filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL) - .map(WorkerServerMetadata::getAddress) - .collect(Collectors.toList()); - List configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup) + List dbWorkerAddresses = getWorkerServerAddressByGroup(workerGroup) .stream() .map(workerMapping::get) .filter(Objects::nonNull) .filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL) .map(WorkerServerMetadata::getAddress) .collect(Collectors.toList()); - dbWorkerAddresses.removeAll(configWorkerAddresses); - dbWorkerAddresses.addAll(configWorkerAddresses); return UnmodifiableList.unmodifiableList(dbWorkerAddresses); } public boolean containsWorkerGroup(String workerGroup) { - return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup) - || dbWorkerGroupMapping.containsKey(workerGroup) - || configWorkerGroupMapping.containsKey(workerGroup); + return workerGroupMapping.containsKey(workerGroup); } @Override @@ -109,9 +88,9 @@ public void registerListener(IClustersChangeListener liste @Override public void onWorkerGroupDelete(List workerGroups) { - synchronized (dbWorkerGroupMapping) { + synchronized (workerGroupMapping) { for (WorkerGroup workerGroup : workerGroups) { - dbWorkerGroupMapping.remove(workerGroup.getName()); + workerGroupMapping.remove(workerGroup.getName()); } } } @@ -127,8 +106,8 @@ public void onWorkerGroupAdd(List workerGroups) { public void onWorkerGroupChange(List workerGroups) { for (WorkerGroup workerGroup : workerGroups) { List workerAddresses = WorkerGroupUtils.getWorkerAddressListFromWorkerGroup(workerGroup); - synchronized (dbWorkerGroupMapping) { - dbWorkerGroupMapping.put(workerGroup.getName(), workerAddresses); + synchronized (workerGroupMapping) { + workerGroupMapping.put(workerGroup.getName(), workerAddresses); } } } @@ -145,15 +124,12 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) { @Override public void onServerAdded(WorkerServerMetadata workerServer) { workerMapping.put(workerServer.getAddress(), workerServer); - synchronized (configWorkerGroupMapping) { - List addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup()); - if (addWorkerGroupAddrList == null) { - List newWorkerGroupAddrList = new ArrayList<>(); - newWorkerGroupAddrList.add(workerServer.getAddress()); - configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList); - } else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) { - addWorkerGroupAddrList.add(workerServer.getAddress()); - configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList); + synchronized (workerGroupMapping) { + if (!workerGroupMapping.containsKey(workerServer.getWorkerGroup())) { + log.warn("The group: {} of worker: {} is not defined, please define the workergroup first", + workerServer.getWorkerGroup(), workerServer); + } else { + workerGroupMapping.get(workerServer.getWorkerGroup()).add(workerServer.getAddress()); } } for (IClustersChangeListener listener : workerClusterChangeListeners) { @@ -164,13 +140,9 @@ public void onServerAdded(WorkerServerMetadata workerServer) { @Override public void onServerRemove(WorkerServerMetadata workerServer) { workerMapping.remove(workerServer.getAddress(), workerServer); - synchronized (configWorkerGroupMapping) { - List removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup()); - if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) { - removeWorkerGroupAddrList.remove(workerServer.getAddress()); - if (removeWorkerGroupAddrList.isEmpty()) { - configWorkerGroupMapping.remove(workerServer.getWorkerGroup()); - } + synchronized (workerGroupMapping) { + if (workerGroupMapping.containsKey(workerServer.getWorkerGroup())) { + workerGroupMapping.get(workerServer.getWorkerGroup()).remove(workerServer.getAddress()); } } for (IClustersChangeListener listener : workerClusterChangeListeners) { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java index 0be794c49742..045e493d57fe 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java @@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup)); Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse(); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty(); + assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty(); } @Test @@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); } @@ -74,7 +74,7 @@ void testOnWorkerGroupChange() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); WorkerGroup updatedWorkerGroup = WorkerGroup.builder() @@ -82,7 +82,7 @@ void testOnWorkerGroupChange() { .addrList("") .build(); workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup)); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty(); + assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty(); assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue(); } @@ -94,7 +94,7 @@ void testOnServerAdded() { WorkerClusters workerClusters = new WorkerClusters(); workerClusters.onServerAdded(normalWorkerServerMetadata); workerClusters.onServerAdded(busyWorkerServerMetadata); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); @@ -110,7 +110,7 @@ void testOnServerRemove() { workerClusters.onServerAdded(busyWorkerServerMetadata); workerClusters.onServerRemove(busyWorkerServerMetadata); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); @@ -137,7 +137,7 @@ void testOnServerUpdate() { workerClusters.onServerUpdate(workerServerMetadata); - assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());