Skip to content

Commit

Permalink
[AMQ-9530]Fix SelectorAwareVirtualTopicInterceptor ClassCastException…
Browse files Browse the repository at this point in the history
… if next is not Topic.
  • Loading branch information
NikitaShupletsov committed Aug 16, 2024
1 parent 4e3084d commit 7336803
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.activemq.broker.region.virtual;

import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;

import java.util.Optional;

public class BaseVirtualDestinationFilter extends DestinationFilter {

public BaseVirtualDestinationFilter(Destination next) {
super(next);
}

Optional<BaseDestination> getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return Optional.of((BaseDestination) virtualDest);
} else if (virtualDest instanceof DestinationFilter) {
return Optional.ofNullable(((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class));
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.activemq.broker.region.virtual;

import java.util.Optional;
import java.util.Set;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
Expand All @@ -34,7 +34,7 @@
* Creates a mapped Queue that can recover messages from subscription recovery
* policy of its Virtual Topic.
*/
public class MappedQueueFilter extends DestinationFilter {
public class MappedQueueFilter extends BaseVirtualDestinationFilter {

private final ActiveMQDestination virtualDestination;

Expand Down Expand Up @@ -65,18 +65,19 @@ public synchronized void addSubscription(ConnectionContext context, Subscription
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {

Topic topic = (Topic) getBaseDestination(virtualDest);
if (topic != null) {
Optional<Topic> topic = getBaseDestination(virtualDest).map(Topic.class::cast);
if (topic.isPresent()) {
// re-use browse() to get recovered messages
final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
final Message[] messages = topic.get().getSubscriptionRecoveryPolicy().browse(topic.get().getActiveMQDestination());

// add recovered messages to subscription
for (Message message : messages) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
regionDest = regionBroker.getDestinations(newDestination).stream().findFirst()
.flatMap(this::getBaseDestination).orElse(null);
}
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
Expand All @@ -87,15 +88,6 @@ public synchronized void addSubscription(ConnectionContext context, Subscription
}
}

private BaseDestination getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return (BaseDestination) virtualDest;
} else if (virtualDest instanceof DestinationFilter) {
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
}
return null;
}

@Override
public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.activemq.broker.region.virtual;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
Expand All @@ -41,8 +42,12 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto

public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next, virtualTopic);
selectorCachePlugin = (SubQueueSelectorCacheBroker)
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
selectorCachePlugin = getBaseDestination(next)
.map(BaseDestination::createConnectionContext)
.map(ConnectionContext::getBroker)
.map(b -> b.getAdaptor(SubQueueSelectorCacheBroker.class))
.map(SubQueueSelectorCacheBroker.class::cast)
.orElse(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
Expand All @@ -39,7 +38,7 @@
/**
* A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
*/
public class VirtualTopicInterceptor extends DestinationFilter {
public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {

private final String prefix;
private final String postfix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
Expand Down Expand Up @@ -99,7 +103,25 @@ protected BrokerService createBroker() throws Exception {
virtualTopic.setSelectorAware(true);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
TestDestinationInterceptor testInterceptor = new TestDestinationInterceptor();
broker.setDestinationInterceptors(new DestinationInterceptor[]{testInterceptor, interceptor});
return broker;
}

private static class TestDestinationInterceptor implements DestinationInterceptor {

@Override
public org.apache.activemq.broker.region.Destination intercept(org.apache.activemq.broker.region.Destination destination) {
return new DestinationFilter(destination);
}

@Override
public void remove(org.apache.activemq.broker.region.Destination destination) {
}

@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
}
}

}

0 comments on commit 7336803

Please sign in to comment.