Skip to content

Add actor testcontainer tests #1192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
14764a8
Add actor testcontainer tests
akkie Jan 14, 2025
83c8776
adding auto config
salaboy Jan 22, 2025
b9ff56f
updating ActorClient
salaboy Jan 22, 2025
be84c35
registering?
salaboy Jan 23, 2025
470f32e
updating actors test and actorruntime
salaboy Feb 3, 2025
638a225
updating ActorRuntime
salaboy Feb 3, 2025
9c29db9
Adding WorkflowTaskOptions and use it instead of TaskOptions (#1200)
artur-ciocanu Jan 30, 2025
f0468fc
Fix formatting issues
akkie Feb 5, 2025
b5e88b5
adding spring boot workflows integration (#1195)
salaboy Feb 3, 2025
f0707cf
Register workflows and acitivities using instances along classes (#1201)
artur-ciocanu Feb 3, 2025
f73b989
feat: Adding basic HTTPEndpoint configuration support in testcontaine…
lbroudoux Feb 6, 2025
8cbf701
fixing actors IT test and messaging IT with app-health-checks
salaboy Feb 7, 2025
737de67
Add app health check support to Dapr Testcontainer (#1213)
artur-ciocanu Feb 10, 2025
cb290c8
adding license headers + adding wait for actors in test
salaboy Feb 20, 2025
f0287e7
Add app health check support to Dapr Testcontainer (#1213)
artur-ciocanu Feb 10, 2025
d198865
Picks a port for DaprActorITS for test containers to avoid conflict.
artursouza Feb 21, 2025
2519af5
Add app health check support to Dapr Testcontainer (#1213)
artur-ciocanu Feb 10, 2025
a9fdda8
using random port thanks to @artur-ciocanu
salaboy Feb 21, 2025
3f4069f
Merge branch 'master' into actor-testcontainer
salaboy Feb 26, 2025
67d972f
Merge branch 'master' into actor-testcontainer
artursouza Feb 28, 2025
3cc8e46
Merge branch 'master' into actor-testcontainer
artursouza Mar 1, 2025
4e88a36
Merge branch 'master' into actor-testcontainer
artur-ciocanu Mar 1, 2025
448b76e
Merge branch 'master' into actor-testcontainer
artur-ciocanu Mar 1, 2025
6755f19
Update TestRestController.java
artur-ciocanu Mar 1, 2025
da40fc5
Update DaprActorsIT.java
artur-ciocanu Mar 1, 2025
0e3101e
Update DaprContainer.java
artur-ciocanu Mar 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.spring.boot.autoconfigure.client;

import io.dapr.actors.client.ActorClient;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.config.Properties;
Expand Down Expand Up @@ -70,6 +72,20 @@ DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetail
return new DaprWorkflowClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorClient daprActorClient(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return new ActorClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorRuntime daprActorRuntime(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return ActorRuntime.getInstance(properties);
}

@Bean
@ConditionalOnMissingBean
WorkflowRuntimeBuilder daprWorkflowRuntimeBuilder(DaprConnectionDetails daprConnectionDetails) {
Expand Down
22 changes: 2 additions & 20 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
import io.grpc.Channel;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
this(NetworkUtils.buildGrpcManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
Expand Down Expand Up @@ -129,25 +130,6 @@ public void close() {
}
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @param overrideProperties Overrides
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel(Properties overrideProperties) {
int port = overrideProperties.getValue(Properties.GRPC_PORT);
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
}

var sidecarHost = overrideProperties.getValue(Properties.SIDECAR_IP);

return ManagedChannelBuilder.forAddress(sidecarHost, port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* Build an instance of the Client based on the provided setup.
Expand Down
103 changes: 55 additions & 48 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand Down Expand Up @@ -80,23 +79,32 @@ public class ActorRuntime implements Closeable {
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(buildManagedChannel());
this(new Properties());
}

/**
* The default constructor. This should not be called directly.
*
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime(Properties properties) throws IllegalStateException {
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
this(channel, new DaprClientImpl(channel));
}

/**
* Constructor with dependency injection, useful for testing. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @param channel GRPC managed channel to be closed (or null).
* @param daprClient Client to communicate with Dapr.
* @throws IllegalStateException If class has one instance already.
*/
Expand Down Expand Up @@ -128,6 +136,24 @@ public static ActorRuntime getInstance() {
return instance;
}

/**
* Returns an ActorRuntime object.
*
* @param properties Properties to be used for the runtime.
* @return An ActorRuntime object.
*/
public static ActorRuntime getInstance(Properties properties) {
if (instance == null) {
synchronized (ActorRuntime.class) {
if (instance == null) {
instance = new ActorRuntime(properties);
}
}
}

return instance;
}

/**
* Gets the Actor configuration for this runtime.
*
Expand All @@ -149,24 +175,22 @@ public byte[] serializeConfig() throws IOException {

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz) {
registerActor(clazz, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
registerActor(clazz, actorFactory, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Expand All @@ -181,8 +205,8 @@ public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
}

/**
Expand All @@ -195,9 +219,9 @@ public <T extends AbstractActor> void registerActor(
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
if (clazz == null) {
throw new IllegalArgumentException("Class is required.");
}
Expand All @@ -216,12 +240,12 @@ public <T extends AbstractActor> void registerActor(
// Create ActorManager, if not yet registered.
this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> {
ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this.config.addRegisteredActorType(actorTypeInfo.getName());
return new ActorManager<T>(context);
});
Expand All @@ -236,7 +260,7 @@ public <T extends AbstractActor> void registerActor(
*/
public Mono<Void> deactivate(String actorTypeName, String actorId) {
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
}

/**
Expand All @@ -252,8 +276,8 @@ public Mono<Void> deactivate(String actorTypeName, String actorId) {
public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeMethod(id, actorMethodName, payload));
}

/**
Expand All @@ -268,8 +292,8 @@ public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMet
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeReminder(new ActorId(actorId), reminderName, params));
}

/**
Expand All @@ -284,8 +308,8 @@ public Mono<Void> invokeReminder(String actorTypeName, String actorId, String re
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeTimer(new ActorId(actorId), timerName, params));
}

/**
Expand Down Expand Up @@ -318,23 +342,6 @@ private static DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprClientImpl(channel);
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void writeReadState() throws Exception {
proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class, deferClose(run2.newActorClient()));
ActorProxy newProxy = proxyBuilder.build(actorId);

// wating for actor to be activated
// waiting for actor to be activated
Thread.sleep(2000);

callWithRetry(() -> {
Expand Down
Loading