From 16e38c57c424d0842746cdaeb83bd4da2babb2fc Mon Sep 17 00:00:00 2001 From: musketyr Date: Wed, 11 Dec 2024 17:18:01 +0100 Subject: [PATCH] composable annotations --- .../agorapulse/worker/JobConfiguration.java | 2 + .../DefaultJobConfiguration.java | 6 +- .../worker/convention/QueueConsumer.java | 4 +- .../worker/processor/MethodJobProcessor.java | 236 +++++++++++------- .../QueueListenerAndProducerSpec.groovy | 70 ++++++ 5 files changed, 231 insertions(+), 87 deletions(-) diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobConfiguration.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobConfiguration.java index 826e66ad..fe911ba1 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobConfiguration.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobConfiguration.java @@ -74,6 +74,8 @@ interface QueueConfiguration { interface ConsumerQueueConfiguration extends QueueConfiguration { int DEFAULT_MAX_MESSAGES = 10; + Duration DEFAULT_WAITING_TIME = Duration.ofSeconds(20); + String DEFAULT_WAITING_TIME_STRING = "20s"; /** * @return the number of messages which are fetched from the queue in a single poll, defaults to {@link #DEFAULT_MAX_MESSAGES} when set to 0 diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/configuration/DefaultJobConfiguration.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/configuration/DefaultJobConfiguration.java index db72b3d7..36c9f33b 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/configuration/DefaultJobConfiguration.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/configuration/DefaultJobConfiguration.java @@ -119,7 +119,11 @@ public void mergeWith(ConsumerQueueConfiguration overrides) { this.maxMessages = overrides.getMaxMessages(); } - if (overrides.getWaitingTime() != null && !overrides.getWaitingTime().isZero() && overrides.getWaitingTime() != this.waitingTime) { + if (overrides.getWaitingTime() != null + && !overrides.getWaitingTime().isZero() + && !overrides.getWaitingTime().equals(this.waitingTime) + && !overrides.getWaitingTime().equals(DEFAULT_WAITING_TIME) + ) { this.waitingTime = overrides.getWaitingTime(); } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java index 0fbc31ac..88257e7b 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java @@ -36,7 +36,7 @@ @Documented @Consumes @Fork(JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES) -@FixedRate("20s") +@FixedRate(JobConfiguration.ConsumerQueueConfiguration.DEFAULT_WAITING_TIME_STRING) @Retention(RUNTIME) @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) public @interface QueueConsumer { @@ -70,7 +70,7 @@ */ @AliasFor(annotation = Consumes.class, member = "waitingTime") @AliasFor(annotation = FixedRate.class, member = "value") - String waitingTime() default ""; + String waitingTime() default JobConfiguration.ConsumerQueueConfiguration.DEFAULT_WAITING_TIME_STRING; /** * The number of messages to consume and also the number of threads to use to consume the messages. diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJobProcessor.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJobProcessor.java index 249a3ec8..2912eb2e 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJobProcessor.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJobProcessor.java @@ -17,13 +17,9 @@ */ package com.agorapulse.worker.processor; -import com.agorapulse.worker.JobConfiguration; -import com.agorapulse.worker.JobConfigurationException; -import com.agorapulse.worker.JobManager; -import com.agorapulse.worker.JobScheduler; -import com.agorapulse.worker.WorkerConfiguration; -import com.agorapulse.worker.annotation.*; +import com.agorapulse.worker.*; import com.agorapulse.worker.annotation.Job; +import com.agorapulse.worker.annotation.*; import com.agorapulse.worker.configuration.DefaultJobConfiguration; import com.agorapulse.worker.configuration.MutableJobConfiguration; import com.agorapulse.worker.convention.QueueConsumer; @@ -32,23 +28,23 @@ import io.micronaut.context.BeanContext; import io.micronaut.context.env.Environment; import io.micronaut.context.processor.ExecutableMethodProcessor; -import io.micronaut.core.annotation.AnnotationValue; -import io.micronaut.core.naming.NameUtils; import io.micronaut.core.util.StringUtils; import io.micronaut.inject.BeanDefinition; import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; import io.micronaut.runtime.ApplicationConfiguration; import io.micronaut.scheduling.TaskExceptionHandler; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Singleton; -import java.lang.annotation.Annotation; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Predicate; /** * A {@link ExecutableMethodProcessor} for the {@link Job} annotation. @@ -69,7 +65,94 @@ public class MethodJobProcessor implements ExecutableMethodProcessor { private static final String MEMBER_INITIAL_DELAY = "initialDelay"; private static final String MEMBER_CRON = "cron"; private static final String MEMBER_FIXED_DELAY = "fixedDelay"; + private static final String MEMBER_MAX_MESSAGES = "maxMessages"; + private static final String MEMBER_WAITING_TIME = "waitingTime"; private static final String MEMBER_SCHEDULER = "scheduler"; + private static final String MEMBER_TYPE = "type"; + private static final String MEMBER_VALUE = "value"; + private static final String MEMBER_NAME = "name"; + + private static final Map ANNOTATION_TO_NAME_VALUE_MAP = Map.of( + "jakarta.inject.Named", MEMBER_VALUE, + "javax.inject.Named", MEMBER_VALUE, + Job.class.getName(), MEMBER_VALUE, + QueueConsumer.class.getName(), MEMBER_NAME, + QueueProducer.class.getName(), MEMBER_NAME, + FixedRate.class.getName(), MEMBER_NAME, + InitialDelay.class.getName(), MEMBER_NAME, + FixedDelay.class.getName(), MEMBER_NAME, + Cron.class.getName(), MEMBER_NAME + ); + + private static final Map ANNOTATION_TO_CRON_MAP = Map.of( + Cron.class.getName(), MEMBER_VALUE, + QueueProducer.class.getName(), MEMBER_CRON, + Job.class.getName(), MEMBER_CRON + ); + + private static final Map ANNOTATION_TO_FIXED_RATE_MAP = Map.of( + FixedRate.class.getName(), MEMBER_VALUE, + QueueConsumer.class.getName(), MEMBER_WAITING_TIME, + QueueProducer.class.getName(), MEMBER_FIXED_RATE, + Job.class.getName(), MEMBER_FIXED_RATE + ); + + private static final Map ANNOTATION_TO_INITIAL_DELAY_MAP = Map.of( + InitialDelay.class.getName(), MEMBER_VALUE, + QueueProducer.class.getName(), MEMBER_INITIAL_DELAY, + Job.class.getName(), MEMBER_INITIAL_DELAY + ); + + private static final Map ANNOTATION_TO_FIXED_DELAY_MAP = Map.of( + FixedDelay.class.getName(), MEMBER_VALUE, + QueueProducer.class.getName(), MEMBER_FIXED_DELAY, + Job.class.getName(), MEMBER_FIXED_DELAY + ); + + private static final Map ANNOTATION_TO_SCHEDULER_MAP = Map.of( + Job.class.getName(), MEMBER_SCHEDULER, + Cron.class.getName(), MEMBER_SCHEDULER, + FixedRate.class.getName(), MEMBER_SCHEDULER, + InitialDelay.class.getName(), MEMBER_SCHEDULER, + FixedDelay.class.getName(), MEMBER_SCHEDULER, + QueueConsumer.class.getName(), MEMBER_SCHEDULER, + QueueProducer.class.getName(), MEMBER_SCHEDULER + ); + + private static final Map ANNOTATION_TO_FORK_MAP = Map.of( + Fork.class.getName(), MEMBER_VALUE, + QueueConsumer.class.getName(), MEMBER_MAX_MESSAGES + ); + + private static final Map ANNOTATION_TO_MAX_MESSAGES_MAP = Map.of( + Consumes.class.getName(), MEMBER_MAX_MESSAGES, + QueueConsumer.class.getName(), MEMBER_MAX_MESSAGES + ); + + private static final Map ANNOTATION_TO_CONSUMER_TYPE_MAP = Map.of( + Consumes.class.getName(), MEMBER_TYPE, + QueueConsumer.class.getName(), MEMBER_TYPE + ); + + public static final Map ANNOTATION_TO_PRODUCER_TYPE_MAP = Map.of( + Produces.class.getName(), MEMBER_TYPE, + QueueProducer.class.getName(), MEMBER_TYPE + ); + + private static final Map ANNOTATION_TO_WAITING_TIME_MAP = Map.of( + Consumes.class.getName(), MEMBER_WAITING_TIME, + QueueConsumer.class.getName(), MEMBER_WAITING_TIME + ); + + private static final Map ANNOTATION_TO_CONSUMER_QUEUE_NAME_MAP = Map.of( + Consumes.class.getName(), MEMBER_VALUE, + QueueConsumer.class.getName(), MEMBER_VALUE + ); + + private static final Map ANNOTATION_TO_PRODUCER_QUEUE_NAME_MAP = Map.of( + Produces.class.getName(), MEMBER_VALUE, + QueueProducer.class.getName(), MEMBER_VALUE + ); private final BeanContext beanContext; private final TaskExceptionHandler taskExceptionHandler; @@ -116,51 +199,48 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met return; } - JobConfiguration configuration = getJobConfiguration(beanDefinition, method); - - com.agorapulse.worker.Job task = new MethodJob<>( - configuration, - method, - beanDefinition, - beanContext, - jobMethodInvoker, - taskExceptionHandler - ); + try { + JobConfiguration configuration = getJobConfiguration(beanDefinition, method); + + com.agorapulse.worker.Job task = new MethodJob<>( + configuration, + method, + beanDefinition, + beanContext, + jobMethodInvoker, + taskExceptionHandler + ); - jobManager.register(task); + jobManager.register(task); - if (!configuration.isEnabled()) { - LOG.info("Job {} is disabled in the configuration. Remove jobs.{}.enabled = false configuration to re-enable it.", configuration.getName(), configuration.getName()); - return; - } + if (!configuration.isEnabled()) { + LOG.info("Job {} is disabled in the configuration. Remove jobs.{}.enabled = false configuration to re-enable it.", configuration.getName(), configuration.getName()); + return; + } - try { jobScheduler.schedule(task); } catch (JobConfigurationException e) { - LOG.error("Job declared in method " + method + " is ignored because it is not correctly configured", e); + LOG.error("Job declared in method {} is ignored because it is not correctly configured", method, e); } } - private String getJobName(BeanDefinition beanDefinition, ExecutableMethod method) { - Optional valueFromJob = method.stringValue(Job.class).map(NameUtils::hyphenate); - - if (valueFromJob.isPresent()) { - return valueFromJob.get(); - } - - Optional valueFromJavaxNamed = method.stringValue("jakarta.inject.Named").map(NameUtils::hyphenate); - - if (valueFromJavaxNamed.isPresent()) { - return valueFromJavaxNamed.get(); - } + private Optional getFirstAnnotationValue(Map annotationTypeToValueMap, BiFunction> extractor, Predicate validator) { + return annotationTypeToValueMap + .entrySet() + .stream() + .map(e -> extractor.apply(e.getKey(), e.getValue()).filter(validator)) + .flatMap(Optional::stream) + .findFirst(); + } - Optional valueFromJakartaNamed = method.stringValue("jakarta.inject.Named").map(NameUtils::hyphenate); + private String getJobName(BeanDefinition beanDefinition, ExecutableMethod method) { + Optional maybeName = getFirstAnnotationValue(ANNOTATION_TO_NAME_VALUE_MAP, method::stringValue, StringUtils::isNotEmpty); - if (valueFromJakartaNamed.isPresent()) { - return valueFromJakartaNamed.get(); + if (maybeName.isPresent()) { + return maybeName.get(); } - // there are more then one job definition + // there are more than one job definition if (beanDefinition.getExecutableMethods().size() > 1) { return JobManager.getDefaultJobName(method.getDeclaringType(), method.getMethodName()); } @@ -176,25 +256,28 @@ private JobConfiguration getJobConfiguration(BeanDefinition beanDefinition, E DefaultJobConfiguration configuration = new DefaultJobConfiguration(jobName, workerConfiguration); - method.stringValue(Job.class, MEMBER_CRON).ifPresent(configuration::setCron); - method.stringValue(Job.class, MEMBER_FIXED_DELAY).ifPresent(fixedDelay -> configuration.setFixedDelay(convertDuration(jobName, fixedDelay, "fixed delay"))); - method.stringValue(Job.class, MEMBER_FIXED_RATE).ifPresent(fixedRate -> configuration.setFixedRate(convertDuration(jobName, fixedRate, "fixed rate"))); - method.stringValue(Job.class, MEMBER_INITIAL_DELAY).ifPresent(initialDelay -> configuration.setInitialDelay(convertDuration(jobName, initialDelay, "initial delay"))); - method.stringValue(Job.class, MEMBER_SCHEDULER).ifPresent(configuration::setScheduler); + getFirstAnnotationValue(ANNOTATION_TO_CRON_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(configuration::setCron); + getFirstAnnotationValue(ANNOTATION_TO_FIXED_DELAY_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(fixedDelay -> configuration.setFixedDelay(convertDuration(jobName, fixedDelay, "fixed delay"))); + getFirstAnnotationValue(ANNOTATION_TO_FIXED_RATE_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(fixedRate -> configuration.setFixedRate(convertDuration(jobName, fixedRate, "fixed rate"))); + getFirstAnnotationValue(ANNOTATION_TO_INITIAL_DELAY_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(initialDelay -> configuration.setInitialDelay(convertDuration(jobName, initialDelay, "initial delay"))); + getFirstAnnotationValue(ANNOTATION_TO_SCHEDULER_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(configuration::setScheduler); boolean consumer = method.getArguments().length == 1; boolean producer = !method.getReturnType().getType().equals(void.class); if (method.hasAnnotation(QueueConsumer.class) && !consumer) { - throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> {}), "Method annotated with @QueueListener must have exactly one argument"); + throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> { + }), "Method annotated with @QueueListener must have exactly one argument"); } if (method.hasAnnotation(QueueProducer.class)) { if (!producer) { - throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> {}), "Method annotated with @QueueProducer must have a return type. Use reactive type to the best performance."); + throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> { + }), "Method annotated with @QueueProducer must have a return type. Use reactive type to the best performance."); } if (configuration.getCron() == null && configuration.getFixedDelay() == null && configuration.getInitialDelay() == null && configuration.getFixedRate() == null) { - throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> {}), "One of the cron, fixedDelay, initialDelay or fixedRate must be specified for a method annotated with @QueueProducer or one of @Cron, @FixedDelay, @InitialDelay or @FixedRate must be specified for the method."); + throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> { + }), "One of the cron, fixedDelay, initialDelay or fixedRate must be specified for a method annotated with @QueueProducer or one of @Cron, @FixedDelay, @InitialDelay or @FixedRate must be specified for the method."); } } @@ -202,14 +285,15 @@ private JobConfiguration getJobConfiguration(BeanDefinition beanDefinition, E configuration.setFollowerOnly(method.findAnnotation(FollowerOnly.class).isPresent()); if (configuration.isLeaderOnly() && configuration.isFollowerOnly()) { - throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> {}), "Cannot use @FollowerOnly on a producer method or method annotated with @LeaderOnly"); + throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> { + }), "Cannot use @FollowerOnly on a producer method or method annotated with @LeaderOnly"); } method.findAnnotation(Concurrency.class).flatMap(a -> a.getValue(Integer.class)).ifPresent(configuration::setConcurrency); - method.findAnnotation(Fork.class).flatMap(a -> a.getValue(Integer.class)).ifPresent(configuration::setFork); + getFirstAnnotationValue(ANNOTATION_TO_FORK_MAP, (annotation, member) -> method.intValue(annotation, member).stream().boxed().findAny(), i -> i > 0).ifPresent(configuration::setFork); - configureConsumerQueue(jobName, method.findAnnotation(Consumes.class), configuration.getConsumer()); - configureQueue(method.findAnnotation(Produces.class), configuration.getProducer()); + configureConsumerQueue(jobName, method, ANNOTATION_TO_CONSUMER_QUEUE_NAME_MAP, ANNOTATION_TO_CONSUMER_TYPE_MAP, configuration.getConsumer()); + configureQueue(method, ANNOTATION_TO_PRODUCER_QUEUE_NAME_MAP, ANNOTATION_TO_PRODUCER_TYPE_MAP, configuration.getProducer()); configuration.mergeWith(configurationOverrides); @@ -222,44 +306,28 @@ private JobConfiguration getJobConfiguration(BeanDefinition beanDefinition, E } if (method.getArguments().length > 1) { - throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> {}), "Cannot have more than one argument in a method annotated with @Job"); + throw new JobConfigurationException(com.agorapulse.worker.Job.create(configuration, () -> { + }), "Cannot have more than one argument in a method annotated with @Job"); } return configuration; } - private void configureQueue(Optional> consumesAnnotation, MutableJobConfiguration.MutableQueueConfiguration queueConfiguration) { - if (consumesAnnotation.isPresent()) { - AnnotationValue annotationValue = consumesAnnotation.get(); - - annotationValue.stringValue().ifPresent(queueConfiguration::setQueueName); - - annotationValue.stringValue("type").ifPresent(type -> { - if (StringUtils.isNotEmpty(type)) { - queueConfiguration.setQueueType(type); - } - }); - } + private void configureQueue(ExecutableMethod method, Map annotationsToQueue, Map annotationsToType, MutableJobConfiguration.MutableQueueConfiguration queueConfiguration) { + getFirstAnnotationValue(annotationsToQueue, method::stringValue, StringUtils::isNotEmpty).ifPresent(queueConfiguration::setQueueName); + getFirstAnnotationValue(annotationsToType, method::stringValue, StringUtils::isNotEmpty).ifPresent(queueConfiguration::setQueueType); } - private void configureConsumerQueue(String jobName, Optional> consumesAnnotation, MutableJobConfiguration.MutableConsumerQueueConfiguration queueConfiguration) { - if (consumesAnnotation.isPresent()) { - configureQueue(consumesAnnotation, queueConfiguration); - - AnnotationValue annotationValue = consumesAnnotation.get(); + private void configureConsumerQueue(String jobName, ExecutableMethod method, Map annotationsToQueue, Map annotationsToType, MutableJobConfiguration.MutableConsumerQueueConfiguration queueConfiguration) { + configureQueue(method, annotationsToQueue, annotationsToType, queueConfiguration); - annotationValue.stringValue("waitingTime").ifPresent(waitingTime -> { - if (StringUtils.isNotEmpty(waitingTime)) { - queueConfiguration.setWaitingTime(convertDuration(jobName, waitingTime, "waiting time")); - } - }); + getFirstAnnotationValue(ANNOTATION_TO_WAITING_TIME_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(waitingTime -> queueConfiguration.setWaitingTime(convertDuration(jobName, waitingTime, "waiting time"))); - annotationValue.intValue("maxMessages").ifPresent(maxMessages -> { - if (maxMessages > 0 && maxMessages != JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES) { - queueConfiguration.setMaxMessages(maxMessages); - } - }); - } + getFirstAnnotationValue( + ANNOTATION_TO_MAX_MESSAGES_MAP, + (anno, member) -> method.intValue(anno, member).stream().boxed().findAny(), + maxMessages -> maxMessages > 0 && maxMessages != JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES + ).ifPresent(queueConfiguration::setMaxMessages); } private String extractQueueNameFromMethod(ExecutableMethod method) { diff --git a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/convention/QueueListenerAndProducerSpec.groovy b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/convention/QueueListenerAndProducerSpec.groovy index cc01921e..c6ede432 100644 --- a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/convention/QueueListenerAndProducerSpec.groovy +++ b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/convention/QueueListenerAndProducerSpec.groovy @@ -22,6 +22,7 @@ import com.agorapulse.worker.JobManager import com.agorapulse.worker.annotation.Cron import io.micronaut.test.extensions.spock.annotation.MicronautTest import jakarta.inject.Inject +import jakarta.inject.Named import reactor.core.publisher.Flux import spock.lang.Specification @@ -46,6 +47,31 @@ class QueueListenerAndProducerSpec extends Specification { } // end::quickstart[] + @com.agorapulse.worker.annotation.Job('queue-listener-job') + @QueueConsumer(value = 'my-queue', maxMessages = 5, waitingTime = '10s') + public void listenToMyQueueMultiple(Message message) { + // your code here + } + + @Named('queue-producer-job') + @Cron('0 0 0/4 ? * *') + @QueueProducer('my-queue') + public Flux produceToMyQueueMultiple() { + return Flux.just("Hello", "World").map(Message::new); + } + + @Named('not-a-consumer-job') + @QueueConsumer('my-queue') + public void notAConsumer() { + // your code here + } + + @Named('not-a-producer') + @QueueProducer('my-queue') + public void notAProducer() { + // your code here + } + @Inject JobManager jobManager void 'listener job is registered'() { @@ -75,4 +101,48 @@ class QueueListenerAndProducerSpec extends Specification { job.configuration.cron == '0 0 0/1 ? * *' } + void 'multiple annotations are composed for consumer'() { + given: + String jobName = 'queue-listener-job' + expect: + jobName in jobManager.jobNames + + when: + Job job = jobManager.getJob(jobName).get() + then: + verifyAll(job.configuration) { + fixedRate == Duration.ofSeconds(10) + consumer.queueName == 'my-queue' + consumer.maxMessages == 5 + consumer.waitingTime == Duration.ofSeconds(10) + fork == 5 + } + + } + + void 'multiple annotations are composed for producer'() { + given: + String jobName = 'queue-producer-job' + expect: + jobName in jobManager.jobNames + + when: + Job job = jobManager.getJob(jobName).get() + then: + verifyAll(job.configuration) { + cron == '0 0 0/4 ? * *' + producer.queueName == 'my-queue' + } + } + + void 'not a consumer job fails to be configured'() { + expect: + jobManager.getJob('not-a-consumer-job').empty + } + + void 'not a producer job fails to be configured'() { + expect: + jobManager.getJob('not-a-producer').empty + } + }