Skip to content

Commit 1783d0f

Browse files
committed
Add support for edge case with out-of-order arriving messages.
1 parent 194eb59 commit 1783d0f

File tree

1 file changed

+38
-17
lines changed

1 file changed

+38
-17
lines changed

support/src/main/java/bisq/support/mediation/MediationRequestService.java

+38-17
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import bisq.chat.bisq_easy.open_trades.BisqEasyOpenTradeChannel;
2626
import bisq.chat.bisq_easy.open_trades.BisqEasyOpenTradeChannelService;
2727
import bisq.common.application.Service;
28+
import bisq.common.observable.Pin;
2829
import bisq.contract.bisq_easy.BisqEasyContract;
2930
import bisq.i18n.Res;
3031
import bisq.network.NetworkService;
@@ -41,11 +42,9 @@
4142
import lombok.extern.slf4j.Slf4j;
4243

4344
import java.nio.charset.StandardCharsets;
44-
import java.util.ArrayList;
45-
import java.util.Comparator;
46-
import java.util.Optional;
47-
import java.util.Set;
45+
import java.util.*;
4846
import java.util.concurrent.CompletableFuture;
47+
import java.util.concurrent.CopyOnWriteArraySet;
4948
import java.util.stream.Collectors;
5049

5150
import static com.google.common.base.Preconditions.checkArgument;
@@ -60,6 +59,8 @@ public class MediationRequestService implements Service, ConfidentialMessageServ
6059
private final BisqEasyOpenTradeChannelService bisqEasyOpenTradeChannelService;
6160
private final AuthorizedBondedRolesService authorizedBondedRolesService;
6261
private final BannedUserService bannedUserService;
62+
private final Set<MediatorsResponse> pendingMediatorsResponseMessages = new CopyOnWriteArraySet<>();
63+
private Pin channeldPin;
6364

6465
public MediationRequestService(NetworkService networkService,
6566
ChatService chatService,
@@ -89,6 +90,10 @@ public CompletableFuture<Boolean> initialize() {
8990
@Override
9091
public CompletableFuture<Boolean> shutdown() {
9192
networkService.removeConfidentialMessageListener(this);
93+
if (channeldPin != null) {
94+
channeldPin.unbind();
95+
channeldPin = null;
96+
}
9297
return CompletableFuture.completedFuture(true);
9398
}
9499

@@ -110,7 +115,7 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage) {
110115
/* --------------------------------------------------------------------- */
111116

112117
public void requestMediation(BisqEasyOpenTradeChannel channel,
113-
BisqEasyContract contract) {
118+
BisqEasyContract contract) {
114119
checkArgument(channel.getBisqEasyOffer().equals(contract.getOffer()));
115120
UserIdentity myUserIdentity = channel.getMyUserIdentity();
116121
checkArgument(!bannedUserService.isUserProfileBanned(myUserIdentity.getUserProfile()));
@@ -166,18 +171,34 @@ public Optional<UserProfile> selectMediator(Set<AuthorizedBondedRole> mediators,
166171

167172
private void processMediationResponse(MediatorsResponse mediatorsResponse) {
168173
bisqEasyOpenTradeChannelService.findChannelByTradeId(mediatorsResponse.getTradeId())
169-
.ifPresent(channel -> {
170-
// Requester had it activated at request time
171-
if (channel.isInMediation()) {
172-
bisqEasyOpenTradeChannelService.addMediatorsResponseMessage(channel, Res.get("authorizedRole.mediator.message.toRequester"));
173-
} else {
174-
bisqEasyOpenTradeChannelService.setIsInMediation(channel, true);
175-
bisqEasyOpenTradeChannelService.addMediatorsResponseMessage(channel, Res.get("authorizedRole.mediator.message.toNonRequester"));
176-
177-
//todo (Critical) - check if we do sent from both peers
178-
// Peer who has not requested sends their messages as well, so mediator can be sure to get all messages
179-
}
180-
});
174+
.ifPresentOrElse(channel -> {
175+
// Requester had it activated at request time
176+
if (channel.isInMediation()) {
177+
bisqEasyOpenTradeChannelService.addMediatorsResponseMessage(channel, Res.get("authorizedRole.mediator.message.toRequester"));
178+
} else {
179+
bisqEasyOpenTradeChannelService.setIsInMediation(channel, true);
180+
bisqEasyOpenTradeChannelService.addMediatorsResponseMessage(channel, Res.get("authorizedRole.mediator.message.toNonRequester"));
181+
182+
//todo (Critical) - check if we do sent from both peers
183+
// Peer who has not requested sends their messages as well, so mediator can be sure to get all messages
184+
}
185+
pendingMediatorsResponseMessages.remove(mediatorsResponse);
186+
},
187+
() -> {
188+
// This handles an edge case that the MediatorsResponse arrives before the take offer request was
189+
// processed (in case we are the maker and have been offline at take offer).
190+
log.warn("We received a MediatorsResponse but did not find a matching bisqEasyOpenTradeChannel for trade ID {}.\n" +
191+
"We add it to the pendingMediatorsResponseMessages set and reprocess it once a new trade channel has been added.",
192+
mediatorsResponse.getTradeId());
193+
pendingMediatorsResponseMessages.add(mediatorsResponse);
194+
if (channeldPin == null) {
195+
channeldPin = bisqEasyOpenTradeChannelService.getChannels().addObserver(this::maybeProcessPendingMediatorsResponseMessages);
196+
}
197+
});
198+
}
199+
200+
private void maybeProcessPendingMediatorsResponseMessages() {
201+
new HashSet<>(pendingMediatorsResponseMessages).forEach(this::processMediationResponse);
181202
}
182203
}
183204

0 commit comments

Comments
 (0)