diff --git a/docs/sql/flink_web.sql b/docs/sql/flink_web.sql index 30ad321e..c2a38411 100644 --- a/docs/sql/flink_web.sql +++ b/docs/sql/flink_web.sql @@ -73,6 +73,31 @@ ALTER TABLE job_config add `custom_args` varchar(128) DEFAULT NULL COMMENT '启 ALTER TABLE job_config add `custom_main_class` varchar(128) DEFAULT NULL COMMENT '程序入口类' AFTER custom_args; ALTER TABLE job_config add `custom_jar_url` varchar(128) DEFAULT NULL COMMENT'自定义jar的http地址 如:http://ccblog.cn/xx.jar' AFTER custom_main_class; + +-- ---------------------------- +-- Table structure for job_config_history +-- ---------------------------- +CREATE TABLE `job_config_history` ( + `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT, + `job_config_id` bigint(11) NOT NULL COMMENT 'job_config主表Id', + `job_name` varchar(64) NOT NULL COMMENT '任务名称', + `deploy_mode` varchar(64) NOT NULL COMMENT '提交模式: standalone 、yarn 、yarn-session ', + `flink_run_config` varchar(512) NOT NULL COMMENT 'flink运行配置', + `flink_sql` mediumtext NOT NULL COMMENT 'sql语句', + `flink_checkpoint_config` varchar(512) DEFAULT NULL COMMENT 'checkPoint配置', + `ext_jar_path` varchar(2048) DEFAULT NULL COMMENT 'udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar', + `version` int(11) NOT NULL DEFAULT '0' COMMENT '更新版本号', + `is_deleted` tinyint(1) NOT NULL DEFAULT '0', + `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `edit_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `creator` varchar(32) DEFAULT 'sys', + `editor` varchar(32) DEFAULT 'sys', + PRIMARY KEY (`id`), + KEY `index_job_config_id` (`job_config_id`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='flink任务配置历史变更表'; + + + -- ---------------------------- -- Table structure for job_run_log -- ---------------------------- @@ -98,6 +123,7 @@ CREATE TABLE `job_run_log` ( ALTER TABLE job_run_log add `run_ip` varchar(64) DEFAULT NULL COMMENT '任务运行所在的机器' AFTER local_log ; + -- ---------------------------- -- Table structure for savepoint_backup -- ---------------------------- diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobConfigHistoryDTO.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobConfigHistoryDTO.java new file mode 100644 index 00000000..bc86c3be --- /dev/null +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobConfigHistoryDTO.java @@ -0,0 +1,162 @@ +package com.flink.streaming.web.model.dto; + +import cn.hutool.core.collection.CollectionUtil; +import com.flink.streaming.web.model.entity.JobConfig; +import com.flink.streaming.web.model.entity.JobConfigHistory; +import lombok.Data; +import org.apache.commons.compress.utils.Lists; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** + * @author zhuhuipei + * @date 2021/5/5 + * @time 19:49 + */ +@Data +public class JobConfigHistoryDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + private Long id; + + /** + * job_config主表Id + */ + private Long jobConfigId; + + /** + * 任务名称 + */ + private String jobName; + + /** + * 提交模式: standalone 、yarn 、yarn-session + */ + private String deployMode; + + /** + * flink运行配置 + */ + private String flinkRunConfig; + + /** + * checkPoint配置 + */ + private String flinkCheckpointConfig; + + /** + * udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar + */ + private String extJarPath; + + /** + * 更新版本号 + */ + private Integer version; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 修改时间 + */ + private Date editTime; + + private String creator; + + private String editor; + + /** + * sql语句 + */ + private String flinkSql; + + + public static JobConfigHistory toEntity(JobConfigHistoryDTO jobConfigHistoryDTO) { + if (jobConfigHistoryDTO == null) { + return null; + } + JobConfigHistory jobConfigHistory = new JobConfigHistory(); + jobConfigHistory.setId(jobConfigHistoryDTO.getId()); + jobConfigHistory.setJobConfigId(jobConfigHistoryDTO.getJobConfigId()); + jobConfigHistory.setJobName(jobConfigHistoryDTO.getJobName()); + jobConfigHistory.setDeployMode(jobConfigHistoryDTO.getDeployMode()); + jobConfigHistory.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig()); + jobConfigHistory.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig()); + jobConfigHistory.setExtJarPath(jobConfigHistoryDTO.getExtJarPath()); + jobConfigHistory.setVersion(jobConfigHistoryDTO.getVersion()); + jobConfigHistory.setCreateTime(jobConfigHistoryDTO.getCreateTime()); + jobConfigHistory.setEditTime(jobConfigHistoryDTO.getEditTime()); + jobConfigHistory.setCreator(jobConfigHistoryDTO.getCreator()); + jobConfigHistory.setEditor(jobConfigHistoryDTO.getEditor()); + jobConfigHistory.setFlinkSql(jobConfigHistoryDTO.getFlinkSql()); + return jobConfigHistory; + } + + + public static JobConfigHistoryDTO toDTO(JobConfigHistory jobConfigHistory) { + if (jobConfigHistory == null) { + return null; + } + JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO(); + jobConfigHistoryDTO.setId(jobConfigHistory.getId()); + jobConfigHistoryDTO.setJobConfigId(jobConfigHistory.getJobConfigId()); + jobConfigHistoryDTO.setJobName(jobConfigHistory.getJobName()); + jobConfigHistoryDTO.setDeployMode(jobConfigHistory.getDeployMode()); + jobConfigHistoryDTO.setFlinkRunConfig(jobConfigHistory.getFlinkRunConfig()); + jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfigHistory.getFlinkCheckpointConfig()); + jobConfigHistoryDTO.setExtJarPath(jobConfigHistory.getExtJarPath()); + jobConfigHistoryDTO.setVersion(jobConfigHistory.getVersion()); + jobConfigHistoryDTO.setCreateTime(jobConfigHistory.getCreateTime()); + jobConfigHistoryDTO.setEditTime(jobConfigHistory.getEditTime()); + jobConfigHistoryDTO.setCreator(jobConfigHistory.getCreator()); + jobConfigHistoryDTO.setEditor(jobConfigHistory.getEditor()); + jobConfigHistoryDTO.setFlinkSql(jobConfigHistory.getFlinkSql()); + return jobConfigHistoryDTO; + } + + public static List toListDTO(List jobConfigHistoryList) { + if (CollectionUtil.isEmpty(jobConfigHistoryList)) { + return Collections.EMPTY_LIST; + } + + List list = Lists.newArrayList(); + + for (JobConfigHistory jobConfigHistory : jobConfigHistoryList) { + + JobConfigHistoryDTO jobConfigHistoryDTO = JobConfigHistoryDTO.toDTO(jobConfigHistory); + if (jobConfigHistoryDTO != null) { + list.add(jobConfigHistoryDTO); + } + } + + return list; + } + + + public static JobConfigHistoryDTO to(JobConfig jobConfig) { + if (jobConfig == null) { + return null; + } + JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO(); + jobConfigHistoryDTO.setJobConfigId (jobConfig.getId()); + jobConfigHistoryDTO.setJobName(jobConfig.getJobName()); + jobConfigHistoryDTO.setDeployMode(jobConfig.getDeployMode()); + jobConfigHistoryDTO.setFlinkRunConfig(jobConfig.getFlinkRunConfig()); + jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfig.getFlinkCheckpointConfig()); + jobConfigHistoryDTO.setExtJarPath(jobConfig.getExtJarPath()); + jobConfigHistoryDTO.setVersion(jobConfig.getVersion()); + jobConfigHistoryDTO.setCreateTime(jobConfig.getCreateTime()); + jobConfigHistoryDTO.setEditTime(jobConfig.getEditTime()); + jobConfigHistoryDTO.setCreator(jobConfig.getCreator()); + jobConfigHistoryDTO.setEditor(jobConfig.getEditor()); + jobConfigHistoryDTO.setFlinkSql(jobConfig.getFlinkSql()); + return jobConfigHistoryDTO; + } +} diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/entity/JobConfigHistory.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/entity/JobConfigHistory.java new file mode 100644 index 00000000..5da21c11 --- /dev/null +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/entity/JobConfigHistory.java @@ -0,0 +1,79 @@ +package com.flink.streaming.web.model.entity; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * + * @author zhuhuipei + * @date 2021/5/5 + * @time 19:49 + */ +@Data +public class JobConfigHistory implements Serializable { + + private static final long serialVersionUID = 1L; + + private Long id; + + /** + * job_config主表Id + */ + private Long jobConfigId; + + /** + * 任务名称 + */ + private String jobName; + + /** + * 提交模式: standalone 、yarn 、yarn-session + */ + private String deployMode; + + /** + * flink运行配置 + */ + private String flinkRunConfig; + + /** + * checkPoint配置 + */ + private String flinkCheckpointConfig; + + /** + * udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar + */ + private String extJarPath; + + /** + * 更新版本号 + */ + private Integer version; + + private Boolean isDeleted; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 修改时间 + */ + private Date editTime; + + private String creator; + + private String editor; + + /** + * sql语句 + */ + private String flinkSql; + + + +} diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/vo/JobConfigHistoryVO.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/vo/JobConfigHistoryVO.java new file mode 100644 index 00000000..d028b2de --- /dev/null +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/vo/JobConfigHistoryVO.java @@ -0,0 +1,115 @@ +package com.flink.streaming.web.model.vo; + +import cn.hutool.core.collection.CollectionUtil; +import com.flink.streaming.web.common.util.DateFormatUtils; +import com.flink.streaming.web.model.dto.JobConfigHistoryDTO; +import lombok.Data; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * @author zhuhuipei + * @date 2021/5/5 + * @time 19:49 + */ +@Data +public class JobConfigHistoryVO implements Serializable { + + private static final long serialVersionUID = 1L; + + private Long id; + + /** + * job_config主表Id + */ + private Long jobConfigId; + + /** + * 任务名称 + */ + private String jobName; + + /** + * 提交模式: standalone 、yarn 、yarn-session + */ + private String deployMode; + + /** + * flink运行配置 + */ + private String flinkRunConfig; + + /** + * checkPoint配置 + */ + private String flinkCheckpointConfig; + + /** + * udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar + */ + private String extJarPath; + + /** + * 更新版本号 + */ + private Integer version; + + /** + * 创建时间 + */ + private String createTime; + + /** + * 修改时间 + */ + private String editTime; + + private String creator; + + private String editor; + + /** + * sql语句 + */ + private String flinkSql; + + + public static JobConfigHistoryVO toVO(JobConfigHistoryDTO jobConfigHistoryDTO, boolean isFlinkSql) { + if (jobConfigHistoryDTO == null) { + return null; + } + JobConfigHistoryVO jobConfigHistoryVO = new JobConfigHistoryVO(); + jobConfigHistoryVO.setId(jobConfigHistoryDTO.getId()); + jobConfigHistoryVO.setJobConfigId(jobConfigHistoryDTO.getJobConfigId()); + jobConfigHistoryVO.setJobName(jobConfigHistoryDTO.getJobName()); + jobConfigHistoryVO.setDeployMode(jobConfigHistoryDTO.getDeployMode()); + jobConfigHistoryVO.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig()); + jobConfigHistoryVO.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig()); + jobConfigHistoryVO.setExtJarPath(jobConfigHistoryDTO.getExtJarPath()); + jobConfigHistoryVO.setVersion(jobConfigHistoryDTO.getVersion()); + jobConfigHistoryVO.setCreateTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getCreateTime())); + jobConfigHistoryVO.setEditTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getEditTime())); + jobConfigHistoryVO.setCreator(jobConfigHistoryDTO.getCreator()); + jobConfigHistoryVO.setEditor(jobConfigHistoryDTO.getEditor()); + if (isFlinkSql) { + jobConfigHistoryVO.setFlinkSql(jobConfigHistoryDTO.getFlinkSql()); + } + return jobConfigHistoryVO; + } + + public static List toListVO(List jobConfigHistoryDTOList) { + if (CollectionUtil.isEmpty(jobConfigHistoryDTOList)) { + return Collections.EMPTY_LIST; + } + List list = new ArrayList<>(); + + for (JobConfigHistoryDTO jobConfigHistoryDTO : jobConfigHistoryDTOList) { + list.add(toVO(jobConfigHistoryDTO, Boolean.FALSE)); + } + + return list; + } +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/web/JobConfigHistoryController.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/web/JobConfigHistoryController.java new file mode 100644 index 00000000..57481705 --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/web/JobConfigHistoryController.java @@ -0,0 +1,50 @@ +package com.flink.streaming.web.controller.web; + +import com.flink.streaming.web.model.dto.JobConfigHistoryDTO; +import com.flink.streaming.web.model.vo.JobConfigHistoryVO; +import com.flink.streaming.web.service.JobConfigHistoryService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.ui.ModelMap; +import org.springframework.web.bind.annotation.RequestMapping; + +import java.util.List; + +/** + * @author zhuhuipei + * @Description: + * @date 2020-08-16 + * @time 23:24 + */ +@Controller +@RequestMapping("/admin") +@Slf4j +public class JobConfigHistoryController { + + @Autowired + private JobConfigHistoryService jobConfigHistoryService; + + + @RequestMapping(value = "/jobConfigHistoryPage") + public String listPage(ModelMap modelMap, Long jobConfigId) { + if (jobConfigId==null){ + modelMap.put("message", "jobConfigId参数不能为空"); + return "screen/job_config_history/listPage"; + } + modelMap.put("jobConfigId", jobConfigId); + List list = jobConfigHistoryService.getJobConfigHistoryByJobConfigId(jobConfigId); + modelMap.put("jobConfigHistoryList", JobConfigHistoryVO.toListVO(list)); + return "screen/job_config_history/listPage"; + } + + + @RequestMapping("/jobConfigHistoryDetailPage") + public String detailPage(ModelMap modelMap, Long id) { + JobConfigHistoryDTO jobConfigHistoryDTO= jobConfigHistoryService.getJobConfigHistoryById(id); + modelMap.put("jobConfigHistory", JobConfigHistoryVO.toVO(jobConfigHistoryDTO,Boolean.TRUE)); + return "screen/job_config_history/detailPage"; + } + + +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/mapper/JobConfigHistoryMapper.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/mapper/JobConfigHistoryMapper.java new file mode 100644 index 00000000..e4bb7af0 --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/mapper/JobConfigHistoryMapper.java @@ -0,0 +1,19 @@ +package com.flink.streaming.web.mapper; + +import com.flink.streaming.web.model.entity.JobConfigHistory; +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public interface JobConfigHistoryMapper { + + int insert(JobConfigHistory record); + + List selectByJobConfigId(@Param("jobConfigId") Long jobConfigId); + + + JobConfigHistory selectById(@Param("id") Long id); + +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/JobConfigHistoryService.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/JobConfigHistoryService.java new file mode 100644 index 00000000..b8a2ea7c --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/JobConfigHistoryService.java @@ -0,0 +1,44 @@ +package com.flink.streaming.web.service; + +import com.flink.streaming.web.model.dto.JobConfigHistoryDTO; + +import java.util.List; + +/** + * @author zhuhuipei + * @Description: + * @date 2021/5/5 + * @time 20:11 + */ +public interface JobConfigHistoryService { + + /** + * 新增记录 + * + * @author zhuhuipei + * @date 2021/5/5 + * @time 20:13 + */ + void insertJobConfigHistory(JobConfigHistoryDTO jobConfigHistoryDTO); + + + /** + * 查询历史记录 + * + * @author zhuhuipei + * @date 2021/5/5 + * @time 20:13 + */ + List getJobConfigHistoryByJobConfigId(Long jobConfigId); + + + /** + * 详情 + * + * @author zhuhuipei + * @date 2021/5/5 + * @time 20:14 + */ + JobConfigHistoryDTO getJobConfigHistoryById(Long id); + +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigHistoryServiceImpl.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigHistoryServiceImpl.java new file mode 100644 index 00000000..e3e7c645 --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigHistoryServiceImpl.java @@ -0,0 +1,39 @@ +package com.flink.streaming.web.service.impl; + +import com.flink.streaming.web.mapper.JobConfigHistoryMapper; +import com.flink.streaming.web.model.dto.JobConfigHistoryDTO; +import com.flink.streaming.web.service.JobConfigHistoryService; +import lombok.extern.java.Log; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author zhuhuipei + * @Description: + * @date 2021/5/5 + * @time 20:11 + */ +@Service +@Log +public class JobConfigHistoryServiceImpl implements JobConfigHistoryService { + + @Autowired + private JobConfigHistoryMapper jobConfigHistoryMapper; + + @Override + public void insertJobConfigHistory(JobConfigHistoryDTO jobConfigHistoryDTO) { + jobConfigHistoryMapper.insert(JobConfigHistoryDTO.toEntity(jobConfigHistoryDTO)); + } + + @Override + public List getJobConfigHistoryByJobConfigId(Long jobConfigId) { + return JobConfigHistoryDTO.toListDTO(jobConfigHistoryMapper.selectByJobConfigId(jobConfigId)); + } + + @Override + public JobConfigHistoryDTO getJobConfigHistoryById(Long id) { + return JobConfigHistoryDTO.toDTO(jobConfigHistoryMapper.selectById(id)); + } +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigServiceImpl.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigServiceImpl.java index 4f4dba30..c63c4128 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigServiceImpl.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/JobConfigServiceImpl.java @@ -1,16 +1,14 @@ package com.flink.streaming.web.service.impl; -import com.flink.streaming.web.exceptions.BizException; import com.flink.streaming.web.enums.*; +import com.flink.streaming.web.exceptions.BizException; import com.flink.streaming.web.mapper.JobConfigMapper; import com.flink.streaming.web.model.dto.JobConfigDTO; +import com.flink.streaming.web.model.dto.JobConfigHistoryDTO; import com.flink.streaming.web.model.dto.PageModel; import com.flink.streaming.web.model.entity.JobConfig; import com.flink.streaming.web.model.param.JobConfigParam; -import com.flink.streaming.web.service.JobAlarmConfigService; -import com.flink.streaming.web.service.JobConfigService; -import com.flink.streaming.web.service.JobRunLogService; -import com.flink.streaming.web.service.SystemConfigService; +import com.flink.streaming.web.service.*; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import lombok.extern.slf4j.Slf4j; @@ -46,6 +44,9 @@ public class JobConfigServiceImpl implements JobConfigService { @Autowired private SystemConfigService systemConfigService; + @Autowired + private JobConfigHistoryService jobConfigHistoryService; + @Override public Long addJobConfig(JobConfigDTO jobConfigDTO) { if (jobConfigDTO == null) { @@ -55,6 +56,7 @@ public Long addJobConfig(JobConfigDTO jobConfigDTO) { this.checkSystemConfig(jobConfigDTO.getDeployModeEnum()); JobConfig jobConfig = JobConfigDTO.toEntity(jobConfigDTO); jobConfigMapper.insert(jobConfig); + this.insertJobConfigHistory(jobConfig.getId()); return jobConfig.getId(); } @@ -77,8 +79,11 @@ public void updateJobConfigById(JobConfigDTO jobConfigDTO) { if (StringUtils.isNotEmpty(jobConfigDTO.getJobName())) { this.checkJobName(jobConfigDTO.getJobName(), jobConfigDTO.getId()); } + JobConfig JobConfigUpdate = JobConfigDTO.toEntity(jobConfigDTO); - jobConfigMapper.updateByPrimaryKeySelective(JobConfigDTO.toEntity(jobConfigDTO)); + jobConfigMapper.updateByPrimaryKeySelective(JobConfigUpdate); + + this.insertJobConfigHistory(jobConfigDTO.getId()); } @Override @@ -244,4 +249,11 @@ private void checkSystemConfig(DeployModeEnum deployModeEnum) { } } + + private void insertJobConfigHistory(Long id) { + JobConfig jobConfig = jobConfigMapper.selectByPrimaryKey(id); + if (jobConfig != null && JobTypeEnum.SQL.getCode()== jobConfig.getJobType().intValue()) { + jobConfigHistoryService.insertJobConfigHistory(JobConfigHistoryDTO.to(jobConfig)); + } + } } diff --git a/flink-streaming-web/src/main/resources/mapper/JobConfigHistoryMapper.xml b/flink-streaming-web/src/main/resources/mapper/JobConfigHistoryMapper.xml new file mode 100644 index 00000000..2ee26b23 --- /dev/null +++ b/flink-streaming-web/src/main/resources/mapper/JobConfigHistoryMapper.xml @@ -0,0 +1,75 @@ + + + + + + + + + + + + + + + + + + + + + + + + id, job_config_id, job_name, deploy_mode, flink_run_config, flink_checkpoint_config, + ext_jar_path, version, is_deleted, create_time, edit_time, creator, editor + + + flink_sql + + + + + + SELECT LAST_INSERT_ID() + + insert into job_config_history (job_config_id, job_name, deploy_mode, + flink_run_config, flink_checkpoint_config, + ext_jar_path, version, is_deleted, + create_time, edit_time, creator, + editor, flink_sql) + values (#{jobConfigId,jdbcType=BIGINT}, #{jobName,jdbcType=VARCHAR}, #{deployMode,jdbcType=VARCHAR}, + #{flinkRunConfig,jdbcType=VARCHAR}, #{flinkCheckpointConfig,jdbcType=VARCHAR}, + #{extJarPath,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER},0, + now(), now(), #{creator,jdbcType=VARCHAR}, + #{editor,jdbcType=VARCHAR}, #{flinkSql,jdbcType=LONGVARCHAR}) + + + + + + + + + diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl index f52d71be..3759f096 100644 --- a/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl @@ -64,6 +64,11 @@ 点击查看 +
+ + 历史版本 +
+
diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config/listPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config/listPage.ftl index cac6a83f..f9f30383 100644 --- a/flink-streaming-web/src/main/resources/templates/screen/job_config/listPage.ftl +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config/listPage.ftl @@ -139,13 +139,14 @@ 操作 辅助 日志 + 历史版本 <#if jobConfigList?size == 0> - + 没有数据 @@ -195,7 +196,6 @@ <#if jobConfigVO.isOpen==1> - <#if jobConfigVO.stauts==1> 停止任务 <#else> @@ -215,7 +215,9 @@ 日志详情 历史日志 - + + + 历史版本 diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config_history/detailPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config_history/detailPage.ftl new file mode 100644 index 00000000..05465440 --- /dev/null +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config_history/detailPage.ftl @@ -0,0 +1,82 @@ + + + + + + + + + 配置历史详情 + <#include "../../control/public_css_js.ftl"> + + + + + + + +
+ + + <#include "../../layout/menu.ftl"> + + +
+
+ + + +
+ +
+
+ + <#if message??> +
+ ${message} +
+ <#else > +
+

