Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): add support for scheduler #26

Open
wants to merge 87 commits into
base: master
Choose a base branch
from
Open

Conversation

andylau-55
Copy link
Collaborator

init Scheduler Model
init Scheduler Api and impl
init Scheduler Task and sync/async impl
init Scheduler Metadata and Local impl
init Scheduler Handler and impl
init Scheduler Translate and impl
Scheduler engine impl

public static final String LT = "lt";

/** merge two bean by discovering differences */
public static <M> M merge(M dest, M orig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个方法不是SchedulerUtils?而应该放在BeanUtils呢?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

调度的本地查询merge逻辑,只有为目标值空才会覆盖,不适合放到BeanUtils

descriptor.getWriteMethod().invoke(dest, originalValue);
}
} catch (Exception e) {
log.error("merge bean exception", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个merge失败要抛异常的,不然异常吞了上游要得到非预期结果

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为入参都是同一类型M,这里反射不会抛出异常。

}

/** Limit remark. sub String To Length */
public static String setRemarkLimit(String oldRemark, StringBuffer appendRemark) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个方法命名感觉也不太对。太定制化了是不是不需要放在common/util?直接scheduler包那里使用就ok了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

静态的工具方法统一放在SchedulerUtils.java类中,供Scheduler统一调用是合理的吧

}

/** content compare key */
public static boolean compare(Object content, Object key, String type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个方法太定制化了,不像是common/util呢。key/content有可能是 Date或者String类型。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduler用于对象属性比较的,提供给Scheduler的本地查询用,所以统一放在SchedulerUtils中

}

