1 change: 1 addition & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
@@ -43,4 +43,5 @@ This document records the incompatible updates between each version. You need to
## 3.4.1

* Remove import and export of workflow definition. ([#17940](https://github.com/apache/dolphinscheduler/issues/17940))
* Persist the `default` worker group in the database. ([#17929](https://github.com/apache/dolphinscheduler/issues/17929))

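A quick way to verify the change recorded above after an upgrade is to query the persisted row directly. This is an illustrative sketch, not part of the change; it assumes the `t_ds_worker_group` columns seeded by the SQL scripts later in this diff.

-- Sanity check (illustrative): the upgrade should leave exactly one 'default' worker group row.
SELECT name, addr_list, description, create_time
FROM t_ds_worker_group
WHERE name = 'default';

A single row named `default` with a NULL `addr_list` indicates the seed insert was applied.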
1 change: 1 addition & 0 deletions docs/docs/zh/guide/upgrade/incompatible.md
@@ -47,4 +47,5 @@
## 3.4.1

* 移除导入导出工作流([#17940](https://github.com/apache/dolphinscheduler/issues/17940))
* 在数据库中持久化 `default` 工作组 ([#17929](https://github.com/apache/dolphinscheduler/issues/17929))

@@ -1051,6 +1051,8 @@ CREATE TABLE t_ds_worker_group
-- Records of t_ds_worker_group
-- ----------------------------

INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group');

-- ----------------------------
-- Table structure for t_ds_relation_project_worker_group
-- ----------------------------
@@ -1049,6 +1049,7 @@ CREATE TABLE `t_ds_worker_group` (
-- ----------------------------
-- Records of t_ds_worker_group
-- ----------------------------
INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group');

-- ----------------------------
-- Table structure for t_ds_version
@@ -1098,6 +1098,9 @@ VALUES (NULL, 1, 'default admin warning group', 'default admin warning group', '
INSERT INTO t_ds_queue(queue_name, queue, create_time, update_time)
VALUES ('default', 'default', '2018-11-29 10:22:33', '2018-11-29 10:22:33');

-- Records of t_ds_worker_group, default worker group name: default
INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, now(), now(), 'default worker group');

-- Records of t_ds_version
INSERT INTO t_ds_version(version) VALUES ('3.3.0');

@@ -14,3 +14,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

INSERT IGNORE INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, NOW(), NOW(), 'default worker group');
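`INSERT IGNORE` makes the statement above idempotent only because MySQL skips the row when a duplicate-key error would occur, which presumes `name` is covered by a primary or unique key in the DolphinScheduler schema. As a hedged alternative for a schema without such a key, an existence-guarded insert achieves the same effect; this is a sketch, not part of the upgrade script:

-- Sketch (assumption: no unique key on name): insert the row only if it is missing.
INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description)
SELECT 'default', NULL, NOW(), NOW(), 'default worker group'
FROM DUAL
WHERE NOT EXISTS (SELECT 1 FROM t_ds_worker_group WHERE name = 'default');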
@@ -14,3 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Insert default worker group if not exists
INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description)
VALUES ('default', NULL, now(), now(), 'default worker group')
ON CONFLICT (name) DO NOTHING;
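`ON CONFLICT (name) DO NOTHING` only works when PostgreSQL can match the conflict target to a unique constraint or index on `name`; without one, the statement fails rather than being skipped. If a target database were missing that constraint, something like the following would have to exist first (the index name is hypothetical, not taken from this change):

-- Hypothetical prerequisite: a unique index on name for the ON CONFLICT target.
CREATE UNIQUE INDEX IF NOT EXISTS uniq_t_ds_worker_group_name ON t_ds_worker_group (name);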
@@ -35,6 +35,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServerMetadata>
implements
IClusters<WorkerServerMetadata>,
@@ -43,11 +46,8 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
// WorkerIdentifier(workerAddress) -> worker
private final Map<String, WorkerServerMetadata> workerMapping = new ConcurrentHashMap<>();

// WorkerGroup from db -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> dbWorkerGroupMapping = new ConcurrentHashMap<>();

// WorkerGroup from config -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> configWorkerGroupMapping = new ConcurrentHashMap<>();
// WorkerGroup -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> workerGroupMapping = new ConcurrentHashMap<>();

private final List<IClustersChangeListener<WorkerServerMetadata>> workerClusterChangeListeners =
new CopyOnWriteArrayList<>();
@@ -62,44 +62,23 @@ public Optional<WorkerServerMetadata> getServer(final String address) {
return Optional.ofNullable(workerMapping.get(address));
}

public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getConfigWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
public List<String> getWorkerServerAddressByGroup(String workerGroup) {
return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getNormalWorkerServerAddressByGroup(String workerGroup) {
List<String> dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
List<String> configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup)
List<String> dbWorkerAddresses = getWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
dbWorkerAddresses.removeAll(configWorkerAddresses);
dbWorkerAddresses.addAll(configWorkerAddresses);
return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
}

public boolean containsWorkerGroup(String workerGroup) {
return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
|| dbWorkerGroupMapping.containsKey(workerGroup)
|| configWorkerGroupMapping.containsKey(workerGroup);
return workerGroupMapping.containsKey(workerGroup);
}

@Override
@@ -109,9 +88,9 @@ public void registerListener(IClustersChangeListener<WorkerServerMetadata> liste

@Override
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
synchronized (dbWorkerGroupMapping) {
synchronized (workerGroupMapping) {
for (WorkerGroup workerGroup : workerGroups) {
dbWorkerGroupMapping.remove(workerGroup.getName());
workerGroupMapping.remove(workerGroup.getName());
}
}
}
@@ -127,8 +106,8 @@ public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
for (WorkerGroup workerGroup : workerGroups) {
List<String> workerAddresses = WorkerGroupUtils.getWorkerAddressListFromWorkerGroup(workerGroup);
synchronized (dbWorkerGroupMapping) {
dbWorkerGroupMapping.put(workerGroup.getName(), workerAddresses);
synchronized (workerGroupMapping) {
workerGroupMapping.put(workerGroup.getName(), workerAddresses);
}
}
}
@@ -145,15 +124,12 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) {
@Override
public void onServerAdded(WorkerServerMetadata workerServer) {
workerMapping.put(workerServer.getAddress(), workerServer);
synchronized (configWorkerGroupMapping) {
List<String> addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (addWorkerGroupAddrList == null) {
List<String> newWorkerGroupAddrList = new ArrayList<>();
newWorkerGroupAddrList.add(workerServer.getAddress());
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
} else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
addWorkerGroupAddrList.add(workerServer.getAddress());
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
synchronized (workerGroupMapping) {
if (!workerGroupMapping.containsKey(workerServer.getWorkerGroup())) {
log.warn("The group: {} of worker: {} is not defined, please define the workergroup first",
workerServer.getWorkerGroup(), workerServer);
} else {
workerGroupMapping.get(workerServer.getWorkerGroup()).add(workerServer.getAddress());
}
}
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {
@@ -164,13 +140,9 @@ public void onServerAdded(WorkerServerMetadata workerServer) {
@Override
public void onServerRemove(WorkerServerMetadata workerServer) {
workerMapping.remove(workerServer.getAddress(), workerServer);
synchronized (configWorkerGroupMapping) {
List<String> removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
removeWorkerGroupAddrList.remove(workerServer.getAddress());
if (removeWorkerGroupAddrList.isEmpty()) {
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
}
synchronized (workerGroupMapping) {
if (workerGroupMapping.containsKey(workerServer.getWorkerGroup())) {
workerGroupMapping.get(workerServer.getWorkerGroup()).remove(workerServer.getAddress());
}
}
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {
@@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
}

@Test
@@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
}

@@ -74,15 +74,15 @@ void testOnWorkerGroupChange() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
.name("flinkCluster")
.addrList("")
.build();
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
}

@@ -94,7 +94,7 @@ void testOnServerAdded() {
WorkerClusters workerClusters = new WorkerClusters();
workerClusters.onServerAdded(normalWorkerServerMetadata);
workerClusters.onServerAdded(busyWorkerServerMetadata);
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -110,7 +110,7 @@ void testOnServerRemove() {
workerClusters.onServerAdded(busyWorkerServerMetadata);
workerClusters.onServerRemove(busyWorkerServerMetadata);

assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -137,7 +137,7 @@ void testOnServerUpdate() {

workerClusters.onServerUpdate(workerServerMetadata);

assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());