备份时间:

+
${jobConfigHistory.createTime!""}
+
+ +
+

运行模式:

+
${jobConfigHistory.deployMode!""}
+
+ +
+

flink运行配置:

+
${jobConfigHistory.flinkRunConfig!"无"}
+
+
+

Checkpoint信息:

+
${jobConfigHistory.flinkCheckpointConfig!"无"}
+
+
+

三方jar地址:

+
${jobConfigHistory.extJarPath!"无"}
+
+
+

sql语句:

+
${jobConfigHistory.flinkSql!""}
+
+ + + +
+
+
+
+
+ + <#include "../../layout/bottom.ftl"> + +
+ + diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config_history/listPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config_history/listPage.ftl new file mode 100644 index 00000000..8517b524 --- /dev/null +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config_history/listPage.ftl @@ -0,0 +1,141 @@ + + + + + + 历史版本查询列表 + + + <#include "../../control/public_css_js.ftl"> + + + + + + + + + +
+ + + <#include "../../layout/menu.ftl"> + + +
+
+ + +
+
+
+

+ 只显示最近50次变更的记录(每次新增、修改都会记录),另外如果想查看更多版本可以直接查库 +

+
+
+
+ +
+ +
+
+ +
+ + + + + + + + + + + + + + + <#if jobConfigHistoryList?size == 0> + + + + <#else> + + <#list jobConfigHistoryList as jobConfigVO> + + + + + + + + + + + + + +
序号配置ID任务名称运行模式版本号创建时间详情
+ 没有数据 +
${jobConfigVO_index+1}${jobConfigVO.jobConfigId!""}${jobConfigVO.jobName!""}${jobConfigVO.deployMode!""}${jobConfigVO.version!""}${jobConfigVO.createTime!""} + 详情 +
+
+
+
+
+
+
+ + + <#include "../../layout/bottom.ftl"> + +
+ + + + + diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_log/detailLogPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_log/detailLogPage.ftl index 00c56075..c6044fd8 100644 --- a/flink-streaming-web/src/main/resources/templates/screen/job_log/detailLogPage.ftl +++ b/flink-streaming-web/src/main/resources/templates/screen/job_log/detailLogPage.ftl @@ -38,6 +38,7 @@
+
@@ -64,15 +65,17 @@ <#if jobRunLogDetail.clinetJobUrl??>
- -
 点击查看Flink客户端日志(连接跳转后 如果要看最新日志 需要手动刷新页面) 
+ +
${jobRunLogDetail.clinetJobUrl!""}   (连接跳转后 如果要看最新日志 需要手动刷新页面)
<#if jobRunLogDetail.remoteLogUrl??>