Skip to content

Commit

Permalink
[SYNCOPE-1801] Replacing Quartz by Spring scheduling features (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgrosso authored Mar 5, 2024
1 parent bea3415 commit dc64548
Show file tree
Hide file tree
Showing 170 changed files with 1,333 additions and 3,439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -64,6 +62,7 @@
import org.apache.wicket.request.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.CollectionUtils;

Expand Down Expand Up @@ -104,7 +103,7 @@ public static SyncopeConsoleSession get() {

protected final Map<Class<?>, Object> services = Collections.synchronizedMap(new HashMap<>());

protected final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
protected final SimpleAsyncTaskExecutor executor;

protected String domain;

Expand Down Expand Up @@ -132,6 +131,9 @@ public SyncopeConsoleSession(final Request request) {
super(request);

clientFactory = SyncopeWebApplication.get().newClientFactory();

executor = new SimpleAsyncTaskExecutor();
executor.setVirtualThreads(true);
}

protected String message(final SyncopeClientException sce) {
Expand Down Expand Up @@ -273,7 +275,6 @@ public void invalidate() {
}
cleanup();
}
executor.shutdown();
super.invalidate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import org.apache.syncope.common.lib.to.OrgUnit
import org.apache.syncope.common.lib.to.Provision
import org.apache.syncope.common.lib.to.ProvisioningReport
import org.apache.syncope.core.persistence.api.entity.task.ProvisioningTask
import org.apache.syncope.core.provisioning.api.job.JobExecutionException
import org.apache.syncope.core.provisioning.api.pushpull.IgnoreProvisionException
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningActions
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile
import org.apache.syncope.core.provisioning.api.pushpull.PullActions
import org.identityconnectors.framework.common.objects.SyncDelta
import org.quartz.JobExecutionException

@CompileStatic
class MyPullActions implements PullActions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import groovy.transform.CompileStatic
import java.util.Set
import org.apache.syncope.common.lib.to.ProvisioningReport
import org.apache.syncope.core.persistence.api.entity.Entity
import org.apache.syncope.core.provisioning.api.job.JobExecutionException
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile
import org.apache.syncope.core.provisioning.api.pushpull.PushActions
import org.quartz.JobExecutionException

@CompileStatic
class MyPushActions implements PushActions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,12 @@
*/
import groovy.transform.CompileStatic
import org.apache.syncope.common.lib.report.ReportConf
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext
import org.apache.syncope.core.provisioning.api.job.JobExecutionException
import org.apache.syncope.core.provisioning.api.job.report.ReportJobDelegate
import org.quartz.JobExecutionContext
import org.quartz.JobExecutionException

@CompileStatic
class MyReportJobDelegate implements ReportJobDelegate {

void interrupt() {
}

boolean isInterrupted() {
return false
}

void setConf(ReportConf conf) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
*/
import groovy.transform.CompileStatic
import org.apache.syncope.common.lib.types.TaskType
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext
import org.apache.syncope.core.provisioning.api.job.JobExecutionException
import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate
import org.quartz.JobExecutionContext
import org.quartz.JobExecutionException

@CompileStatic
class MySchedTaskJobDelegate implements SchedTaskJobDelegate {

void interrupt() {
}

boolean isInterrupted() {
return false
}

@Override
void execute(TaskType taskType, String taskKey, boolean dryRun, JobExecutionContext context)
throws JobExecutionException {
Expand Down
2 changes: 1 addition & 1 deletion common/idrepo/lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ under the License.

<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
<artifactId>swagger-annotations-jakarta</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ under the License.
body="org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate"/>
<Implementation id="PushJobDelegate" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate"/>
<Implementation id="GroupMemberProvisionTaskJobDelegate" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.job.GroupMemberProvisionTaskJobDelegate"/>

<Implementation id="ExpiredAccessTokenCleanup" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.job.ExpiredAccessTokenCleanup"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ under the License.
body="org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate"/>
<Implementation id="PushJobDelegate" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate"/>
<Implementation id="GroupMemberProvisionTaskJobDelegate" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.job.GroupMemberProvisionTaskJobDelegate"/>

<Implementation id="ExpiredAccessTokenCleanup" type="TASKJOB_DELEGATE" engine="JAVA"
body="org.apache.syncope.core.provisioning.java.job.ExpiredAccessTokenCleanup"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.syncope.core.provisioning.api.ConnectorManager;
import org.apache.syncope.core.provisioning.api.MappingManager;
import org.apache.syncope.core.provisioning.api.VirAttrHandler;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import org.apache.syncope.core.provisioning.api.pushpull.ConstantReconFilterBuilder;
import org.apache.syncope.core.provisioning.api.pushpull.KeyValueReconFilterBuilder;
import org.apache.syncope.core.provisioning.api.pushpull.ReconFilterBuilder;
Expand Down Expand Up @@ -104,7 +105,6 @@
import org.identityconnectors.framework.common.objects.Uid;
import org.identityconnectors.framework.common.objects.filter.Filter;
import org.identityconnectors.framework.spi.SearchResultsHandler;
import org.quartz.JobExecutionException;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.security.access.prepost.PreAuthorize;
Expand Down
6 changes: 5 additions & 1 deletion core/idrepo/logic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ under the License.
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import org.apache.syncope.common.rest.api.batch.BatchResponseItem;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
import org.apache.syncope.core.provisioning.api.job.JobManager;
import org.apache.syncope.core.provisioning.java.job.SyncopeTaskScheduler;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

public abstract class AbstractExecutableLogic<T extends EntityTO> extends AbstractJobLogic<T> {

public AbstractExecutableLogic(
final JobManager jobManager,
final SchedulerFactoryBean scheduler,
final SyncopeTaskScheduler scheduler,
final JobStatusDAO jobStatusDAO) {

super(jobManager, scheduler, jobStatusDAO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,142 +18,113 @@
*/
package org.apache.syncope.core.logic;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.syncope.common.lib.to.EntityTO;
import org.apache.syncope.common.lib.to.JobTO;
import org.apache.syncope.common.lib.types.JobAction;
import org.apache.syncope.common.lib.types.JobType;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
import org.apache.syncope.core.persistence.api.entity.JobStatus;
import org.apache.syncope.core.persistence.api.utils.FormatUtils;
import org.apache.syncope.core.provisioning.api.job.JobManager;
import org.apache.syncope.core.provisioning.java.job.SyncopeTaskScheduler;
import org.apache.syncope.core.provisioning.java.job.SystemLoadReporterJob;
import org.apache.syncope.core.provisioning.java.job.TaskJob;
import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob;
import org.apache.syncope.core.provisioning.java.job.report.ReportJob;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.apache.syncope.core.spring.security.AuthContextUtils;

abstract class AbstractJobLogic<T extends EntityTO> extends AbstractTransactionalLogic<T> {

protected final JobManager jobManager;

protected final SchedulerFactoryBean scheduler;
protected final SyncopeTaskScheduler scheduler;

protected final JobStatusDAO jobStatusDAO;

protected AbstractJobLogic(
final JobManager jobManager,
final SchedulerFactoryBean scheduler,
final SyncopeTaskScheduler scheduler,
final JobStatusDAO jobStatusDAO) {

this.jobManager = jobManager;
this.scheduler = scheduler;
this.jobStatusDAO = jobStatusDAO;
}

protected abstract Triple<JobType, String, String> getReference(JobKey jobKey);
protected abstract Triple<JobType, String, String> getReference(String jobName);

protected JobTO getJobTO(final JobKey jobKey, final boolean includeCustom) throws SchedulerException {
protected Optional<JobTO> getJobTO(final String jobName, final boolean includeCustom) {
JobTO jobTO = null;

if (scheduler.getScheduler().checkExists(jobKey)) {
Triple<JobType, String, String> reference = getReference(jobKey);
if (scheduler.contains(AuthContextUtils.getDomain(), jobName)) {
Triple<JobType, String, String> reference = getReference(jobName);
if (reference != null) {
jobTO = new JobTO();
jobTO.setType(reference.getLeft());
jobTO.setRefKey(reference.getMiddle());
jobTO.setRefDesc(reference.getRight());
} else if (includeCustom) {
JobDetail jobDetail = scheduler.getScheduler().getJobDetail(jobKey);
if (!TaskJob.class.isAssignableFrom(jobDetail.getJobClass())
&& !ReportJob.class.isAssignableFrom(jobDetail.getJobClass())
&& !SystemLoadReporterJob.class.isAssignableFrom(jobDetail.getJobClass())
&& !NotificationJob.class.isAssignableFrom(jobDetail.getJobClass())) {

Optional<Class<?>> jobClass = scheduler.getJobClass(AuthContextUtils.getDomain(), jobName).
filter(jc -> !TaskJob.class.isAssignableFrom(jc)
&& !ReportJob.class.isAssignableFrom(jc)
&& !SystemLoadReporterJob.class.isAssignableFrom(jc)
&& !NotificationJob.class.isAssignableFrom(jc));
if (jobClass.isPresent()) {
jobTO = new JobTO();
jobTO.setType(JobType.CUSTOM);
jobTO.setRefKey(jobKey.getName());
jobTO.setRefDesc(jobDetail.getJobClass().getName());
jobTO.setRefKey(jobName);
jobTO.setRefDesc(jobClass.get().getName());
}
}

if (jobTO != null) {
List<? extends Trigger> jobTriggers = scheduler.getScheduler().getTriggersOfJob(jobKey);
if (jobTriggers.isEmpty()) {
jobTO.setScheduled(false);
} else {
Optional<OffsetDateTime> nextTrigger = scheduler.getNextTrigger(AuthContextUtils.getDomain(), jobName);
if (nextTrigger.isPresent()) {
jobTO.setScheduled(true);
jobTO.setStart(jobTriggers.get(0).getStartTime().toInstant().atOffset(FormatUtils.DEFAULT_OFFSET));
jobTO.setStart(nextTrigger.get());
} else {
jobTO.setScheduled(false);
}

jobTO.setRunning(jobManager.isRunning(jobKey));

jobTO.setStatus("UNKNOWN");
if (jobTO.isRunning()) {
try {
jobTO.setStatus(jobStatusDAO.findById(jobTO.getRefDesc()).
map(JobStatus::getStatus).
orElse(jobTO.getStatus()));
} catch (NoSuchBeanDefinitionException e) {
LOG.warn("Could not find job {} implementation", jobKey, e);
}
}
jobTO.setRunning(jobManager.isRunning(jobName));

jobTO.setStatus(jobStatusDAO.findById(jobName).map(JobStatus::getStatus).orElse("UNKNOWN"));
}
}

return jobTO;
return Optional.ofNullable(jobTO);
}

protected List<JobTO> doListJobs(final boolean includeCustom) {
List<JobTO> jobTOs = new ArrayList<>();
try {
for (JobKey jobKey : scheduler.getScheduler().
getJobKeys(GroupMatcher.jobGroupEquals(Scheduler.DEFAULT_GROUP))) {

JobTO jobTO = getJobTO(jobKey, includeCustom);
if (jobTO != null) {
jobTOs.add(jobTO);
}
}
} catch (SchedulerException e) {
LOG.debug("Problems while retrieving scheduled jobs", e);
for (String jobName : scheduler.getJobNames(AuthContextUtils.getDomain())) {
getJobTO(jobName, includeCustom).ifPresent(jobTOs::add);
}

return jobTOs;
}

protected void doActionJob(final JobKey jobKey, final JobAction action) {
try {
if (scheduler.getScheduler().checkExists(jobKey)) {
switch (action) {
case START:
scheduler.getScheduler().triggerJob(jobKey);
break;
protected void doActionJob(final String jobName, final JobAction action) {
if (scheduler.contains(AuthContextUtils.getDomain(), jobName)) {
switch (action) {
case START ->
scheduler.start(AuthContextUtils.getDomain(), jobName);

case STOP:
scheduler.getScheduler().interrupt(jobKey);
break;
case STOP ->
scheduler.cancel(AuthContextUtils.getDomain(), jobName);

case DELETE:
scheduler.getScheduler().deleteJob(jobKey);
break;
case DELETE ->
scheduler.delete(AuthContextUtils.getDomain(), jobName);

default:
default -> {
}
} else {
LOG.warn("Could not find job {}", jobKey);
}
} catch (SchedulerException e) {
LOG.debug("Problems during {} operation on job {}", action.toString(), jobKey, e);
} else {
LOG.warn("Could not find job {}", jobName);
}
}
}
Loading

0 comments on commit dc64548

Please sign in to comment.