/** get local ips */
public static List<String> getLocalIps() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个方法NetworkAddressUtils里面已经有了

}
executeInstance(instance, runningTasks);
} catch (Exception e) {
log.error("execute instance error id:", id, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志打印没有占位符

if (instances.containsKey(type)) {
return instances.get(type);
}
ThreadPoolExecutor instanceExecutor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接使用ThreadUtils里面的方法创建线程池,里面方法不满足的直接改。别再自己手动创建线程池了。

import org.springframework.stereotype.Component;

/** Scheduler Common Value */
@Component
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

类名是否改为SchedulerConfig更好?另外往上挪一级吧。放在common里面有点奇怪

/** Scheduler Common Service */
@Service
@Slf4j
public class SchedulerCommonService {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

叫SchedulerCommonService太奇怪了。其实也不是通用服务,这里东西有些是状态相关,有些又是调度实例相关

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的方法是对各模块复用的方法进行的抽取 统一放到了一个地方,Scheduler各个地方能进行复用


/** Scheduler Job Service implementation class: Add, delete, update, and query Jobs */
@Service
public class LocalSchedulerJobServiceImpl implements SchedulerJobService {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里也需要有个工厂,根据不同的metadata生成不同的Local/Db等service。而不应该直接打@service注解

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经修改为@ConditionalOnProperty的实现

* or implied.
*/

/** Alipay.com Inc. Copyright (c) 2004-2021 All Rights Reserved. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里还需要去掉

private Date estimateFinishTime;

/** remark */
private String remark;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remark这个单词有点费解?是不是就是log呢?

Copy link
Contributor

@baifuyu baifuyu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@OpenSPG OpenSPG deleted a comment from hotian Dec 26, 2023
/** Scheduler Service:submit,execute,delete and other scheduler interfaces */
public interface SchedulerService {
/** submit job */
SchedulerJob submitJob(SchedulerJob job);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments cannot be the same as function name

List<SchedulerInstance> searchInstances(SchedulerInstance query);

/** search Tasks */
List<SchedulerTask> searchTasks(SchedulerTask query);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the same to the above

import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

/** Scheduler Service implementation class */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

such comments are meaningless.

/** set Instance To Finish */
public void setInstanceFinish(
SchedulerInstance instance, InstanceStatus instanceStatus, TaskStatus taskStatus) {
Long finish = 100L;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid the devil number

updateInstance.setFinishTime(finishTime);
Long updateNum = schedulerInstanceService.update(updateInstance);
Assert.isTrue(updateNum > 0, "update instance failed " + updateInstance);
stopRunningTasks(instance);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a blank line between line 66 and 67

AsyncTaskExecute jobAsyncTask = (AsyncTaskExecute) jobTask;
TaskExecuteContext context = new TaskExecuteContext(job, instance, task);
jobAsyncTask.stop(context, task.getResource());
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add continue after line 87 can reduce 1 level code indentation.

private void checkInstanceRunning(SchedulerJob job) {
SchedulerInstance query = new SchedulerInstance();
query.setJobId(job.getId());
query.setStartCreateTime(DateUtils.addDays(new Date(), -1));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add blank line between line 98 and 99

if (jobId.equals(instance.getJobId())) {
instanceList.add(instance.getId());
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add blank line between line 66 and 67

}

/** Merge instance pre-check */
public TaskStatus processByMerge(TaskExecuteContext context) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment does not have any valid information

}
}

/** lock task */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments cannot be the same with function name.

return true;
}

/** unlock task */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to the above

nextNodes.forEach(node -> startNextNode(context, instance.getTaskDag(), node));
}

/** start next node */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments cannot be the same with function name

}

/** check all nodes is finished */
private boolean checkAllNodesFinished(SchedulerTask task, List<TaskExecuteDag.Node> nodes) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to the above

return true;
}

/** set instance to finished */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to the above

String submit(TaskExecuteContext context);

/** get task Status */
TaskStatus getStatus(TaskExecuteContext context, String resource);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same to the above

/**
* step 1: create Period Job to submit step 2: query Jobs and Instances step 3: offline Job step
* 4: online Job step 5: execute Job step 6: trigger first Instance until it ends step 7: trigger
* second Instance until it ends step 8: get tasks step 9: delete Job
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

step 2 - 9 start on a separate line

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code formatting will automatically indent

SchedulerJob job = new SchedulerJob();
job.setProjectId(0L);
job.setName("Test Period Job");
job.setCreateUser("test");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why CreateUser default value is test?

/**
* step 1: create RealTime Job to submit step 2: query Jobs and Instances step 3: offline Job step
* 4: online Job step 5: trigger Instance step 6: get tasks step 7: delete Job;
*/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

step 2 - 7 start on a separate line

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code formatting will automatically indent

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 1594 files.

Valid Invalid Ignored Fixed
1529 39 26 0
Click to see the invalid file list
  • server/common/model/src/main/java/com/antgroup/openspg/server/common/model/exception/SchedulerException.java
  • server/common/model/src/main/java/com/antgroup/openspg/server/common/model/scheduler/SchedulerEnum.java
  • server/core/scheduler/model/pom.xml
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerInstance.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerJob.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerTask.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteContext.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteDag.java
  • server/core/scheduler/service/pom.xml
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/SchedulerService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/impl/SchedulerServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/SchedulerCommonService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/config/SchedulerConfig.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/SchedulerExecuteService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/impl/SchedulerExecuteServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/SchedulerHandler.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/impl/local/LocalSchedulerHandler.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerInstanceService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerJobService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerTaskService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerInstanceServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerJobServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerTaskServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/Translate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/TranslatorFactory.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/utils/SchedulerUtils.java
  • server/test/src/test/java/com/antgroup/openspg/test/kgschema/SPGSchemaFacadeTest.groovy
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/SchedulerServiceImplTest.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleAsyncTaskMock.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleSyncTaskMock.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/translate/LocalExampleTranslateMock.java
  • server/test/src/test/resources/config/application.properties
  • server/test/src/test/resources/spring/spring-common.xml
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 1594 files.

Valid Invalid Ignored Fixed
1529 39 26 0
Click to see the invalid file list
  • server/common/model/src/main/java/com/antgroup/openspg/server/common/model/exception/SchedulerException.java
  • server/common/model/src/main/java/com/antgroup/openspg/server/common/model/scheduler/SchedulerEnum.java
  • server/core/scheduler/model/pom.xml
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerInstance.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerJob.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/service/SchedulerTask.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteContext.java
  • server/core/scheduler/model/src/main/java/com/antgroup/openspg/server/core/scheduler/model/task/TaskExecuteDag.java
  • server/core/scheduler/service/pom.xml
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/SchedulerService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/api/impl/SchedulerServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/SchedulerCommonService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/config/SchedulerConfig.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/SchedulerExecuteService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/engine/impl/SchedulerExecuteServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/SchedulerHandler.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/impl/local/LocalSchedulerHandler.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerInstanceService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerJobService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/SchedulerTaskService.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerInstanceServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerJobServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/metadata/impl/local/LocalSchedulerTaskServiceImpl.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/AsyncTaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecute.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/SyncTaskExecuteTemplate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/Translate.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/TranslatorFactory.java
  • server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/utils/SchedulerUtils.java
  • server/test/src/test/java/com/antgroup/openspg/test/kgschema/SPGSchemaFacadeTest.groovy
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/SchedulerServiceImplTest.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleAsyncTaskMock.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/task/LocalExampleSyncTaskMock.java
  • server/test/src/test/java/com/antgroup/openspg/test/scheduler/translate/LocalExampleTranslateMock.java
  • server/test/src/test/resources/config/application.properties
  • server/test/src/test/resources/spring/spring-common.xml
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link
Collaborator

@caszkgui caszkgui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants