Skip to content

Commit

Permalink
Merge pull request wildfly#18509 from pferraro/WFLY-20068
Browse files Browse the repository at this point in the history
WFLY-20068 Upgrade wildfly-clustering to 5.0.3.Final.
  • Loading branch information
bstansberry authored Dec 17, 2024
2 parents 50ee7dd + 3de4fcc commit 4630125
Show file tree
Hide file tree
Showing 35 changed files with 437 additions and 181 deletions.
45 changes: 19 additions & 26 deletions boms/standard-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-bom</artifactId>
<version>${version.org.assertj}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
Expand Down Expand Up @@ -229,17 +237,10 @@
</dependency>
<dependency>
<groupId>org.jboss.shrinkwrap</groupId>
<artifactId>shrinkwrap-api</artifactId>
<version>${version.org.jboss.shrinkwrap.shrinkwrap}</version>
<scope>test</scope>
</dependency>

<!-- only still used by JPA tests -->
<dependency>
<groupId>org.jboss.shrinkwrap</groupId>
<artifactId>shrinkwrap-impl-base</artifactId>
<artifactId>shrinkwrap-bom</artifactId>
<version>${version.org.jboss.shrinkwrap.shrinkwrap}</version>
<scope>test</scope>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -298,11 +299,13 @@
<version>${version.org.keycloak}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-bom</artifactId>
<version>${version.org.mockito}</version>
<scope>test</scope>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
Expand All @@ -311,23 +314,13 @@
<version>${version.org.syslog4j}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>${version.org.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${version.org.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<artifactId>testcontainers-bom</artifactId>
<version>${version.org.testcontainers}</version>
<scope>test</scope>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.wildfly.clustering.ejb.timer.Timer;
import org.wildfly.clustering.ejb.timer.TimerManager;
import org.wildfly.clustering.server.scheduler.Scheduler;

import org.wildfly.clustering.ejb.timer.ImmutableTimerMetaData;
import org.wildfly.clustering.ejb.timer.TimeoutMetaData;

/**
* @author Paul Ferraro
Expand All @@ -19,5 +21,5 @@ public interface TimerFactory<I, V> {

TimerMetaDataFactory<I, V> getMetaDataFactory();

Timer<I> createTimer(I id, ImmutableTimerMetaData metaData, TimerManager<I> manager, Scheduler<I, ImmutableTimerMetaData> scheduler);
Timer<I> createTimer(I id, ImmutableTimerMetaData metaData, TimerManager<I> manager, Scheduler<I, TimeoutMetaData> scheduler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,30 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.wildfly.clustering.cache.batch.Batch;
import org.wildfly.clustering.context.DefaultThreadFactory;
import org.wildfly.clustering.server.expiration.ExpirationMetaData;
import org.wildfly.clustering.server.infinispan.CacheContainerGroup;
import org.wildfly.clustering.server.infinispan.expiration.AbstractExpirationScheduler;
import org.wildfly.clustering.server.infinispan.expiration.ExpirationMetaDataFunction;
import org.wildfly.clustering.server.infinispan.scheduler.AbstractCacheEntryScheduler;
import org.wildfly.clustering.ejb.bean.Bean;
import org.wildfly.clustering.ejb.bean.BeanExpirationConfiguration;
import org.wildfly.clustering.ejb.bean.BeanInstance;
import org.wildfly.clustering.ejb.bean.ImmutableBeanMetaData;
import org.wildfly.clustering.ejb.cache.bean.BeanFactory;
import org.wildfly.clustering.ejb.cache.bean.BeanMetaDataKey;
import org.wildfly.clustering.ejb.cache.bean.ImmutableBeanMetaDataFactory;
import org.wildfly.clustering.ejb.infinispan.logging.InfinispanEjbLogger;
import org.wildfly.clustering.server.local.scheduler.LocalScheduler;
import org.wildfly.clustering.server.local.scheduler.LocalSchedulerConfiguration;
import org.wildfly.clustering.server.local.scheduler.ScheduledEntries;
import org.wildfly.clustering.server.scheduler.Scheduler;
import org.wildfly.security.manager.WildFlySecurityManager;

/**
Expand All @@ -34,12 +39,12 @@
* @param <V> the bean instance type
* @param <M> the metadata value type
*/
public class BeanExpirationScheduler<K, V extends BeanInstance<K>, M> extends AbstractExpirationScheduler<K> {
public class BeanExpirationScheduler<K, V extends BeanInstance<K>, M> extends AbstractCacheEntryScheduler<K, BeanMetaDataKey<K>, M, ExpirationMetaData> {
private static final ThreadFactory THREAD_FACTORY = new DefaultThreadFactory(BeanExpirationScheduler.class, WildFlySecurityManager.getClassLoaderPrivileged(BeanExpirationScheduler.class));
private final ImmutableBeanMetaDataFactory<K, M> factory;

public BeanExpirationScheduler(String name, CacheContainerGroup group, Supplier<Batch> batchFactory, BeanFactory<K, V, M> factory, BeanExpirationConfiguration<K, V> expiration, Duration closeTimeout) {
super(new LocalScheduler<>(new LocalSchedulerConfiguration<>() {
this(new LocalScheduler<>(new LocalSchedulerConfiguration<>() {
@Override
public String getName() {
return name;
Expand All @@ -64,19 +69,29 @@ public ThreadFactory getThreadFactory() {
public Duration getCloseTimeout() {
return closeTimeout;
}
}));
}), factory);
}

private BeanExpirationScheduler(Scheduler<K, Instant> scheduler, BeanFactory<K, V, M> factory) {
super(scheduler.map(ExpirationMetaDataFunction.INSTANCE));
this.factory = factory.getMetaDataFactory();
}

@Override
public void schedule(K id) {
M value = this.factory.findValue(id);
if (value != null) {
ImmutableBeanMetaData<K> metaData = this.factory.createImmutableBeanMetaData(id, value);
this.schedule(id, metaData);
this.schedule(Map.entry(new InfinispanBeanMetaDataKey<>(id), value));
}
}

@Override
public void schedule(Map.Entry<BeanMetaDataKey<K>, M> entry) {
K id = entry.getKey().getId();
ImmutableBeanMetaData<K> metaData = this.factory.createImmutableBeanMetaData(id, entry.getValue());
this.schedule(id, metaData);
}

private static class BeanRemoveTask<K, V extends BeanInstance<K>, M> implements Predicate<K> {
private final Supplier<Batch> batchFactory;
private final BeanFactory<K, V, M> factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -28,15 +27,15 @@
import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.NodeAffinity;
import org.wildfly.clustering.cache.CacheProperties;
import org.wildfly.clustering.cache.Key;
import org.wildfly.clustering.cache.batch.Batch;
import org.wildfly.clustering.cache.infinispan.embedded.distribution.Locality;
import org.wildfly.clustering.cache.infinispan.embedded.distribution.CacheStreamFilter;
import org.wildfly.clustering.cache.infinispan.embedded.listener.ListenerRegistration;
import org.wildfly.clustering.ejb.bean.Bean;
import org.wildfly.clustering.ejb.bean.BeanExpirationConfiguration;
import org.wildfly.clustering.ejb.bean.BeanInstance;
import org.wildfly.clustering.ejb.bean.BeanManager;
import org.wildfly.clustering.ejb.cache.bean.BeanFactory;
import org.wildfly.clustering.ejb.cache.bean.BeanMetaDataKey;
import org.wildfly.clustering.ejb.cache.bean.MutableBean;
import org.wildfly.clustering.ejb.cache.bean.OnCloseBean;
import org.wildfly.clustering.ejb.infinispan.logging.InfinispanEjbLogger;
Expand All @@ -47,15 +46,14 @@
import org.wildfly.clustering.server.infinispan.dispatcher.CacheContainerCommandDispatcherFactory;
import org.wildfly.clustering.server.infinispan.expiration.ScheduleWithExpirationMetaDataCommand;
import org.wildfly.clustering.server.infinispan.manager.AffinityIdentifierFactory;
import org.wildfly.clustering.server.infinispan.scheduler.CacheEntryScheduler;
import org.wildfly.clustering.server.infinispan.scheduler.CacheEntriesTask;
import org.wildfly.clustering.server.infinispan.scheduler.PrimaryOwnerScheduler;
import org.wildfly.clustering.server.infinispan.scheduler.PrimaryOwnerSchedulerConfiguration;
import org.wildfly.clustering.server.infinispan.scheduler.ScheduleCommand;
import org.wildfly.clustering.server.infinispan.scheduler.ScheduleLocalEntriesTask;
import org.wildfly.clustering.server.infinispan.scheduler.ScheduleWithTransientMetaDataCommand;
import org.wildfly.clustering.server.infinispan.scheduler.Scheduler;
import org.wildfly.clustering.server.infinispan.scheduler.SchedulerTopologyChangeListener;
import org.wildfly.clustering.server.manager.IdentifierFactory;
import org.wildfly.clustering.server.scheduler.Scheduler;

/**
* A {@link BeanManager} implementation backed by an infinispan cache.
Expand All @@ -66,15 +64,15 @@
*/
public class InfinispanBeanManager<K, V extends BeanInstance<K>, M> implements BeanManager<K, V> {

private final Cache<Key<K>, Object> cache;
private final Cache<BeanMetaDataKey<K>, M> cache;
private final CacheProperties properties;
private final RetryConfig retryConfig;
private final BeanFactory<K, V, M> beanFactory;
private final IdentifierFactory<K> identifierFactory;
private final CacheContainerCommandDispatcherFactory dispatcherFactory;
private final BeanExpirationConfiguration<K, V> expiration;
private final Supplier<Batch> batchFactory;
private final Predicate<Map.Entry<? super Key<K>, ? super Object>> filter;
private final Predicate<Map.Entry<? super BeanMetaDataKey<K>, ? super M>> filter;
private final Function<K, CacheContainerGroupMember> primaryOwnerLocator;
private final Affinity strongAffinity;

Expand Down Expand Up @@ -108,7 +106,7 @@ public void start() {
this.identifierFactory.start();

Duration stopTimeout = Duration.ofMillis(this.cache.getCacheConfiguration().transaction().cacheStopTimeout());
CacheEntryScheduler<K, ExpirationMetaData> localScheduler = (this.expiration != null) && !this.expiration.getTimeout().isZero() ? new BeanExpirationScheduler<>(this.cache.getName(), this.dispatcherFactory.getGroup(), this.batchFactory, this.beanFactory, this.expiration, stopTimeout) : null;
BeanExpirationScheduler<K, V, M> localScheduler = (this.expiration != null) && !this.expiration.getTimeout().isZero() ? new BeanExpirationScheduler<>(this.cache.getName(), this.dispatcherFactory.getGroup(), this.batchFactory, this.beanFactory, this.expiration, stopTimeout) : null;

String dispatcherName = String.join("/", this.cache.getName(), this.filter.toString());
this.scheduler = (localScheduler != null) ? (this.dispatcherFactory.getGroup().isSingleton() ? localScheduler : new PrimaryOwnerScheduler<>(new PrimaryOwnerSchedulerConfiguration<>() {
Expand All @@ -123,7 +121,7 @@ public CacheContainerCommandDispatcherFactory getCommandDispatcherFactory() {
}

@Override
public CacheEntryScheduler<K, ExpirationMetaData> getScheduler() {
public Scheduler<K, ExpirationMetaData> getScheduler() {
return localScheduler;
}

Expand All @@ -143,11 +141,12 @@ public RetryConfig getRetryConfig() {
}
})) : null;

BiConsumer<Locality, Locality> scheduleTask = (localScheduler != null) ? new ScheduleLocalEntriesTask<>(this.cache, this.filter, localScheduler) : null;
this.schedulerListenerRegistration = (localScheduler != null) ? new SchedulerTopologyChangeListener<>(this.cache, localScheduler, scheduleTask).register() : null;
Consumer<CacheStreamFilter<Map.Entry<BeanMetaDataKey<K>, M>>> scheduleTask = (localScheduler != null) ? CacheEntriesTask.schedule(this.cache, this.filter, localScheduler) : null;
Consumer<CacheStreamFilter<Map.Entry<BeanMetaDataKey<K>, M>>> cancelTask = (localScheduler != null) ? CacheEntriesTask.cancel(this.cache, this.filter, localScheduler) : null;
this.schedulerListenerRegistration = (localScheduler != null) ? new SchedulerTopologyChangeListener<>(this.cache, scheduleTask, cancelTask).register() : null;
if (scheduleTask != null) {
// Schedule expiration of existing beans that we own
scheduleTask.accept(Locality.of(false), Locality.forCurrentConsistentHash(this.cache));
scheduleTask.accept(CacheStreamFilter.local(this.cache));
}
// If bean has expiration configuration, perform expiration task on close
Consumer<Bean<K, V>> closeTask = (this.expiration != null) ? bean -> {
Expand Down Expand Up @@ -262,17 +261,18 @@ public Supplier<Batch> getBatchFactory() {

@Override
public int getActiveCount() {
return this.count(EnumSet.of(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD));
return this.count(EnumSet.of(Flag.SKIP_CACHE_LOAD));
}

@Override
public int getPassiveCount() {
return this.count(EnumSet.of(Flag.CACHE_MODE_LOCAL)) - this.getActiveCount();
return this.count(Set.of()) - this.getActiveCount();
}

private int count(Set<Flag> flags) {
try (Stream<Key<K>> keys = this.cache.getAdvancedCache().withFlags(flags).keySet().stream()) {
return (int) keys.filter(InfinispanBeanGroupKey.class::isInstance).count();
CacheStreamFilter<Map.Entry<BeanMetaDataKey<K>, M>> filter = CacheStreamFilter.local(this.cache);
try (Stream<Map.Entry<BeanMetaDataKey<K>, M>> entries = filter.apply(this.cache.getAdvancedCache().withFlags(flags).entrySet().stream())) {
return (int) entries.filter(this.filter).count();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import java.util.Map;

import org.infinispan.util.function.SerializablePredicate;
import org.wildfly.clustering.cache.Key;
import org.wildfly.clustering.ejb.cache.bean.BeanMetaDataEntry;
import org.wildfly.clustering.ejb.cache.bean.BeanMetaDataKey;

/**
* Filters a cache for entries specific to a particular bean.
* @author Paul Ferraro
* @param <K> the bean identifier type
*/
public class InfinispanBeanMetaDataFilter<K> implements SerializablePredicate<Map.Entry<? super Key<K>, ? super Object>> {
public class InfinispanBeanMetaDataFilter<K, V> implements SerializablePredicate<Map.Entry<? super K, ? super V>> {
private static final long serialVersionUID = -1079989480899595045L;

private final String beanName;
Expand All @@ -26,18 +26,29 @@ public InfinispanBeanMetaDataFilter(String beanName) {
}

@Override
public boolean test(Map.Entry<? super Key<K>, ? super Object> entry) {
if (entry.getKey() instanceof InfinispanBeanMetaDataKey) {
public boolean test(Map.Entry<? super K, ? super V> entry) {
if (entry.getKey() instanceof BeanMetaDataKey) {
Object value = entry.getValue();
if (value instanceof BeanMetaDataEntry) {
@SuppressWarnings("unchecked")
BeanMetaDataEntry<K> metaData = (BeanMetaDataEntry<K>) value;
BeanMetaDataEntry<?> metaData = (BeanMetaDataEntry<?>) value;
return this.beanName.equals(metaData.getName());
}
}
return false;
}

@Override
public boolean equals(Object object) {
if (!(object instanceof InfinispanBeanMetaDataFilter)) return false;
InfinispanBeanMetaDataFilter<?, ?> filter = (InfinispanBeanMetaDataFilter<?, ?>) object;
return this.beanName.equals(filter.beanName);
}

@Override
public int hashCode() {
return this.beanName.hashCode();
}

@Override
public String toString() {
return this.beanName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.kohsuke.MetaInfServices;
import org.wildfly.clustering.marshalling.protostream.AbstractSerializationContextInitializer;
import org.wildfly.clustering.marshalling.protostream.ProtoStreamMarshaller;
import org.wildfly.clustering.marshalling.protostream.Scalar;
import org.wildfly.clustering.marshalling.protostream.SerializationContext;
import org.wildfly.clustering.marshalling.protostream.SerializationContextInitializer;

Expand All @@ -25,5 +26,6 @@ public void registerMarshallers(SerializationContext context) {
ProtoStreamMarshaller<SessionID> sessionIdMarshaller = context.getMarshaller(SessionID.class);
context.registerMarshaller(sessionIdMarshaller.wrap((Class<InfinispanBeanMetaDataKey<SessionID>>) (Class<?>) InfinispanBeanMetaDataKey.class, InfinispanBeanMetaDataKey::getId, InfinispanBeanMetaDataKey::new));
context.registerMarshaller(sessionIdMarshaller.wrap((Class<InfinispanBeanGroupKey<SessionID>>) (Class<?>) InfinispanBeanGroupKey.class, InfinispanBeanGroupKey::getId, InfinispanBeanGroupKey::new));
context.registerMarshaller(Scalar.STRING.cast(String.class).toMarshaller(InfinispanBeanMetaDataFilter.class, Object::toString, InfinispanBeanMetaDataFilter::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

package org.wildfly.clustering.ejb.infinispan.timer;

import org.wildfly.clustering.cache.CacheEntryRemover;
import org.wildfly.clustering.ejb.timer.ImmutableTimerMetaData;
import org.wildfly.clustering.ejb.timer.TimeoutListener;
import org.wildfly.clustering.ejb.timer.TimeoutMetaData;
import org.wildfly.clustering.ejb.timer.Timer;
import org.wildfly.clustering.ejb.timer.TimerManager;
import org.wildfly.clustering.ejb.timer.TimerRegistry;
import org.wildfly.clustering.server.scheduler.Scheduler;
import org.wildfly.clustering.cache.CacheEntryRemover;
import org.wildfly.clustering.ejb.timer.ImmutableTimerMetaData;
import org.wildfly.clustering.ejb.timer.TimeoutListener;

/**
* @author Paul Ferraro
Expand All @@ -21,14 +22,14 @@ public class InfinispanTimer<I> implements Timer<I> {
private final TimerManager<I> manager;
private final I id;
private final ImmutableTimerMetaData metaData;
private final Scheduler<I, ImmutableTimerMetaData> scheduler;
private final Scheduler<I, TimeoutMetaData> scheduler;
private final TimeoutListener<I> listener;
private final CacheEntryRemover<I> remover;
private final TimerRegistry<I> registry;

private volatile boolean canceled = false;

public InfinispanTimer(TimerManager<I> manager, I id, ImmutableTimerMetaData metaData, Scheduler<I, ImmutableTimerMetaData> scheduler, TimeoutListener<I> listener, CacheEntryRemover<I> remover, TimerRegistry<I> registry) {
public InfinispanTimer(TimerManager<I> manager, I id, ImmutableTimerMetaData metaData, Scheduler<I, TimeoutMetaData> scheduler, TimeoutListener<I> listener, CacheEntryRemover<I> remover, TimerRegistry<I> registry) {
this.manager = manager;
this.id = id;
this.metaData = metaData;
Expand Down
Loading

0 comments on commit 4630125

Please sign in to comment.