Skip to content

Commit 8579625

Browse files
authored
Merge pull request #52 from agorapulse/chore/align-forsk-with-number-of-messages
align forks with number of messages
2 parents 43e84e1 + 62665b3 commit 8579625

File tree

5 files changed

+65
-12
lines changed

5 files changed

+65
-12
lines changed

docs/guide/src/docs/asciidoc/usage.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ include::{root-dir}/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/
138138
<5> Use the `@QueueConsumer` annotation with the name of the queue to consume messages
139139
<6> The consumer job must have a single parameter of the same type as the producer job returns
140140

141+
TIP: The value for `@Fork` is the same as the number of `maxMessages` for `@QueueConsumer` annotation. The value of `waitingTime` in `@QueueConsumer` is the same as the associated `@FixedRate` value.
142+
141143
==== Advanced Usage
142144

143145
The name of the queue can be customised using `@Consumes` and `@Produces`

libs/micronaut-worker-executor-redis/micronaut-worker-executor-redis.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ dependencies {
2424
implementation 'io.micronaut.reactor:micronaut-reactor'
2525

2626
testImplementation project(':micronaut-worker-tck')
27+
testImplementation 'org.yaml:snakeyaml'
2728
}

libs/micronaut-worker-executor-redis/src/test/groovy/com/agorapulse/worker/redis/RedisJobExecutorSpec.groovy

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,7 @@ class RedisJobExecutorSpec extends AbstractJobExecutorSpec {
4545

4646
ApplicationContext ctx = ApplicationContext
4747
.builder(
48-
'redis.uri': "redis://$redis.host:${redis.getMappedPort(6379)}",
49-
'worker.jobs.long-running-job-execute-producer.enabled': 'true',
50-
'worker.jobs.long-running-job-execute-on-leader.enabled': 'true',
51-
'worker.jobs.long-running-job-execute-on-follower.enabled': 'true',
52-
'worker.jobs.long-running-job-execute-consecutive.enabled': 'true',
53-
'worker.jobs.long-running-job-execute-unlimited.enabled': 'true',
54-
'worker.jobs.long-running-job-execute-concurrent.enabled': 'true',
55-
'worker.jobs.long-running-job-execute-concurrent-consumer.enabled': 'true',
56-
'worker.jobs.long-running-job-execute-fork-consumer.enabled': 'true',
57-
'worker.jobs.long-running-job-execute-regular-consumer.enabled': 'true',
58-
'worker.jobs.long-running-job-execute-fork.enabled': 'true'
48+
'redis.uri': "redis://$redis.host:${redis.getMappedPort(6379)}"
5949
)
6050
.environments(CONCURRENT_JOB_TEST_ENVIRONMENT)
6151
.build()
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
3+
worker:
4+
jobs:
5+
long-running-job-execute-producer:
6+
enabled: true
7+
scheduler: long-running-job-execute-producer
8+
long-running-job-execute-on-leader:
9+
enabled: true
10+
scheduler: long-running-job-execute-on-leader
11+
long-running-job-execute-on-follower:
12+
enabled: true
13+
scheduler: long-running-job-execute-on-follower
14+
long-running-job-execute-consecutive:
15+
enabled: true
16+
scheduler: long-running-job-execute-consecutive
17+
long-running-job-execute-unlimited:
18+
enabled: true
19+
scheduler: long-running-job-execute-unlimited
20+
long-running-job-execute-concurrent:
21+
enabled: true
22+
scheduler: long-running-job-execute-concurrent
23+
long-running-job-execute-concurrent-consumer:
24+
enabled: true
25+
scheduler: long-running-job-execute-concurrent-consumer
26+
long-running-job-execute-fork-consumer:
27+
enabled: true
28+
scheduler: long-running-job-execute-fork-consumer
29+
long-running-job-execute-regular-consumer:
30+
enabled: true
31+
scheduler: long-running-job-execute-regular-consumer
32+
long-running-job-execute-fork:
33+
enabled: true
34+
scheduler: long-running-job-execute-fork
35+
36+
micronaut:
37+
executors:
38+
long-running-job-execute-producer:
39+
number-of-threads: 1
40+
long-running-job-execute-on-leader:
41+
number-of-threads: 1
42+
long-running-job-execute-on-follower:
43+
number-of-threads: 1
44+
long-running-job-execute-consecutive:
45+
number-of-threads: 1
46+
long-running-job-execute-unlimited:
47+
number-of-threads: 1
48+
long-running-job-execute-concurrent:
49+
number-of-threads: 1
50+
long-running-job-execute-concurrent-consumer:
51+
number-of-threads: 1
52+
long-running-job-execute-fork-consumer:
53+
number-of-threads: 2
54+
long-running-job-execute-regular-consumer:
55+
number-of-threads: 1
56+
long-running-job-execute-fork:
57+
number-of-threads: 2

libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.agorapulse.worker.WorkerConfiguration;
2222
import com.agorapulse.worker.annotation.Consumes;
2323
import com.agorapulse.worker.annotation.FixedRate;
24+
import com.agorapulse.worker.annotation.Fork;
2425
import com.agorapulse.worker.annotation.Job;
2526
import io.micronaut.context.annotation.AliasFor;
2627
import jakarta.inject.Named;
@@ -34,6 +35,7 @@
3435

3536
@Documented
3637
@Consumes
38+
@Fork(JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES)
3739
@FixedRate("20s")
3840
@Retention(RUNTIME)
3941
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@@ -61,7 +63,8 @@
6163
/**
6264
* @return the maximum of messages consumed in a single run, defaults to {@link JobConfiguration.ConsumerQueueConfiguration#DEFAULT_MAX_MESSAGES}
6365
*/
64-
@AliasFor(annotation = Consumes.class, member = "value")
66+
@AliasFor(annotation = Fork.class, member = "value")
67+
@AliasFor(annotation = Consumes.class, member = "maxMessages")
6568
int maxMessages() default JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES;
6669

6770
/**

0 commit comments

Comments
 (0)