Skip to content

Commit 01ed098

Browse files
committed
Add the ability to reply asynchronously to a ResendRequest
1 parent 1911973 commit 01ed098

File tree

9 files changed

+831
-30
lines changed

9 files changed

+831
-30
lines changed

.editorconfig

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Helps IDEA users apply some of the formatting rules enforced by checkstyle
2+
3+
root = true
4+
5+
[*.java]
6+
indent_size = 4
7+
max_line_length = 120
8+
ij_java_method_brace_style = next_line
9+
ij_java_block_brace_style = next_line
10+
ij_java_else_on_new_line = true
11+
ij_java_class_brace_style = next_line
12+
ij_java_space_after_type_cast = false
13+
ij_any_catch_on_new_line = true
14+
ij_any_spaces_around_equality_operators = true
15+
ij_java_continuation_indent_size = 4

artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package uk.co.real_logic.artio.session;
1717

1818
import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
19+
import uk.co.real_logic.artio.util.AsciiBuffer;
1920

2021
/**
2122
* Customer interface to control whether resend requests are responded to or not.
@@ -33,11 +34,16 @@ public interface ResendRequestController
3334
* (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number)
3435
* then this callback won't be invoked.
3536
*
37+
* SessionProxy is now also notified immediately after this method is called, with additional parameters that
38+
* allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by
39+
* ResendRequestController.
40+
*
3641
* @param session the session that has received the resend request.
3742
* @param resendRequest the decoded resend request in question.
3843
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
3944
* resend request uses 0 for its endSeqNo parameter.
4045
* @param response respond to the resend request by calling methods on this object.
46+
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
4147
*/
4248
void onResend(
4349
Session session,

artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
package uk.co.real_logic.artio.session;
1717

1818
import uk.co.real_logic.artio.builder.AbstractRejectEncoder;
19+
import uk.co.real_logic.artio.util.AsciiBuffer;
1920

2021
public class ResendRequestResponse
2122
{
22-
private boolean result;
23+
private boolean resendNow;
24+
private boolean delayProcessing;
2325

2426
private int refTagId;
2527
private AbstractRejectEncoder rejectEncoder;
@@ -29,7 +31,8 @@ public class ResendRequestResponse
2931
*/
3032
public void resend()
3133
{
32-
result = true;
34+
resendNow = true;
35+
delayProcessing = false;
3336
}
3437

3538
/**
@@ -41,14 +44,16 @@ public void reject(final int refTagId)
4144
{
4245
this.refTagId = refTagId;
4346

44-
result = false;
47+
resendNow = false;
48+
delayProcessing = false;
4549
}
4650

4751
public void reject(final AbstractRejectEncoder rejectEncoder)
4852
{
4953
this.rejectEncoder = rejectEncoder;
5054

51-
result = false;
55+
resendNow = false;
56+
delayProcessing = false;
5257
}
5358

5459
AbstractRejectEncoder rejectEncoder()
@@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder()
5863

5964
boolean result()
6065
{
61-
return result;
66+
return resendNow;
6267
}
6368

6469
int refTagId()
6570
{
6671
return refTagId;
6772
}
73+
74+
/**
75+
* Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates
76+
* that the request must not be processed nor rejected. It is the responsibility of the caller to call
77+
* Session.executeResendRequest() when ready.
78+
*
79+
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
80+
* @return true if response to the request must not be done immediately
81+
*/
82+
public boolean shouldDelay()
83+
{
84+
return delayProcessing;
85+
}
86+
87+
/**
88+
* This method indicates that the request must not be processed nor rejected. It is the responsibility of
89+
* the caller to call Session.executeResendRequest() when ready.
90+
*
91+
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
92+
*/
93+
public void delay()
94+
{
95+
resendNow = false;
96+
delayProcessing = true;
97+
}
6898
}

artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2097,50 +2097,99 @@ Action onResendRequest(
20972097
final ResendRequestResponse resendRequestResponse = this.resendRequestResponse;
20982098
if (!backpressuredResendRequestResponse)
20992099
{
2100+
// historic behavior
21002101
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse);
2102+
2103+
// also invoke the proxy
2104+
if (Pressure.isBackPressured(proxy.onResend(this, resendRequest,
2105+
correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength)))
2106+
{
2107+
return ABORT;
2108+
}
21012109
}
21022110

21032111
if (resendRequestResponse.result())
21042112
{
2105-
final long correlationId = generateReplayCorrelationId();
2106-
2107-
// Notify the sender end point that a replay is going to happen.
2108-
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
2113+
return executeResendRequest(
2114+
beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength
2115+
);
2116+
}
2117+
else if (!resendRequestResponse.shouldDelay())
2118+
{
2119+
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
2120+
if (rejectEncoder != null)
21092121
{
2110-
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
2111-
correlationId, outboundPublication))
2112-
{
2113-
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
2114-
backpressuredResendRequestResponse = true;
2115-
backpressuredOutboundValidResendRequest = true;
2116-
return ABORT;
2117-
}
2118-
2119-
backpressuredOutboundValidResendRequest = false;
2122+
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
21202123
}
21212124

