Skip to content

Commit

Permalink
[Optimize]Optimize some code (#3831)
Browse files Browse the repository at this point in the history
Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh authored Sep 24, 2024
1 parent 0f52cb2 commit 81db6e4
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 82 deletions.
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/aop/LogAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O
// *========数据库日志=========*//
OperateLog operLog = new OperateLog();
Result<Void> result = JsonUtils.toBean(jsonResult, new TypeReference<Result<Void>>() {});
if (result == null) {
result = Result.failed();
}
operLog.setStatus(result.isSuccess() ? BusinessStatus.SUCCESS.ordinal() : BusinessStatus.FAIL.ordinal());

// 请求的地址
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@
package org.dinky.context;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @since 0.7.0
*/
public class FlinkUdfPathContextHolder {
private static final List<String> PYTHON_FILE_SUFFIX =
Arrays.asList(".zip", ".py", ".pyc", ".pyo", ".pyd", ".pyw", ".pyz", ".pyzw");

private final Set<File> UDF_PATH_CONTEXT = new HashSet<>();
private final Set<File> OTHER_PLUGINS_PATH_CONTEXT = new HashSet<>();
private final Set<File> PYTHON_UDF_FILE = new HashSet<>();
private final Set<File> FILES = new HashSet<>();

public void addUdfPath(File file) {
Expand All @@ -54,7 +58,10 @@ public Set<File> getUdfFile() {
}

public Set<File> getPyUdfFile() {
return PYTHON_UDF_FILE;
return getAllFileSet().stream()
.filter(file -> PYTHON_FILE_SUFFIX.stream()
.anyMatch(suffix -> file.getName().endsWith(suffix)))
.collect(Collectors.toSet());
}

public Set<File> getOtherPluginsFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,6 +43,7 @@
@Slf4j
@Setter
@Getter
@NoArgsConstructor
public class SelectResult extends AbstractResult implements IResult {

private String jobID;
Expand All @@ -65,7 +67,6 @@ public SelectResult(
this.columns = columns;
this.jobID = jobID;
this.success = success;
// this.endTime = LocalDateTime.now();
this.isDestroyed = false;
}

Expand Down
22 changes: 12 additions & 10 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public YarnGateway(GatewayConfig config) {
super(config);
}

@Override
public void init() {
initConfig();
initYarnClient();
Expand Down Expand Up @@ -192,7 +193,7 @@ private void initYarnClient() {
hadoopUserName = "hdfs";
}

// 设置 yarn 提交的用户名
// Set the username for the yarn submission
String yarnUser = configuration.get(CustomerConfigureOptions.YARN_APPLICATION_USER);
if (StrUtil.isNotBlank(yarnUser)) {
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(yarnUser));
Expand All @@ -211,6 +212,7 @@ private Path getYanConfigFilePath(String path) {
return new Path(URI.create(config.getClusterConfig().getHadoopConfigPath() + "/" + path));
}

@Override
public SavePointResult savepointCluster(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
Expand All @@ -221,6 +223,7 @@ public SavePointResult savepointCluster(String savePoint) {
return runClusterSavePointResult(savePoint, applicationId, clusterDescriptor);
}

@Override
public SavePointResult savepointJob(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
Expand Down Expand Up @@ -253,13 +256,14 @@ private void autoCancelCluster(ClusterClient<ApplicationId> clusterClient) {
Thread.sleep(3000);
clusterClient.shutDownCluster();
} catch (InterruptedException e) {
e.printStackTrace();
logger.error(e.getMessage());
} finally {
clusterClient.close();
}
});
}

@Override
public TestResult test() {
try {
initConfig();
Expand Down Expand Up @@ -365,13 +369,12 @@ protected YarnClusterDescriptor createYarnClusterDescriptorWithJar(FlinkUdfPathC
}

protected YarnClusterDescriptor createInitYarnClusterDescriptor() {
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
true);
return yarnClusterDescriptor;
}

protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult result)
Expand Down Expand Up @@ -464,7 +467,7 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

if (HighAvailabilityMode.ZOOKEEPER == highAvailabilityMode) {
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, appId);
configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, appId);
String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);

if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
Expand Down Expand Up @@ -495,13 +498,13 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error("", e);
} finally {
if (Asserts.isNotNull(zooKeeper)) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("", e);
}
}
}
Expand All @@ -515,12 +518,11 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
* Creates a ZooKeeper path of the form "/a/b/.../z".
*/
private static String generateZookeeperPath(String... paths) {
final String result = Arrays.stream(paths)

return Arrays.stream(paths)
.map(YarnGateway::trimSlashes)
.filter(s -> !s.isEmpty())
.collect(Collectors.joining("/", "/", ""));

return result;
}

private static String trimSlashes(String input) {
Expand Down
7 changes: 4 additions & 3 deletions dinky-web/src/components/Flink/FlinkDag/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ function getMaxWidthAndDepth(edges: CusEdge[]): { maxWidth: number; maxDepth: nu
graph[sourceCell].push(targetCell);
});

const maxSource = Object.keys(sourceCount).reduce((a, b) =>
sourceCount[a] > sourceCount[b] ? a : b
);
const maxSource =
Object.keys(sourceCount).length < 1
? '1'
: Object.keys(sourceCount).reduce((a, b) => (sourceCount[a] > sourceCount[b] ? a : b));
const maxWidth = sourceCount[maxSource];

const visited: Record<string, boolean> = {};
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/components/Flink/OptionsSelect/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const FlinkOptionsSelect = (props: FlinkOptionsProps) => {
return (
<ProFormSelect
{...props}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item) }}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item), virtual: false }}
/>
);
};
Expand Down
Loading

0 comments on commit 81db6e4

Please sign in to comment.