Skip to content
This repository has been archived by the owner on Apr 16, 2023. It is now read-only.

[WIP] Proposal for property change listeners on client objects #96

Draft
wants to merge 1 commit into
base: bleeding-1.X.X
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 14 additions & 21 deletions src/api/java/de/fearnixx/jeak/teamspeak/data/BasicDataHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ public BasicDataHolder() {
values = Collections.synchronizedMap(new LinkedHashMap<>());
}

public Map<String, String> getValues() {
synchronized (LOCK) {
return values;
}
public synchronized Map<String, String> getValues() {
return values;
}

@Override
Expand All @@ -30,19 +28,18 @@ public boolean hasProperty(String key) {
}

@Override
public Optional<String> getProperty(String key) {
public synchronized Optional<String> getProperty(String key) {
synchronized (LOCK) {
return Optional.ofNullable(values.getOrDefault(key, null));
}
}

@Override
public void setProperty(String key, String value) {
synchronized (LOCK) {
if (value == null)
values.remove(key);
else
values.put(key, value);
public synchronized void setProperty(String key, String value) {
if (value == null) {
values.remove(key);
} else {
values.put(key, value);
}
}

Expand All @@ -51,18 +48,14 @@ public void setProperty(String key, Object value) {
setProperty(key, value != null ? value.toString() : null);
}

public IDataHolder copyFrom(IDataHolder other) {
synchronized (LOCK) {
this.values = new ConcurrentHashMap<>();
return merge(other);
}
public synchronized IDataHolder copyFrom(IDataHolder other) {
this.values = new ConcurrentHashMap<>();
return merge(other);
}

public IDataHolder merge(IDataHolder other) {
synchronized (LOCK) {
for (Map.Entry<String, String> entry : other.getValues().entrySet()) {
this.values.put(entry.getKey(), entry.getValue());
}
public synchronized IDataHolder merge(IDataHolder other) {
for (Map.Entry<String, String> entry : other.getValues().entrySet()) {
this.values.put(entry.getKey(), entry.getValue());
}

return this;
Expand Down
2 changes: 1 addition & 1 deletion src/api/java/de/fearnixx/jeak/teamspeak/data/IClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* <p>
* Abstract representation of online clients
*/
public interface IClient extends IDataHolder, IUser {
public interface IClient extends IDataHolder, IWatchableDataHolder, IUser {

Boolean isValid();

Expand Down
25 changes: 25 additions & 0 deletions src/api/java/de/fearnixx/jeak/teamspeak/data/IPropertyWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package de.fearnixx.jeak.teamspeak.data;


/**
* Functional interface class for property watchers.
*
* @param <K> the type of key that is used with the watched objects. Usually, this will be {@link String} but it could be anything suitable as a {@link java.util.Map} key.
* @param <V> the type of value that is used with the watched property. Usually, this will be {@link Object} but can be specified further.
*/
@FunctionalInterface
public interface IPropertyWatcher<K, V> {

/**
* Notifies listener that the property has been changed.
* Invocation of this method is solely for notification purposes and does not allow additional manipulation.
* Directly invoking changes on the watched key is considered an anti-pattern as it potentially results in cyclic invocation.
*
* @param property For multipurpose-reasons, the property key will be passed to the listener. However, the listening side is <em>not</em> expected to be checking for the key if this instance of listener is registered only for one property.
* @param oldValue The value that has been replaced or {@code null}.
* @param presentValue The value that is now stored or {@code null}.
*
* @apiNote while most of the time a {@code null} value can be considered an addition/deletion, this information <em>is not part</em> of this contract. What a {@code null} value actually means is determined by the watched type.
*/
void onValueChanged(K property, V oldValue, V presentValue);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.fearnixx.jeak.teamspeak.data;

/**
* Expansion interface for {@link IDataHolder} contracts that additionally support listening for changes in specific properties.
*/
public interface IWatchableDataHolder {

/**
* Registers a new listener on the given property.
* The listener will be notified about changes to this property, in accordance to {@link IPropertyWatcher}.
*
* @param propertyName the property key to listen to.
* @param listener the actual listener. (May be a lambda-expression as {@link IPropertyWatcher} is an annotated {@link FunctionalInterface}.
*/
void watch(String propertyName, IPropertyWatcher<String, String> listener);
}
28 changes: 22 additions & 6 deletions src/main/java/de/fearnixx/jeak/teamspeak/cache/ClientCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand All @@ -36,6 +33,7 @@ public class ClientCache {
private static final Logger logger = LoggerFactory.getLogger(ClientCache.class);

private final Map<Integer, TS3Client> clientCache = new ConcurrentHashMap<>(50);
private final List<Runnable> deferredListenerInvocations = new LinkedList<>();
private final Object LOCK;

@Inject
Expand Down Expand Up @@ -109,12 +107,16 @@ private void onListAnswer(IQueryEvent.IAnswer event) {
private void refreshClients(IRawQueryEvent.IMessage.IAnswer event) {
List<IRawQueryEvent.IMessage> objects = event.toList();
synchronized (LOCK) {
// Defer property listener invocations. We do not want them to mess with the cache update!
clientCache.forEach((id, cl) -> cl.deferPropListeners(deferredListenerInvocations::add));

// Generate mapping from the received message.
final Map<Integer, TS3Client> clientMapping = generateClientMapping(objects);

// Update internal mapping according to the generated mapping.
TS3Client oldClientRep;
Integer oID;
TS3Client freshClient;

Integer[] cIDs = clientCache.keySet().toArray(new Integer[0]);
for (int i = clientCache.size() - 1; i >= 0; i--) {
oID = cIDs[i];
Expand All @@ -124,7 +126,8 @@ private void refreshClients(IRawQueryEvent.IMessage.IAnswer event) {
if (freshClient == null) {
// Client removed - invalidate & remove
oldClientRep.invalidate();
clientCache.remove(oID);
TS3Client removed = clientCache.remove(oID);
removed.deferPropListeners(null);

} else {
clientMapping.remove(oID);
Expand All @@ -138,6 +141,10 @@ private void refreshClients(IRawQueryEvent.IMessage.IAnswer event) {
if (firstFill) {
logger.info("Client cache is ready.");
}

// Disable listener deferral and invoke all caught ones.
clientCache.forEach((id, cl) -> cl.deferPropListeners(null));
runDeferred();
}

logger.debug("Clientlist updated");
Expand Down Expand Up @@ -214,6 +221,15 @@ private void applyPermissions(TS3Client client) {
client.setFrwPermProvider(permService.getFrameworkProvider());
}

private void runDeferred() {
try {
deferredListenerInvocations.forEach(Runnable::run);
} catch (Exception e) {
logger.error("Exception caught in deferred property change listener!", e);
}
deferredListenerInvocations.clear();
}

public Map<Integer, IClient> getClientMap() {
synchronized (LOCK) {
return Map.copyOf(clientCache);
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/de/fearnixx/jeak/teamspeak/data/TS3ClientHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
import de.fearnixx.jeak.teamspeak.PropertyKeys;
import de.fearnixx.jeak.teamspeak.except.ConsistencyViolationException;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;

public abstract class TS3ClientHolder extends TS3User implements IClient {

private boolean invalidated = false;
private final Map<String, ConcurrentLinkedDeque<IPropertyWatcher<String, Object>>> watchers = new ConcurrentHashMap<>();
private Consumer<Runnable> changeListenerInvokationSink = null;

public TS3ClientHolder() {
super();
Expand Down Expand Up @@ -161,4 +167,35 @@ public String toString() {
return getNickName() + '/' + getClientID() + "/db" + getClientDBID();
}

@Override
public synchronized void watch(String propertyName, IPropertyWatcher<String, String> listener) {
watchers.computeIfAbsent(propertyName, prop -> new ConcurrentLinkedDeque<>());
}

@Override
public synchronized void setProperty(String key, Object value) {
String oldValue = super.getProperty(key).orElse(null);
super.setProperty(key, value);
runListener(key, oldValue, value);
}

private synchronized void runListener(String property, Object presentValue, Object oldValue) {
var propWatchers = this.watchers.getOrDefault(property, null);
if (propWatchers != null) {
// Use weakly consistent iterator for iteration.
Runnable invocation = () ->
propWatchers.iterator()
.forEachRemaining(watcher -> watcher.onValueChanged(property, oldValue, presentValue));

if (changeListenerInvokationSink == null) {
invocation.run();
} else {
changeListenerInvokationSink.accept(invocation);
}
}
}

public synchronized void deferPropListeners(Consumer<Runnable> sink) {
changeListenerInvokationSink = sink;
}
}