Skip to content

Commit

Permalink
Merge pull request #40 from agorapulse/feature/worker-function
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr authored Nov 26, 2024
2 parents af2b7f4 + 4e51053 commit d17d152
Show file tree
Hide file tree
Showing 50 changed files with 1,016 additions and 155 deletions.
5 changes: 5 additions & 0 deletions docs/guide/src/docs/asciidoc/installation.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ repositories {
}
dependencies {
// minimal dependency with local queue and executor
// select some of the following dependencies to enable more features
implementation 'com.agorapulse:micronaut-worker:{project-version}'
// to enable /jobs endpoint
Expand All @@ -29,6 +31,9 @@ dependencies {
// to enable Redis queues integration
implementation 'com.agorapulse:micronaut-worker-queues-redis:{project-version}'
// to enable running jobs as CLI apps
implementation 'com.agorapulse:micronaut-worker-runner:{project-version}'
// you also need Redis configuration on the classpath depending on your Micronaut version
// for Micronaut 1.x
implementation 'io.micronaut.configuration:micronaut-redis-lettuce'
Expand Down
1 change: 1 addition & 0 deletions docs/guide/src/docs/asciidoc/introduction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Micronaut Worker is a library for advanced scheduling and work distribution in M
* Job execution events `JobExecutionStartedEvent`, `JobExecutionFinishedEvent` and `JobExecutionResultEvent`
* Built in support for https://github.com/agorapulse/micronaut-snitch[Micronaut Snitch]
* Built in support for https://agorapulse.github.io/micronaut-console[Micronaut Console]
* Ability to execute a single job from the CLI (for e.g. https://aws.amazon.com/batch/ or https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html)

Unlike the `@Scheduled` annotation, Micronaut Worker annotations are not repeatable, but
they can be combined in meaningful ways. For example, a method annotated with `@FixedRate('10m') @InitialDelay('1m')` executes every
Expand Down
23 changes: 21 additions & 2 deletions docs/guide/src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ include::{root-dir}/libs/micronaut-worker/src/main/java/com/agorapulse/worker/ev

TIP: If https://github.com/agorapulse/micronaut-snitch[Micronaut Snitch] is present on the classpath and configured with the name of the job, the `snitch` method is called automatically after successful execution.

== CLI Runner

You can run a single job from the command line using the `com.agorapulse.worker.runner.JobRunner` class as the main class. The arguments are the names of the jobs to run. All other jobs are disabled, even when enabled in the configuration (see corner cases below). The application will run until all jobs are finished.

[source,shell]
.Run Job from CLI
----
java -cp myapp-shadow.jar com.agorapulse.worker.runner.JobRunner sample-job other-job
----

WARNING: In some corner cases, some unrelated jobs can still be executed if they have a very short delay or frequency if they are manually enabled in the configuration. Please, prefer annotation driven jobs over configuring them manually in the configuration to avoid this issue.

== Management

You can use `jobs` management endpoint, by default located at `/jobs`, to see the status of all the jobs in the application.
Expand All @@ -280,7 +292,7 @@ each job. The name is the lower-camel-case version of the job name, e.g., `sampl
include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/variables.txt[]
----

TIP: The job variables are instances of `JobAccessor`, which also provides methods `run()` and `enqueue(message)` to let you easily trigger jobs from the console.
TIP: The job variables are instances of `JobAccessor`, which also provides methods `run()` and `enqueue(message)` to let you easily trigger jobs from the console. You can also use method `reconfigure(consumer)` that changes the in-memory configuration of the job and reschedules it.

A simple script with just variable `jobs` will print the status of the current job execution. Depending
on which console endpoint you choose, you get either a text or JSON summary.
Expand All @@ -297,13 +309,20 @@ include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/work
include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/listJobsResponse.txt[]
----


[source]
.Job Manager Script - JSON Result
----
include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/listJobsResponse.json[]
----

Reconfiguring the job will try to change the configuration of the job and reschedule it if it's still enabled.

[source,options="nowrap"]
.Job Manager Script - Reconfigure
----
include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigure.groovy[]
----

Returning a job variable from the script will render details for that job.

[source,groovy]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import io.lettuce.core.support.BoundedPoolConfig;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -61,17 +63,17 @@ public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfi
pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config);
}

@Override
@SuppressWarnings("unchecked")
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
@Override
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
TransactionResult result = withTransaction(redisCommands -> {
String key = getKey(queueName);
redisCommands.zrange(key, 0, maxNumberOfMessages - 1);
redisCommands.zremrangebyrank(key, 0, maxNumberOfMessages - 1);
});

if (result == null) {
return;
return Flux.empty();
}


Expand All @@ -83,14 +85,14 @@ public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration

List<String> messages = (List<String>) firstResponse;

messages.forEach(body -> {
return Flux.fromIterable(messages).handle((body, sink) -> {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
sink.next(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
} catch (JsonProcessingException e) {
if (argument.equalsType(Argument.STRING)) {
action.accept((T) body);
sink.next((T) body);
} else {
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api "com.agorapulse:micronaut-aws-sdk-sqs:$micronautAwsSdkVersion"

implementation 'io.micronaut:micronaut-jackson-databind'
implementation 'io.micronaut.reactor:micronaut-reactor'

testImplementation project(':micronaut-worker-tck')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
Expand All @@ -42,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe
}

@Override
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> {
readMessageInternal(queueName, argument, action, m.getBody(), m.getReceiptHandle(), true);
});
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.getBody(), m.getReceiptHandle(), true)
).toList());
}

@Override
Expand All @@ -70,28 +72,27 @@ public void sendRawMessage(String queueName, Object result) {
}
}

private <T> void readMessageInternal(String queueName, Argument<T> argument, Consumer<T> action, String body, String handle, boolean tryReformat) {
private <T> Mono<T> readMessageInternal(String queueName, Argument<T> argument, String body, String handle, boolean tryReformat) {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
Mono<T> result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
simpleQueueService.deleteMessage(queueName, handle);
return result;
} catch (JsonProcessingException e) {
if (tryReformat) {
if (String.class.isAssignableFrom(argument.getType())) {
action.accept(argument.getType().cast(body));
Mono<T> result = Mono.just(argument.getType().cast(body));
simpleQueueService.deleteMessage(queueName, handle);
return;
return result;
}
if (Collection.class.isAssignableFrom(argument.getType())) {
if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) {
String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false);
}
readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + body + "]", handle, false);
}
}
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.amazonaws.services.sqs.model.AmazonSQSException
import com.amazonaws.services.sqs.model.Message
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.core.type.Argument
import reactor.core.publisher.Flux
import spock.lang.Shared
import spock.lang.Specification

Expand All @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message is deleted once read'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
List<Map<String, String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy messages'() {
when:
List<List<Long>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) {
values << it
}
List<List<Long>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy string messages'() {
when:
List<List<String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) {
values << it
}
List<List<String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message not deleted on error'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
thrown IllegalArgumentException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
Expand All @@ -44,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe
}

@Override
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> {
readMessageInternal(queueName, argument, action, m.body(), m.receiptHandle(), true);
});
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.body(), m.receiptHandle(), true)
).toList());
}

@Override
Expand Down Expand Up @@ -78,28 +78,27 @@ public void sendRawMessages(String queueName, Publisher<?> result) {
Flux.from(simpleQueueService.sendMessages(queueName, Flux.from(result).map(String::valueOf))).subscribe();
}

private <T> void readMessageInternal(String queueName, Argument<T> argument, Consumer<T> action, String body, String handle, boolean tryReformat) {
private <T> Mono<T> readMessageInternal(String queueName, Argument<T> argument, String body, String handle, boolean tryReformat) {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
Mono<T> result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
simpleQueueService.deleteMessage(queueName, handle);
return result;
} catch (JsonProcessingException e) {
if (tryReformat) {
if (String.class.isAssignableFrom(argument.getType())) {
action.accept(argument.getType().cast(body));
Mono<T> result = Mono.just(argument.getType().cast(body));
simpleQueueService.deleteMessage(queueName, handle);
return;
return result;
}
if (Collection.class.isAssignableFrom(argument.getType())) {
if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) {
String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false);
}
readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + body + "]", handle, false);
}
}
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService
import com.agorapulse.worker.queue.JobQueues
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.core.type.Argument
import reactor.core.publisher.Flux
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.SqsException
import spock.lang.Shared
Expand All @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message is deleted once read'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
List<Map<String, String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy messages'() {
when:
List<List<Long>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) {
values << it
}
List<List<Long>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy string messages'() {
when:
List<List<String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) {
values << it
}
List<List<String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message not deleted on error'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
thrown IllegalArgumentException

Expand Down
Loading

0 comments on commit d17d152

Please sign in to comment.