Skip to content

Commit 665b4e1

Browse files
authored
Merge pull request #2652 from guusdk/OF-2921_Async-closure
Make close event handling asynchronous and remote session locator improvement
2 parents 561d2df + a73885d commit 665b4e1

10 files changed

+228
-121
lines changed

xmppserver/src/main/java/org/jivesoftware/openfire/ConnectionCloseListener.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2004-2008 Jive Software, 2017-2018 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2004-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,17 +16,47 @@
1616

1717
package org.jivesoftware.openfire;
1818

19+
import javax.annotation.Nullable;
20+
import java.util.concurrent.CompletableFuture;
21+
1922
/**
2023
* Implement and register with a connection to receive notification
2124
* of the connection closing.
2225
*
2326
* @author Iain Shigeoka
27+
* @author Guus der Kinderen, guus@goodbytes.nl
2428
*/
25-
public interface ConnectionCloseListener {
29+
public interface ConnectionCloseListener
30+
{
2631
/**
27-
* Called when a connection is closed.
32+
* Called when a connection is being closed.
33+
*
34+
* @param handback The handback object associated with the connection listener during Connection.registerCloseListener()
35+
* @deprecated replaced by {@link #onConnectionClosing(Object)}
36+
*/
37+
@Deprecated(forRemoval = true) // Remove in or after Openfire 5.1.0
38+
default void onConnectionClose( Object handback ) {};
39+
40+
/**
41+
* Called when a connection is being closed.
42+
*
43+
* This method is intended to be used to start asynchronous processes. The Future that is returned is to be used
44+
* to status of such an asynchronous process.
2845
*
2946
* @param handback The handback object associated with the connection listener during Connection.registerCloseListener()
47+
* @return a Future representing pending completion of the event listener invocation.
3048
*/
31-
void onConnectionClose( Object handback );
49+
default CompletableFuture<Void> onConnectionClosing(@Nullable final Object handback)
50+
{
51+
// The default implementation is a blocking invocation of the method that is being replaced: onConnectionClose()
52+
// This is designed to facilitate a graceful migration, where pre-existing implementations of this interface
53+
// will continue to work without change. When the deprecated method is deleted, this default implementation
54+
// should also be removed.
55+
try {
56+
onConnectionClose(handback);
57+
} catch (Throwable t) {
58+
return CompletableFuture.failedFuture(t);
59+
}
60+
return CompletableFuture.completedFuture(null);
61+
}
3262
}

xmppserver/src/main/java/org/jivesoftware/openfire/SessionManager.java

Lines changed: 82 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2017-2024 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import java.net.UnknownHostException;
5252
import java.time.Duration;
5353
import java.util.*;
54+
import java.util.concurrent.CompletableFuture;
5455
import java.util.concurrent.ConcurrentHashMap;
5556
import java.util.concurrent.ConcurrentMap;
5657
import java.util.concurrent.atomic.AtomicInteger;
@@ -1332,107 +1333,131 @@ public Locale getLocaleForSession(JID address)
13321333
return session.getLanguage();
13331334
}
13341335

