Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OF-2530: Optimize MUC Message History cache usage #2134

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.jivesoftware.openfire.muc;

import org.dom4j.Element;
import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.muc.spi.MUCPersistenceManager;
Expand All @@ -30,10 +29,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
Expand All @@ -55,10 +51,20 @@ public class HistoryStrategy implements Externalizable {
private static final Logger Log = LoggerFactory.getLogger(HistoryStrategy.class);

/**
* An unlimited cache that records MUC room messages. The key of the cache is the room JID for which a list of
* messages is recorded.
* An unlimited cache that records (references to) MUC room messages. The key of the cache is the room JID for which
* a list of messages is recorded. The value is a wrapper around a set of references for messages. These references
* are keys in the {@link #MUC_HISTORY_MESSAGES_CACHE} cache.
*/
private static final Cache<JID, CacheableOptional<Messages>> MUC_HISTORY_CACHE = CacheFactory.createCache("MUC History");
private static final Cache<JID, CacheableOptional<ConcurrentLinkedQueue<UUID>>> MUC_HISTORY_META_CACHE = CacheFactory.createCache("MUC History Meta");

/**
* An unlimited cache that holds individual messages, expected to be MUC room messages. The key of the cache is the
* reference used in {@link #MUC_HISTORY_META_CACHE}. The value is the dom4j element that backs the message stanza.
*
* This cache should only be referenced while holding a lock on the key of MUC_HISTORY_META_CACHE that represents
* the address of the room in which a message was exchanged!
*/
private static final Cache<UUID, DefaultElement> MUC_HISTORY_MESSAGES_CACHE = CacheFactory.createCache("MUC History Messages");

/**
* The address of the room (expected to be a bare JID) for which this instance records message history.
Expand Down Expand Up @@ -214,22 +220,37 @@ public void addMessage(@Nonnull final Message... packets)
strategyMaxNumber = maxNumber;
}

final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID);
final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID);
lock.lock();
try {
final CacheableOptional<Messages> optional = MUC_HISTORY_CACHE.get(roomJID);
final Messages history;
final CacheableOptional<ConcurrentLinkedQueue<UUID>> optional = MUC_HISTORY_META_CACHE.get(roomJID);
final ConcurrentLinkedQueue<UUID> references;
if (optional == null || optional.isAbsent()) {
history = new Messages();
references = new ConcurrentLinkedQueue<>();
} else {
history = optional.get();
references = optional.get();
}
for (final Message message : messages) {
history.add(message, strategyType, strategyMaxNumber);
// store message according to active strategy.
if (strategyType == Type.number) {
if (references.size() >= strategyMaxNumber) {
// We have to remove messages so the new message won't exceed the max history size.
while (!references.isEmpty() && references.size() >= strategyMaxNumber) {
final UUID oldReference = references.poll();
MUC_HISTORY_MESSAGES_CACHE.remove(oldReference);
}
}
}

if (strategyType == Type.all || strategyType == Type.number) {
final UUID reference = UUID.randomUUID();
references.add(reference);
MUC_HISTORY_MESSAGES_CACHE.put(reference, (DefaultElement) message.getElement());
}
}

// Explicitly add back to cache (Hazelcast won't update-by-reference).
MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(history));
MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(references));
} finally {
lock.unlock();
}
Expand All @@ -255,7 +276,7 @@ boolean isHistoryEnabled() {
*/
protected Queue<Message> getHistoryFromCache() {
// Ensure room history is in cache. Doing this outside of the lock below, to reduce the likelihood of deadlocks occurring.
if (!MUC_HISTORY_CACHE.containsKey(roomJID)) {
if (!MUC_HISTORY_META_CACHE.containsKey(roomJID)) {
try {
final MUCRoom room = XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(roomJID).getChatRoom(roomJID.getNode());
MUCPersistenceManager.loadHistory(room, getMaxNumber());
Expand All @@ -265,15 +286,22 @@ protected Queue<Message> getHistoryFromCache() {
}

// Obtain history from cache.
final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID);
final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID);
lock.lock();
try {
final CacheableOptional<Messages> optional = MUC_HISTORY_CACHE.get(roomJID);
if (optional == null || optional.isAbsent()) {
return new Messages().asCollection();
} else {
return optional.get().asCollection();
final CacheableOptional<ConcurrentLinkedQueue<UUID>> optional = MUC_HISTORY_META_CACHE.get(roomJID);
final ConcurrentLinkedQueue<Message> result = new ConcurrentLinkedQueue<>();
if (optional != null && !optional.isAbsent()) {
for (final UUID reference : optional.get()) {
final DefaultElement messageElement = MUC_HISTORY_MESSAGES_CACHE.get(reference);
if (messageElement == null) {
Log.warn("Unable to retrieve message of room {} from clustered cache by reference: {}", roomJID, reference);
} else {
result.add(new Message(messageElement, true));
}
}
}
return result;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -310,10 +338,15 @@ public ListIterator<Message> getReverseMessageHistory(){
*/
public void purge()
{
final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID);
final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID);
lock.lock();
try {
MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(null));
final CacheableOptional<ConcurrentLinkedQueue<UUID>> oldReferences = MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(null));
if (oldReferences != null && oldReferences.isPresent()) {
for (final UUID oldReference : oldReferences.get()) {
MUC_HISTORY_MESSAGES_CACHE.remove(oldReference);
}
}
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -482,93 +515,6 @@ private boolean isSubjectChangeStrict() {
return JiveGlobals.getBooleanProperty("xmpp.muc.subject.change.strict", true);
}

/**
* A wrapper for a collection of Message instances that is cached.
*/
public static class Messages implements Cacheable, Externalizable
{
private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();

public Messages() {}

public void add(Message packet, Type strategyType, int strategyMaxNumber)
{
// store message according to active strategy
if (strategyType == Type.all) {
history.add(packet);
} else if (strategyType == Type.number) {
if (history.size() >= strategyMaxNumber) {
// We have to remove messages so the new message won't exceed the max history size.
while (!history.isEmpty() && history.size() >= strategyMaxNumber) {
history.poll();
}
}
history.add(packet);
}
}

public Queue<Message> asCollection()
{
return history;
}

@Override
public int getCachedSize() throws CannotCalculateSizeException
{
int size = 0;
size += CacheSizes.sizeOfObject(); // overhead of object
size += CacheSizes.sizeOfObject(); // overhead of collection.

// OF-2498: repeated calculation of the true size of each message stanza is very resource intensive.
// To avoid performance issues, this implementation uses a size of 2k per message, which has
// empirically been observed to be roughly correct. Mileage will probably vary considerably.
size += history.size() * 2048;

return size;
}

@Override
public String toString()
{
// Note: this value is shown in the Openfire admin console (in the 'cache values' page). Do not expose
// privacy-sensitive data, such as message content.
return "A collection of " + history.size() + " message stanza(s).";
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Messages messages = (Messages) o;
return history.equals(messages.history);
}

@Override
public int hashCode()
{
return Objects.hash(history);
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeLong(out, history.size());
for (final Message packet : history) {
ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) packet.getElement());
}
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
history = new ConcurrentLinkedQueue<>();
final long size = ExternalizableUtil.getInstance().readLong(in);
for (int i=0; i<size;i++) {
Element packetElement = (Element) ExternalizableUtil.getInstance().readSerializable(in);
history.add(new Message(packetElement, true));
}
}
}

private static class MessageComparator implements Comparator<Message> {
@Override
public int compare(Message o1, Message o2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public class CacheFactory {
cacheNames.put("JID Domain-parts", "jidDomainprep");
cacheNames.put("JID Resource-parts", "jidResourceprep");
cacheNames.put("Sequences", "sequences");
cacheNames.put("MUC History", "mucHistory");
cacheNames.put("MUC History Meta", "mucHistoryMeta");
cacheNames.put("MUC History Messages", "mucHistoryMessages");
cacheNames.put("MUC Service Pings Sent", "mucPings");

cacheProps.put(PROPERTY_PREFIX_CACHE + "dnsRecords" + PROPERTY_SUFFIX_SIZE, 128 * 1024L);
Expand Down Expand Up @@ -234,8 +235,10 @@ public class CacheFactory {
cacheProps.put(PROPERTY_PREFIX_CACHE + "publishedItems" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(15).toMillis());
cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_SIZE, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_SIZE, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_SIZE, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_SIZE, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_SIZE, -1L);
cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(30).toMillis());

Expand Down