Skip to content

Commit

Permalink
[Improve][Engine] Improve flink engine (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Feb 11, 2025
1 parent fdb9d32 commit bab98b6
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,20 @@ public class CommonConstants {
public static final String CATALOG_ENTITY_INSTANCE_STATUS_ACTIVE = "active";

public static final String CATALOG_ENTITY_INSTANCE_STATUS_DELETED = "deleted";

public static final String WEEK_START_DAY = "week_start_day";

public static final String WEEK_END_DAY = "week_end_day";

public static final String MONTH_START_DAY = "month_start_day";

public static final String MONTH_END_DAY = "month_end_day";

public static final String DAY_START_TIME = "day_start_time";

public static final String DAY_END_TIME = "day_end_time";

public static final String DAY_AFTER_7_END_TIME = "day_after_7_end_time";

public static final String DAY_AFTER_30_END_TIME = "day_after_30_end_time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public static void setValue(String key, String value) {
public static void remove(String key) {
PROPERTIES.remove(key);
}

public static Map<String, String> getPropertiesByPrefix(String prefix) {
if (StringUtils.isEmpty(prefix)) {
return null;
Expand Down
129 changes: 103 additions & 26 deletions datavines-common/src/main/java/io/datavines/common/utils/DateUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*/
package io.datavines.common.utils;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.Calendar;
import java.util.Date;
import java.util.Objects;
Expand All @@ -43,6 +42,7 @@ public class DateUtils {
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";

public static final String YYYYMMDD = "yyyyMMdd";

public static final String YYYY_MM_DD = "yyyy-MM-dd";

/**
Expand Down Expand Up @@ -107,6 +107,17 @@ public static String format(LocalDateTime localDateTime, String format) {
return localDateTime.format(DateTimeFormatter.ofPattern(format));
}

/**
* get the formatted date string
*
* @param localDate local data
* @param format yyyy-MM-dd HH:mm:ss
* @return date string
*/
public static String format(LocalDate localDate, String format) {
return localDate.format(DateTimeFormatter.ofPattern(format));
}

/**
* convert time to yyyy-MM-dd HH:mm:ss format
*
Expand All @@ -117,7 +128,6 @@ public static String dateToString(Date date) {
return format(date, YYYY_MM_DD_HH_MM_SS);
}


/**
* convert string to date and time
*
Expand All @@ -135,7 +145,6 @@ public static Date parse(String date, String format) {
return null;
}


/**
* convert date str to yyyy-MM-dd HH:mm:ss format
*
Expand Down Expand Up @@ -171,7 +180,6 @@ public static long differMs(Date d1, Date d2) {
return Math.abs(d1.getTime() - d2.getTime());
}


/**
* get hours between two dates
*
Expand All @@ -198,7 +206,6 @@ public static long diffMin(Date d1, Date d2) {
return (long) Math.ceil(differSec(d1, d2) / 60.0);
}


/**
* get the date of the specified date in the days before and after
*
Expand Down Expand Up @@ -248,7 +255,6 @@ public static String format2Readable(long ms) {
long seconds = (ms % (1000 * 60)) / 1000;

return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds);

}

/**
Expand Down Expand Up @@ -302,38 +308,109 @@ public static Date getFirstDayOfMonth(Date date) {
}

/**
* get some hour of day
* get last day of month
*
* @param date date
* @param hours hours
* @return some hour of day
* */
public static Date getSomeHourOfDay(Date date, int hours) {
* @param date date
* @return get last day of month
*/
public static Date getLastDayOfMonth(Date date) {
Calendar cal = Calendar.getInstance();

cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);

cal.add(Calendar.MONTH, 1);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.add(Calendar.DAY_OF_MONTH, -1);

return cal.getTime();
}

/**
* get last day of month
* get first day of week
*
* @param date date
* @return get last day of month
* @param date date
* @return first day of week
*/
public static Date getLastDayOfMonth(Date date) {
public static LocalDate getWeekStart(LocalDate date) {
return date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
}

/**
* get last day of week
* @param date
* @return
*/
public static LocalDate getWeekEnd(LocalDate date) {
LocalDate startOfWeek = date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
return startOfWeek.with(TemporalAdjusters.nextOrSame(DayOfWeek.SUNDAY));
}

/**
* get the first day of the month
*
* @param date date
* @return the first day of the month
*/
public static LocalDate getMonthStart(LocalDate date) {
return date.withDayOfMonth(1);
}

/**
* get the last day of the month
*
* @param date date
* @return the last day of the month
*/
public static LocalDate getMonthEnd(LocalDate date) {
return date.with(TemporalAdjusters.lastDayOfMonth());
}

/**
* get the first time of the day
*
* @param date date
* @return the first day of the year
*/
public static LocalDateTime getStartOfDay(LocalDate date) {
return date.atStartOfDay();
}

/**
* get the last time of the day
*
* @param date date
* @return the last day of the year
*/
public static LocalDateTime getEndOfDay(LocalDate date) {
return date.atTime(LocalTime.MAX);
}

/**
* get the first time of the day after n days
*
* @param date date
* @param n n
* @return the first day of the year after n days
*/
public static LocalDateTime getEndOfDayAfterNDays(LocalDate date, int n) {
return date.plusDays(n).atTime(LocalTime.MAX);
}

/**
* get some hour of day
*
* @param date date
* @param hours hours
* @return some hour of day
* */
public static Date getSomeHourOfDay(Date date, int hours) {
Calendar cal = Calendar.getInstance();

cal.setTime(date);

cal.add(Calendar.MONTH, 1);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.add(Calendar.DAY_OF_MONTH, -1);
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);

return cal.getTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,36 @@ public String columnNotMatchRegex() {
public String dailyAvg(String uniqueKey) {
return "SELECT ROUND(AVG(actual_value), 2) AS expected_value_" + uniqueKey +
" FROM md_dv_actual_values" +
" WHERE data_time >= TIMESTAMP ${data_time}" +
" AND data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY" +
" WHERE data_time >= '${day_start_time}'" +
" AND data_time < '${day_end_time}'" +
" AND unique_code = ${unique_code}";
}

@Override
public String last7DayAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= TIMESTAMP ${data_time} + INTERVAL '-7' DAY" +
" and data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY and unique_code = ${unique_code}";
" from md_dv_actual_values where data_time >= '${day_start_time}'" +
" and data_time < '${day_after_7_end_time}' and unique_code = ${unique_code}";
}

@Override
public String last30DayAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= TIMESTAMP ${data_time} + INTERVAL '-30' DAY" +
" and data_time < TIMESTAMP ${data_time} + INTERVAL '1' DAY and unique_code = ${unique_code}";
" from md_dv_actual_values where data_time >= '${day_start_time}'" +
" and data_time < '${day_after_30_end_time}' and unique_code = ${unique_code}";
}

@Override
public String monthlyAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= DATE_TRUNC('MONTH', TIMESTAMP ${data_time})" +
" and data_time < DATE_TRUNC('MONTH', TIMESTAMP ${data_time}) + INTERVAL '1' MONTH - INTERVAL '1' DAY and unique_code = ${unique_code}";
" from md_dv_actual_values where data_time >= '${month_start_day} 00:00:00'" +
" and data_time <= '${month_end_day} 23:59:59' and unique_code = ${unique_code}";
}

@Override
public String weeklyAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= DATE_TRUNC('WEEK', TIMESTAMP ${data_time})" +
" and data_time < DATE_TRUNC('WEEK', TIMESTAMP ${data_time}) + INTERVAL '1' WEEK - INTERVAL '1' DAY and unique_code = ${unique_code}";
" from md_dv_actual_values where data_time >= '${week_start_day} 00:00:00'"+
" and data_time <= '${week_end_day} 23:59:59' and unique_code = ${unique_code}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import io.datavines.common.entity.*;
import io.datavines.common.entity.job.BaseJobParameter;
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.CommonPropertyUtils;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.common.utils.placeholder.PlaceholderUtils;
import io.datavines.common.utils.*;
import io.datavines.connector.api.ConnectorFactory;
import io.datavines.metric.api.ExpectedValue;
import io.datavines.metric.api.SqlMetric;
Expand All @@ -35,12 +31,15 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.datavines.common.CommonConstants.*;
import static io.datavines.common.ConfigConstants.*;
import static io.datavines.common.ConfigConstants.TABLE;
import static io.datavines.engine.config.MetricParserUtils.generateUniqueCode;

public abstract class BaseJobConfigurationBuilder implements JobConfigurationBuilder {
Expand All @@ -58,6 +57,15 @@ public abstract class BaseJobConfigurationBuilder implements JobConfigurationBui
@Override
public void init(Map<String, String> inputParameter, JobExecutionInfo jobExecutionInfo) {
this.inputParameter = inputParameter;
LocalDate nowDate = LocalDate.now();
this.inputParameter.put(WEEK_START_DAY, DateUtils.format(DateUtils.getWeekStart(nowDate), DateUtils.YYYY_MM_DD));
this.inputParameter.put(WEEK_END_DAY, DateUtils.format(DateUtils.getWeekEnd(nowDate), DateUtils.YYYY_MM_DD));
this.inputParameter.put(MONTH_START_DAY, DateUtils.format(DateUtils.getMonthStart(nowDate), DateUtils.YYYY_MM_DD));
this.inputParameter.put(MONTH_END_DAY, DateUtils.format(DateUtils.getMonthEnd(nowDate), DateUtils.YYYY_MM_DD));
this.inputParameter.put(DAY_START_TIME, DateUtils.format(DateUtils.getStartOfDay(nowDate), DateUtils.YYYY_MM_DD_HH_MM_SS));
this.inputParameter.put(DAY_END_TIME, DateUtils.format(DateUtils.getEndOfDay(nowDate), DateUtils.YYYY_MM_DD_HH_MM_SS));
this.inputParameter.put(DAY_AFTER_7_END_TIME, DateUtils.format(DateUtils.getEndOfDayAfterNDays(nowDate,7), DateUtils.YYYY_MM_DD_HH_MM_SS));
this.inputParameter.put(DAY_AFTER_30_END_TIME, DateUtils.format(DateUtils.getEndOfDayAfterNDays(nowDate,30), DateUtils.YYYY_MM_DD_HH_MM_SS));
this.inputParameter.put(COLUMN, "");
this.jobExecutionInfo = jobExecutionInfo;
this.jobExecutionParameter = jobExecutionInfo.getJobExecutionParameter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ const Index = ({ form, detail }: InnerProps) => {
<Radio.Group>
{/*<Radio value="local">{intl.formatMessage({ id: 'dv_flink_deploy_mode_local' })}</Radio>*/}
<Radio value="yarn-session">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_session' })}</Radio>
<Radio value="yarn-per-job">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_per_job' })}</Radio>
{/*<Radio value="yarn-per-job">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_per_job' })}</Radio>*/}
<Radio value="yarn-application">{intl.formatMessage({ id: 'dv_flink_deploy_mode_yarn_application' })}</Radio>
</Radio.Group>
</Form.Item>
Expand Down
16 changes: 8 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -852,14 +852,14 @@
</build>

<repositories>
<!-- <repository>-->
<!-- <id>public</id>-->
<!-- <name>aliyun nexus</name>-->
<!-- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
<!-- <releases>-->
<!-- <enabled>true</enabled>-->
<!-- </releases>-->
<!-- </repository>-->
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
Expand Down

0 comments on commit bab98b6

Please sign in to comment.