1335-
private class ClientSessionListener implements ConnectionCloseListener {
1336+
private class ClientSessionListener implements ConnectionCloseListener
1337+
{
13361338
/**
1337-
* Handle a session that just closed.
1339+
* Handle a client session that just closed.
13381340
*
13391341
* @param handback The session that just closed
1342+
* @return a Future representing pending completion of the event listener invocation.
13401343
*/
13411344
@Override
1342-
public void onConnectionClose(Object handback) {
1343-
try {
1344-
LocalClientSession session = (LocalClientSession) handback;
1345-
if (session.isDetached()) {
1346-
Log.debug("Closing session with address {} and streamID {} is detached already.", session.getAddress(), session.getStreamID());
1347-
return;
1348-
}
1349-
if (session.getStreamManager().getResume()) {
1350-
Log.debug("Closing session with address {} and streamID {} has SM enabled; detaching.", session.getAddress(), session.getStreamID());
1351-
session.setDetached();
1352-
return;
1353-
} else {
1354-
Log.debug("Closing session with address {} and streamID {} does not have SM enabled.", session.getAddress(), session.getStreamID());
1355-
}
1356-
try {
1357-
if ((session.getPresence().isAvailable() || !session.wasAvailable()) &&
1358-
routingTable.hasClientRoute(session.getAddress())) {
1359-
// Send an unavailable presence to the user's subscribers
1360-
// Note: This gives us a chance to send an unavailable presence to the
1361-
// entities that the user sent directed presences
1362-
Presence presence = new Presence();
1363-
presence.setType(Presence.Type.unavailable);
1364-
presence.setFrom(session.getAddress());
1365-
1366-
// Broadcast asynchronously, to reduce the likelihood of the broadcast introducing a deadlock (OF-2921).
1367-
TaskEngine.getInstance().submit(() -> router.route(presence));
1368-
}
1345+
public CompletableFuture<Void> onConnectionClosing(Object handback)
1346+
{
1347+
final LocalClientSession session = (LocalClientSession) handback;
1348+
if (session.isDetached()) {
1349+
Log.debug("Closing client session with address {} and streamID {} is detached already; this is a no-op.", session.getAddress(), session.getStreamID());
1350+
return CompletableFuture.completedFuture(null);
1351+
}
1352+
if (session.getStreamManager().getResume()) {
1353+
Log.debug("Closing client session with address {} and streamID {} has SM enabled; detaching.", session.getAddress(), session.getStreamID());
1354+
session.setDetached();
1355+
return CompletableFuture.completedFuture(null);
1356+
}
1357+
1358+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing client session with address {} and streamID {} that does not have SM resume.", session.getAddress(), session.getStreamID()));
1359+
1360+
if ((session.getPresence().isAvailable() || !session.wasAvailable()) && routingTable.hasClientRoute(session.getAddress())) {
1361+
// Send an unavailable presence to the user's subscribers. This gives us a chance to send an
1362+
// unavailable presence to the entities that the user sent directed presences
1363+
final Presence presence = new Presence();
1364+
presence.setType(Presence.Type.unavailable);
1365+
presence.setFrom(session.getAddress());
1366+
1367+
result = result.thenRunAsync(() -> router.route(presence));
1368+
}
13691369

1370+
// In the completion stage remove the session (which means it'll be removed no matter if the previous stage had exceptions).
1371+
return result.whenComplete((v,t) -> {
1372+
try {
13701373
session.getStreamManager().onClose(router, serverAddress);
1371-
}
1372-
finally {
1373-
// Remove the session
1374+
} finally {
1375+
// Note that the session can't be removed before the unavailable presence has been sent (as session-provided data is used by the broadcast).
13741376
removeSession(session);
13751377
}
1376-
}
1377-
catch (Exception e) {
1378-
// Can't do anything about this problem...
1379-
Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e);
1380-
}
1378+
});
13811379
}
13821380
}
13831381

1384-
private class IncomingServerSessionListener implements ConnectionCloseListener {
1382+
private class IncomingServerSessionListener implements ConnectionCloseListener
1383+
{
13851384
/**
1386-
* Handle a session that just closed.
1385+
* Handle an incoming server-to-server session that just closed.
13871386
*
13881387
* @param handback The session that just closed
1388+
* @return a Future representing pending completion of the event listener invocation.
13891389
*/
13901390
@Override
1391-
public void onConnectionClose(Object handback) {
1392-
LocalIncomingServerSession session = (LocalIncomingServerSession)handback;
1391+
public CompletableFuture<Void> onConnectionClosing(Object handback)
1392+
{
1393+
final LocalIncomingServerSession session = (LocalIncomingServerSession)handback;
1394+
1395+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing incoming server session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
1396+
13931397
// Remove all the domains that were registered for this server session.
1398+
final Collection<CompletableFuture<Void>> tasks = new ArrayList<>();
13941399
for (String domain : session.getValidatedDomains()) {
1395-
unregisterIncomingServerSession(domain, session);
1400+
tasks.add(CompletableFuture.runAsync(() -> unregisterIncomingServerSession(domain, session)));
13961401
}
1402+
1403+
return result.thenCompose(e -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])));
13971404
}
13981405
}
13991406

1400-
private class OutgoingServerSessionListener implements ConnectionCloseListener {
1407+
private class OutgoingServerSessionListener implements ConnectionCloseListener
1408+
{
14011409
/**
1402-
* Handle a session that just closed.
1410+
* Handle an outgoing server-to-server session that just closed.
14031411
*
14041412
* @param handback The session that just closed
1413+
* @return a Future representing pending completion of the event listener invocation.
14051414
*/
14061415
@Override
1407-
public void onConnectionClose(Object handback) {
1408-
OutgoingServerSession session = (OutgoingServerSession)handback;
1416+
public CompletableFuture<Void> onConnectionClosing(Object handback)
1417+
{
1418+
final OutgoingServerSession session = (OutgoingServerSession)handback;
1419+
1420+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing outgoing server session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
1421+
14091422
// Remove all the domains that were registered for this server session.
1423+
final Collection<CompletableFuture<Void>> tasks = new ArrayList<>();
14101424
for (DomainPair domainPair : session.getOutgoingDomainPairs()) {
1411-
// Remove the route to the session using the domain.
1412-
server.getRoutingTable().removeServerRoute(domainPair);
1425+
tasks.add(CompletableFuture.runAsync(() -> server.getRoutingTable().removeServerRoute(domainPair)));
14131426
}
1427+
1428+
return result.thenCompose(e -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])));
14141429
}
14151430
}
14161431

