diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java b/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java index 2b8d83c1d4..6f1416a6f2 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/muc/HistoryStrategy.java @@ -16,7 +16,6 @@ package org.jivesoftware.openfire.muc; -import org.dom4j.Element; import org.dom4j.tree.DefaultElement; import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.muc.spi.MUCPersistenceManager; @@ -30,10 +29,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; +import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; @@ -55,10 +51,20 @@ public class HistoryStrategy implements Externalizable { private static final Logger Log = LoggerFactory.getLogger(HistoryStrategy.class); /** - * An unlimited cache that records MUC room messages. The key of the cache is the room JID for which a list of - * messages is recorded. + * An unlimited cache that records (references to) MUC room messages. The key of the cache is the room JID for which + * a list of messages is recorded. The value is a wrapper around a set of references for messages. These references + * are keys in the {@link #MUC_HISTORY_MESSAGES_CACHE} cache. */ - private static final Cache> MUC_HISTORY_CACHE = CacheFactory.createCache("MUC History"); + private static final Cache>> MUC_HISTORY_META_CACHE = CacheFactory.createCache("MUC History Meta"); + + /** + * An unlimited cache that holds individual messages, expected to be MUC room messages. The key of the cache is the + * reference used in {@link #MUC_HISTORY_META_CACHE}. The value is the dom4j element that backs the message stanza. + * + * This cache should only be referenced while holding a lock on the key of MUC_HISTORY_META_CACHE that represents + * the address of the room in which a message was exchanged! + */ + private static final Cache MUC_HISTORY_MESSAGES_CACHE = CacheFactory.createCache("MUC History Messages"); /** * The address of the room (expected to be a bare JID) for which this instance records message history. @@ -214,22 +220,37 @@ public void addMessage(@Nonnull final Message... packets) strategyMaxNumber = maxNumber; } - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - final CacheableOptional optional = MUC_HISTORY_CACHE.get(roomJID); - final Messages history; + final CacheableOptional> optional = MUC_HISTORY_META_CACHE.get(roomJID); + final ConcurrentLinkedQueue references; if (optional == null || optional.isAbsent()) { - history = new Messages(); + references = new ConcurrentLinkedQueue<>(); } else { - history = optional.get(); + references = optional.get(); } for (final Message message : messages) { - history.add(message, strategyType, strategyMaxNumber); + // store message according to active strategy. + if (strategyType == Type.number) { + if (references.size() >= strategyMaxNumber) { + // We have to remove messages so the new message won't exceed the max history size. + while (!references.isEmpty() && references.size() >= strategyMaxNumber) { + final UUID oldReference = references.poll(); + MUC_HISTORY_MESSAGES_CACHE.remove(oldReference); + } + } + } + + if (strategyType == Type.all || strategyType == Type.number) { + final UUID reference = UUID.randomUUID(); + references.add(reference); + MUC_HISTORY_MESSAGES_CACHE.put(reference, (DefaultElement) message.getElement()); + } } // Explicitly add back to cache (Hazelcast won't update-by-reference). - MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(history)); + MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(references)); } finally { lock.unlock(); } @@ -255,7 +276,7 @@ boolean isHistoryEnabled() { */ protected Queue getHistoryFromCache() { // Ensure room history is in cache. Doing this outside of the lock below, to reduce the likelihood of deadlocks occurring. - if (!MUC_HISTORY_CACHE.containsKey(roomJID)) { + if (!MUC_HISTORY_META_CACHE.containsKey(roomJID)) { try { final MUCRoom room = XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(roomJID).getChatRoom(roomJID.getNode()); MUCPersistenceManager.loadHistory(room, getMaxNumber()); @@ -265,15 +286,22 @@ protected Queue getHistoryFromCache() { } // Obtain history from cache. - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - final CacheableOptional optional = MUC_HISTORY_CACHE.get(roomJID); - if (optional == null || optional.isAbsent()) { - return new Messages().asCollection(); - } else { - return optional.get().asCollection(); + final CacheableOptional> optional = MUC_HISTORY_META_CACHE.get(roomJID); + final ConcurrentLinkedQueue result = new ConcurrentLinkedQueue<>(); + if (optional != null && !optional.isAbsent()) { + for (final UUID reference : optional.get()) { + final DefaultElement messageElement = MUC_HISTORY_MESSAGES_CACHE.get(reference); + if (messageElement == null) { + Log.warn("Unable to retrieve message of room {} from clustered cache by reference: {}", roomJID, reference); + } else { + result.add(new Message(messageElement, true)); + } + } } + return result; } finally { lock.unlock(); } @@ -310,10 +338,15 @@ public ListIterator getReverseMessageHistory(){ */ public void purge() { - final Lock lock = MUC_HISTORY_CACHE.getLock(roomJID); + final Lock lock = MUC_HISTORY_META_CACHE.getLock(roomJID); lock.lock(); try { - MUC_HISTORY_CACHE.put(roomJID, CacheableOptional.of(null)); + final CacheableOptional> oldReferences = MUC_HISTORY_META_CACHE.put(roomJID, CacheableOptional.of(null)); + if (oldReferences != null && oldReferences.isPresent()) { + for (final UUID oldReference : oldReferences.get()) { + MUC_HISTORY_MESSAGES_CACHE.remove(oldReference); + } + } } finally { lock.unlock(); } @@ -482,93 +515,6 @@ private boolean isSubjectChangeStrict() { return JiveGlobals.getBooleanProperty("xmpp.muc.subject.change.strict", true); } - /** - * A wrapper for a collection of Message instances that is cached. - */ - public static class Messages implements Cacheable, Externalizable - { - private ConcurrentLinkedQueue history = new ConcurrentLinkedQueue<>(); - - public Messages() {} - - public void add(Message packet, Type strategyType, int strategyMaxNumber) - { - // store message according to active strategy - if (strategyType == Type.all) { - history.add(packet); - } else if (strategyType == Type.number) { - if (history.size() >= strategyMaxNumber) { - // We have to remove messages so the new message won't exceed the max history size. - while (!history.isEmpty() && history.size() >= strategyMaxNumber) { - history.poll(); - } - } - history.add(packet); - } - } - - public Queue asCollection() - { - return history; - } - - @Override - public int getCachedSize() throws CannotCalculateSizeException - { - int size = 0; - size += CacheSizes.sizeOfObject(); // overhead of object - size += CacheSizes.sizeOfObject(); // overhead of collection. - - // OF-2498: repeated calculation of the true size of each message stanza is very resource intensive. - // To avoid performance issues, this implementation uses a size of 2k per message, which has - // empirically been observed to be roughly correct. Mileage will probably vary considerably. - size += history.size() * 2048; - - return size; - } - - @Override - public String toString() - { - // Note: this value is shown in the Openfire admin console (in the 'cache values' page). Do not expose - // privacy-sensitive data, such as message content. - return "A collection of " + history.size() + " message stanza(s)."; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Messages messages = (Messages) o; - return history.equals(messages.history); - } - - @Override - public int hashCode() - { - return Objects.hash(history); - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - ExternalizableUtil.getInstance().writeLong(out, history.size()); - for (final Message packet : history) { - ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) packet.getElement()); - } - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - history = new ConcurrentLinkedQueue<>(); - final long size = ExternalizableUtil.getInstance().readLong(in); - for (int i=0; i { @Override public int compare(Message o1, Message o2) { diff --git a/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java b/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java index f2dfcb930e..baec6055bd 100644 --- a/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java +++ b/xmppserver/src/main/java/org/jivesoftware/util/cache/CacheFactory.java @@ -147,7 +147,8 @@ public class CacheFactory { cacheNames.put("JID Domain-parts", "jidDomainprep"); cacheNames.put("JID Resource-parts", "jidResourceprep"); cacheNames.put("Sequences", "sequences"); - cacheNames.put("MUC History", "mucHistory"); + cacheNames.put("MUC History Meta", "mucHistoryMeta"); + cacheNames.put("MUC History Messages", "mucHistoryMessages"); cacheNames.put("MUC Service Pings Sent", "mucPings"); cacheProps.put(PROPERTY_PREFIX_CACHE + "dnsRecords" + PROPERTY_SUFFIX_SIZE, 128 * 1024L); @@ -234,8 +235,10 @@ public class CacheFactory { cacheProps.put(PROPERTY_PREFIX_CACHE + "publishedItems" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(15).toMillis()); cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_SIZE, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "sequences" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); - cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_SIZE, -1L); - cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistory" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_SIZE, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMeta" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_SIZE, -1L); + cacheProps.put(PROPERTY_PREFIX_CACHE + "mucHistoryMessages" + PROPERTY_SUFFIX_MAX_LIFE_TIME, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_SIZE, -1L); cacheProps.put(PROPERTY_PREFIX_CACHE + "mucPings" + PROPERTY_SUFFIX_MAX_LIFE_TIME, Duration.ofMinutes(30).toMillis());