Skip to content

Commit

Permalink
Add OpenTelemetry support on WorkQueueManager
Browse files Browse the repository at this point in the history
Signed-off-by: Adriano Machado <60320+ammachado@users.noreply.github.com>
  • Loading branch information
ammachado committed Sep 12, 2024
1 parent d743405 commit a977e68
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final void setBus(Bus bus) {
imanager = bus.getExtension(InstrumentationManager.class);
if (null != imanager) {
try {
imanager.register(createManagedBeanWrapper());
imanager.register(new WorkQueueManagerImplMBeanWrapper(this));
} catch (JMException jmex) {
LOG.log(Level.WARNING, jmex.getMessage(), jmex);
}
Expand All @@ -101,10 +101,6 @@ public final void setBus(Bus bus) {
}
}

protected WorkQueueManagerImplMBeanWrapper createManagedBeanWrapper() {
return new WorkQueueManagerImplMBeanWrapper(this);
}

public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
AutomaticWorkQueue defaultQueue = getNamedWorkQueue(DEFAULT_QUEUE_NAME);
if (defaultQueue == null) {
Expand All @@ -116,8 +112,7 @@ public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
public synchronized void shutdown(boolean processRemainingTasks) {
inShutdown = true;
for (AutomaticWorkQueue q : namedQueues.values()) {
if (q instanceof AutomaticWorkQueueImpl) {
AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q;
if (q instanceof AutomaticWorkQueueImpl impl) {
if (impl.isShared()) {
synchronized (impl) {
impl.removeSharedUser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void run() {

}

class WatchDog extends Thread {
protected class WatchDog extends Thread {
DelayQueue<DelayedTaskWrapper> delayQueue;
AtomicBoolean shutdown = new AtomicBoolean(false);

Expand Down Expand Up @@ -318,7 +318,8 @@ public void run() {
}

}
class AWQThreadFactory implements ThreadFactory {

protected class AWQThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
ThreadGroup group;
String name;
Expand Down Expand Up @@ -382,6 +383,7 @@ public void setName(String s) {
threadFactory.setName(s);
}
}

public String getName() {
return name;
}
Expand Down Expand Up @@ -483,7 +485,6 @@ public void shutdown(boolean processRemainingWorkItems) {
}
}


/**
* Gets the maximum size (capacity) of the backing queue.
* @return the maximum size (capacity) of the backing queue.
Expand All @@ -500,7 +501,6 @@ public long getSize() {
return executor == null ? 0 : executor.getQueue().size();
}


public boolean isEmpty() {
return executor == null || executor.getQueue().isEmpty();
}
Expand Down Expand Up @@ -562,24 +562,28 @@ public boolean isShutdown() {
}
return executor.isShutdown();
}

public int getLargestPoolSize() {
if (executor == null) {
return 0;
}
return executor.getLargestPoolSize();
}

public int getPoolSize() {
if (executor == null) {
return 0;
}
return executor.getPoolSize();
}

public int getActiveCount() {
if (executor == null) {
return 0;
}
return executor.getActiveCount();
}

public void update(Dictionary<String, String> config) {
String s = config.get("highWaterMark");
if (s != null) {
Expand All @@ -602,6 +606,7 @@ public void update(Dictionary<String, String> config) {
this.maxQueueSize = Integer.parseInt(s);
}
}

public Dictionary<String, String> getProperties() {
Dictionary<String, String> properties = new Hashtable<>();
NumberFormat nf = NumberFormat.getIntegerInstance();
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/META-INF/cxf/bus-extensions.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.apache.cxf.bus.managers.PhaseManagerImpl:org.apache.cxf.phase.PhaseManager:true
org.apache.cxf.bus.managers.WorkQueueManagerImpl:org.apache.cxf.workqueue.WorkQueueManager:true
org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true
org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true
org.apache.cxf.bus.managers.ServerRegistryImpl:org.apache.cxf.endpoint.ServerRegistry:true
org.apache.cxf.bus.managers.EndpointResolverRegistryImpl:org.apache.cxf.endpoint.EndpointResolverRegistry:true
org.apache.cxf.bus.managers.HeaderManagerImpl:org.apache.cxf.headers.HeaderManager:true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.cxf.tracing.opentelemetry;

import org.apache.cxf.tracing.opentelemetry.internal.CurrentContextThreadPoolExecutor;
import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryAutomaticWorkQueueImpl extends AutomaticWorkQueueImpl {

@Override
protected ThreadPoolExecutor createThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
AutomaticWorkQueueImpl.WatchDog watchDog
) {
return new CurrentContextThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory) {

@Override
protected void terminated() {
ThreadFactory f = this.getThreadFactory();
if (f instanceof AWQThreadFactory awqThreadFactory) {
awqThreadFactory.shutdown();
}
if (watchDog != null) {
watchDog.shutdown();
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.cxf.tracing.opentelemetry;

import org.apache.cxf.Bus;
import org.apache.cxf.bus.managers.WorkQueueManagerImpl;
import org.apache.cxf.workqueue.AutomaticWorkQueue;

public class OpenTelemetryWorkQueueManagerImpl extends WorkQueueManagerImpl {

public OpenTelemetryWorkQueueManagerImpl(Bus bus) {
super(bus);
}

@Override
public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
return new OpenTelemetryAutomaticWorkQueueImpl();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.cxf.tracing.opentelemetry.internal;

import io.opentelemetry.context.Context;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CurrentContextThreadPoolExecutor extends ThreadPoolExecutor {

public CurrentContextThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return super.newTaskFor(Context.current().wrap(runnable), value);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return super.newTaskFor(Context.current().wrap(callable));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(Context.current().wrap(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(Context.current().wrap(task), result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(Context.current().wrap(task));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return super.invokeAny(wrap(tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(wrap(tasks), timeout, unit);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return super.invokeAll(wrap(tasks));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return super.invokeAll(wrap(tasks), timeout, unit);
}

@Override
public void execute(Runnable command) {
super.execute(Context.current().wrap(command));
}

protected static <T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<T>> tasks) {
return tasks.stream().map(task -> Context.current().wrap(task)).toList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.cxf.tracing.opentelemetry.OpenTelemetryWorkQueueManagerImpl:org.apache.cxf.workqueue.WorkQueueManager:true

0 comments on commit a977e68

Please sign in to comment.