2125+
return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
2126+
}
2127+
else
2128+
{
2129+
return CONTINUE;
2130+
}
2131+
}
2132+
2133+
private Action executeResendRequest(
2134+
final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum,
2135+
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
2136+
)
2137+
{
2138+
final long correlationId = generateReplayCorrelationId();
2139+
2140+
// Notify the sender end point that a replay is going to happen.
2141+
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
2142+
{
21222143
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
2123-
correlationId, inboundPublication))
2144+
correlationId, outboundPublication))
21242145
{
2125-
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
2146+
if (lastReceivedMsgSeqNum >= 0)
2147+
{
2148+
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
2149+
}
21262150
backpressuredResendRequestResponse = true;
2151+
backpressuredOutboundValidResendRequest = true;
21272152
return ABORT;
21282153
}
21292154

2130-
backpressuredResendRequestResponse = false;
2131-
replaysInFlight++;
2132-
return CONTINUE;
2155+
backpressuredOutboundValidResendRequest = false;
21332156
}
2134-
else
2157+
2158+
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
2159+
correlationId, inboundPublication))
21352160
{
2136-
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
2137-
if (rejectEncoder != null)
2161+
if (lastReceivedMsgSeqNum >= 0)
21382162
{
2139-
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
2163+
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
21402164
}
2141-
2142-
return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
2165+
backpressuredResendRequestResponse = true;
2166+
return ABORT;
21432167
}
2168+
2169+
backpressuredResendRequestResponse = false;
2170+
replaysInFlight++;
2171+
return CONTINUE;
2172+
}
2173+
2174+
2175+
/**
2176+
* Executes a resend request. Used to be done immediately when receiving such a request, but
2177+
* it is now possible to delay the execution, so this method must be called when ready.
2178+
*
2179+
* @param beginSeqNum begin sequence number found in received ResendRequest
2180+
* @param correctedEndSeqNo corrected end sequence number
2181+
* @param messageBuffer buffer containing the ResendRequest message
2182+
* @param messageOffset offset of message in buffer
2183+
* @param messageLength length of message in buffer
2184+
* @return an Action: be sure to handle back pressure!
2185+
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
2186+
*/
2187+
public Action executeResendRequest(
2188+
final int beginSeqNum, final int correctedEndSeqNo,
2189+
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
2190+
)
2191+
{
2192+
return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength);
21442193
}
21452194

21462195
private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder)

artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616
package uk.co.real_logic.artio.session;
1717

18+
import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
1819
import uk.co.real_logic.artio.dictionary.FixDictionary;
1920
import uk.co.real_logic.artio.fields.RejectReason;
2021
import uk.co.real_logic.artio.messages.CancelOnDisconnectOption;
2122
import uk.co.real_logic.artio.messages.DisconnectReason;
23+
import uk.co.real_logic.artio.util.AsciiBuffer;
2224

2325
/**
2426
* A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can
@@ -116,4 +118,34 @@ long sendSequenceReset(
116118
* @return true if asynchronous, false otherwise.
117119
*/
118120
boolean isAsync();
121+
122+
/**
123+
* Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer
124+
* containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the
125+
* Resend request.
126+
*
127+
* @param session the session that has received the resend request.
128+
* @param resendRequest the decoded resend request in question.
129+
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
130+
* resend request uses 0 for its endSeqNo parameter.
131+
* @param response respond to the resend request by calling methods on this object.
132+
* @param messageBuffer buffer containing the ResendRequest message
133+
* @param messageOffset offset of message in buffer
134+
* @param messageLength length of message in buffer
135+
* @return a null or negative number if back pressured
136+
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
137+
* @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse)
138+
*/
139+
default long onResend(
140+
Session session,
141+
AbstractResendRequestDecoder resendRequest,
142+
int correctedEndSeqNo,
143+
ResendRequestResponse response,
144+
AsciiBuffer messageBuffer,
145+
int messageOffset,
146+
int messageLength
147+
)
148+
{
149+
return 1;
150+
}
119151
}

0 commit comments

Comments
 (0)