1417-
private class ConnectionMultiplexerSessionListener implements ConnectionCloseListener {
1432+
private class ConnectionMultiplexerSessionListener implements ConnectionCloseListener
1433+
{
14181434
/**
1419-
* Handle a session that just closed.
1435+
* Handle a multiplexer session that just closed.
14201436
*
14211437
* @param handback The session that just closed
1438+
* @return a Future representing pending completion of the event listener invocation.
14221439
*/
14231440
@Override
1424-
public void onConnectionClose(Object handback) {
1425-
ConnectionMultiplexerSession session = (ConnectionMultiplexerSession)handback;
1441+
public CompletableFuture<Void> onConnectionClosing(Object handback)
1442+
{
1443+
final ConnectionMultiplexerSession session = (ConnectionMultiplexerSession)handback;
1444+
final String domain = session.getAddress().getDomain();
1445+
1446+
CompletableFuture<Void> result = CompletableFuture.runAsync(() -> Log.debug("Closing multiplexer session with address {} and streamID {}.", session.getAddress(), session.getStreamID()));
1447+
14261448
// Remove all the domains that were registered for this server session
1427-
String domain = session.getAddress().getDomain();
1428-
localSessionManager.getConnnectionManagerSessions().remove(session.getAddress().toString());
1449+
result = result.thenRunAsync(() -> localSessionManager.getConnnectionManagerSessions().remove(session.getAddress().toString()));
1450+
14291451
// Remove track of the cluster node hosting the CM connection
1430-
multiplexerSessionsCache.remove(session.getAddress().toString());
1452+
result = result.thenRunAsync(() -> multiplexerSessionsCache.remove(session.getAddress().toString()));
1453+
14311454
if (getConnectionMultiplexerSessions(domain).isEmpty()) {
14321455
// Terminate ClientSessions originated from this connection manager
14331456
// that are still active since the connection manager has gone down
1434-
ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain);
1457+
result = result.thenRunAsync(() -> ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain));
14351458
}
1459+
1460+
return result;
14361461
}
14371462
}
14381463

xmppserver/src/main/java/org/jivesoftware/openfire/net/AbstractConnection.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import javax.annotation.Nonnull;
26-
import java.util.HashMap;
27-
import java.util.HashSet;
28-
import java.util.Map;
29-
import java.util.Set;
26+
import java.util.*;
27+
import java.util.concurrent.CompletableFuture;
3028

3129
/**
3230
* A partial implementation of the {@link org.jivesoftware.openfire.Connection} interface, implementing functionality
@@ -134,7 +132,7 @@ public void setAdditionalNamespaces(@Nonnull final Set<Namespace> additionalName
134132
@Override
135133
public void registerCloseListener(ConnectionCloseListener listener, Object callback) {
136134
if (isClosed()) {
137-
listener.onConnectionClose(callback);
135+
listener.onConnectionClosing(callback).join();
138136
}
139137
else {
140138
closeListeners.put( listener, callback );
@@ -149,17 +147,23 @@ public void removeCloseListener(ConnectionCloseListener listener) {
149147
/**
150148
* Notifies all close listeners that the connection has been closed. Used by subclasses to properly finish closing
151149
* the connection.
150+
*
151+
* @return A Future that represents the state of the close listeners invocations.
152152
*/
153-
protected void notifyCloseListeners() {
154-
for( final Map.Entry<ConnectionCloseListener, Object> entry : closeListeners.entrySet() )
153+
protected CompletableFuture<?> notifyCloseListeners()
154+
{
155+
Log.debug("Notifying close listeners of connection {}", this);
156+
final ArrayList<CompletableFuture<?>> futures = new ArrayList<>();
157+
158+
for (final Map.Entry<ConnectionCloseListener, Object> entry : closeListeners.entrySet() )
155159
{
156-
if (entry.getKey() != null) {
157-
try {
158-
entry.getKey().onConnectionClose(entry.getValue());
159-
} catch (Exception e) {
160-
Log.error("Error notifying listener: " + entry.getKey(), e);
161-
}
160+
final ConnectionCloseListener listener = entry.getKey();
161+
if (listener != null) {
162+
final Object handback = entry.getValue();
163+
futures.add(listener.onConnectionClosing(handback));
162164
}
163165
}
166+
167+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
164168
}
165169
}

xmppserver/src/main/java/org/jivesoftware/openfire/net/ComponentStanzaHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2017-2023 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,9 +18,7 @@
1818

1919
import org.dom4j.Element;
2020
import org.dom4j.Namespace;
21-
import org.jivesoftware.openfire.Connection;
22-
import org.jivesoftware.openfire.PacketRouter;
23-
import org.jivesoftware.openfire.XMPPServer;
21+
import org.jivesoftware.openfire.*;
2422
import org.jivesoftware.openfire.auth.UnauthorizedException;
2523
import org.jivesoftware.openfire.component.InternalComponentManager;
2624
import org.jivesoftware.openfire.session.ComponentSession;
@@ -35,6 +33,9 @@
3533
import org.xmpp.packet.PacketError;
3634
import org.xmpp.packet.Presence;
3735

36+
import javax.annotation.Nullable;
37+
import java.util.concurrent.CompletableFuture;
38+
3839
/**
3940
* Handler of XML stanzas sent by external components connected directly to the server. Received packet will
4041
* have their FROM attribute overridden to avoid spoofing.<p>
@@ -103,7 +104,12 @@ else if (extraDomain.endsWith(initialDomain)) {
103104
subdomain = extraDomain;
104105
}
105106
InternalComponentManager.getInstance().addComponent(subdomain, component);
106-
componentSession.getConnection().registerCloseListener( handback -> InternalComponentManager.getInstance().removeComponent( subdomain, (ComponentSession.ExternalComponent) handback ), component );
107+
componentSession.getConnection().registerCloseListener(new ConnectionCloseListener() {
108+
@Override
109+
public CompletableFuture<Void> onConnectionClosing(@Nullable Object handback) {
110+
return CompletableFuture.runAsync(() -> InternalComponentManager.getInstance().removeComponent(subdomain, (ComponentSession.ExternalComponent) handback));
111+
}
112+
}, component);
107113
// Send confirmation that the new domain has been registered
108114
connection.deliverRawText("<bind/>");
109115
}

xmppserver/src/main/java/org/jivesoftware/openfire/net/SocketConnection.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2005-2008 Jive Software, 2016-2024 Ignite Realtime Foundation. All rights reserved.
2+
* Copyright (C) 2005-2008 Jive Software, 2016-2025 Ignite Realtime Foundation. All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -423,8 +423,13 @@ private void close(@Nullable final StreamError error, final boolean networkInter
423423
}
424424

425425
closeConnection();
426-
notifyCloseListeners();
427-
closeListeners.clear();
426+
427+
notifyCloseListeners().whenComplete((v,t) -> {
428+
closeListeners.clear();
429+
if (t != null) {
430+
Log.warn("Exception while invoking close listeners for {}", this, t);
431+
}
432+
});
428433
}
429434
}
430435

0 commit comments

Comments
 (0)