From 02301b290ed9c0688d7b87f5fffc8d6df8ed21ec Mon Sep 17 00:00:00 2001 From: pcdv Date: Fri, 15 Mar 2024 15:40:47 +0100 Subject: [PATCH 1/8] Add the ability to reply asynchronously to a ResendRequest --- .editorconfig | 15 + .../session/ResendRequestController.java | 6 + .../artio/session/ResendRequestResponse.java | 40 +- .../co/real_logic/artio/session/Session.java | 99 +++-- .../artio/session/SessionProxy.java | 32 ++ .../system_tests/RaceResendResetTest.java | 354 ++++++++++++++++++ .../real_logic/artio/util/DebugFIXClient.java | 98 +++++ .../co/real_logic/artio/util/DebugServer.java | 166 ++++++++ .../artio/util/FixMessageTweak.java | 51 +++ 9 files changed, 831 insertions(+), 30 deletions(-) create mode 100644 .editorconfig create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000..2ab6bda649 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +# Helps IDEA users apply some of the formatting rules enforced by checkstyle + +root = true + +[*.java] +indent_size = 4 +max_line_length = 120 +ij_java_method_brace_style = next_line +ij_java_block_brace_style = next_line +ij_java_else_on_new_line = true +ij_java_class_brace_style = next_line +ij_java_space_after_type_cast = false +ij_any_catch_on_new_line = true +ij_any_spaces_around_equality_operators = true +ij_java_continuation_indent_size = 4 \ No newline at end of file diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java index be3b383cab..99897327ca 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.session; import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; +import uk.co.real_logic.artio.util.AsciiBuffer; /** * Customer interface to control whether resend requests are responded to or not. @@ -33,11 +34,16 @@ public interface ResendRequestController * (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number) * then this callback won't be invoked. * + * SessionProxy is now also notified immediately after this method is called, with additional parameters that + * allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by + * ResendRequestController. + * * @param session the session that has received the resend request. * @param resendRequest the decoded resend request in question. * @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the * resend request uses 0 for its endSeqNo parameter. * @param response respond to the resend request by calling methods on this object. + * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int) */ void onResend( Session session, diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java index d715af54f2..5f05e62287 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java @@ -16,10 +16,12 @@ package uk.co.real_logic.artio.session; import uk.co.real_logic.artio.builder.AbstractRejectEncoder; +import uk.co.real_logic.artio.util.AsciiBuffer; public class ResendRequestResponse { - private boolean result; + private boolean resendNow; + private boolean delayProcessing; private int refTagId; private AbstractRejectEncoder rejectEncoder; @@ -29,7 +31,8 @@ public class ResendRequestResponse */ public void resend() { - result = true; + resendNow = true; + delayProcessing = false; } /** @@ -41,14 +44,16 @@ public void reject(final int refTagId) { this.refTagId = refTagId; - result = false; + resendNow = false; + delayProcessing = false; } public void reject(final AbstractRejectEncoder rejectEncoder) { this.rejectEncoder = rejectEncoder; - result = false; + resendNow = false; + delayProcessing = false; } AbstractRejectEncoder rejectEncoder() @@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder() boolean result() { - return result; + return resendNow; } int refTagId() { return refTagId; } + + /** + * Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates + * that the request must not be processed nor rejected. It is the responsibility of the caller to call + * Session.executeResendRequest() when ready. + * + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + * @return true if response to the request must not be done immediately + */ + public boolean shouldDelay() + { + return delayProcessing; + } + + /** + * This method indicates that the request must not be processed nor rejected. It is the responsibility of + * the caller to call Session.executeResendRequest() when ready. + * + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + */ + public void delay() + { + resendNow = false; + delayProcessing = true; + } } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java index 4378fd117b..106d57bdef 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java @@ -2097,50 +2097,99 @@ Action onResendRequest( final ResendRequestResponse resendRequestResponse = this.resendRequestResponse; if (!backpressuredResendRequestResponse) { + // historic behavior resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse); + + // also invoke the proxy + if (Pressure.isBackPressured(proxy.onResend(this, resendRequest, + correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength))) + { + return ABORT; + } } if (resendRequestResponse.result()) { - final long correlationId = generateReplayCorrelationId(); - - // Notify the sender end point that a replay is going to happen. - if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest) + return executeResendRequest( + beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength + ); + } + else if (!resendRequestResponse.shouldDelay()) + { + final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder(); + if (rejectEncoder != null) { - if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, - correlationId, outboundPublication)) - { - lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); - backpressuredResendRequestResponse = true; - backpressuredOutboundValidResendRequest = true; - return ABORT; - } - - backpressuredOutboundValidResendRequest = false; + return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder); } + return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum); + } + else + { + return CONTINUE; + } + } + + private Action executeResendRequest( + final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + final long correlationId = generateReplayCorrelationId(); + + // Notify the sender end point that a replay is going to happen. + if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest) + { if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, - correlationId, inboundPublication)) + correlationId, outboundPublication)) { - lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); + if (lastReceivedMsgSeqNum >= 0) + { + lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); + } backpressuredResendRequestResponse = true; + backpressuredOutboundValidResendRequest = true; return ABORT; } - backpressuredResendRequestResponse = false; - replaysInFlight++; - return CONTINUE; + backpressuredOutboundValidResendRequest = false; } - else + + if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, + correlationId, inboundPublication)) { - final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder(); - if (rejectEncoder != null) + if (lastReceivedMsgSeqNum >= 0) { - return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder); + lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); } - - return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum); + backpressuredResendRequestResponse = true; + return ABORT; } + + backpressuredResendRequestResponse = false; + replaysInFlight++; + return CONTINUE; + } + + + /** + * Executes a resend request. Used to be done immediately when receiving such a request, but + * it is now possible to delay the execution, so this method must be called when ready. + * + * @param beginSeqNum begin sequence number found in received ResendRequest + * @param correctedEndSeqNo corrected end sequence number + * @param messageBuffer buffer containing the ResendRequest message + * @param messageOffset offset of message in buffer + * @param messageLength length of message in buffer + * @return an Action: be sure to handle back pressure! + * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int) + */ + public Action executeResendRequest( + final int beginSeqNum, final int correctedEndSeqNo, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength); } private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java index 66ac31e3f1..f629712ef0 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java @@ -15,10 +15,12 @@ */ package uk.co.real_logic.artio.session; +import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; import uk.co.real_logic.artio.dictionary.FixDictionary; import uk.co.real_logic.artio.fields.RejectReason; import uk.co.real_logic.artio.messages.CancelOnDisconnectOption; import uk.co.real_logic.artio.messages.DisconnectReason; +import uk.co.real_logic.artio.util.AsciiBuffer; /** * A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can @@ -116,4 +118,34 @@ long sendSequenceReset( * @return true if asynchronous, false otherwise. */ boolean isAsync(); + + /** + * Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer + * containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the + * Resend request. + * + * @param session the session that has received the resend request. + * @param resendRequest the decoded resend request in question. + * @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the + * resend request uses 0 for its endSeqNo parameter. + * @param response respond to the resend request by calling methods on this object. + * @param messageBuffer buffer containing the ResendRequest message + * @param messageOffset offset of message in buffer + * @param messageLength length of message in buffer + * @return a null or negative number if back pressured + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + * @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse) + */ + default long onResend( + Session session, + AbstractResendRequestDecoder resendRequest, + int correctedEndSeqNo, + ResendRequestResponse response, + AsciiBuffer messageBuffer, + int messageOffset, + int messageLength + ) + { + return 1; + } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java new file mode 100644 index 0000000000..158e0a1aa4 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java @@ -0,0 +1,354 @@ +package uk.co.real_logic.artio.system_tests; + +import org.agrona.ErrorHandler; +import org.agrona.concurrent.EpochNanoClock; +import org.junit.Ignore; +import org.junit.Test; +import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; +import uk.co.real_logic.artio.dictionary.generation.Exceptions; +import uk.co.real_logic.artio.engine.EngineConfiguration; +import uk.co.real_logic.artio.engine.FixEngine; +import uk.co.real_logic.artio.fields.EpochFractionFormat; +import uk.co.real_logic.artio.library.LibraryConfiguration; +import uk.co.real_logic.artio.protocol.GatewayPublication; +import uk.co.real_logic.artio.session.DirectSessionProxy; +import uk.co.real_logic.artio.session.ResendRequestResponse; +import uk.co.real_logic.artio.session.Session; +import uk.co.real_logic.artio.session.SessionCustomisationStrategy; +import uk.co.real_logic.artio.session.SessionIdStrategy; +import uk.co.real_logic.artio.session.SessionProxy; +import uk.co.real_logic.artio.util.AsciiBuffer; +import uk.co.real_logic.artio.util.DebugFIXClient; +import uk.co.real_logic.artio.util.DebugServer; +import uk.co.real_logic.artio.util.MutableAsciiBuffer; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; +import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; + +/** + * Reproduce race (issue #503) while sending ResendRequest and ResetSequence when both + * parties detect a gap on Logon. + *

+ * Also reproduces the fact that SessionProxy is not invoked when a ResetSequence message is sent during replay. + */ +public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest +{ + private boolean useProxy; + private boolean sendResendRequestCalled; + private boolean sendSequenceResetCalled; + + /** + * When positive, simulate a SessionProxy that sends outbound FIX messages asynchronously, + * through an external cluster. + */ + private long sleepBeforeSendResendRequest; + + private final ArrayList autoClose = new ArrayList<>(); + + private void launch() + { + mediaDriver = launchMediaDriver(); + launchAccepting(); + launchInitiating(); + testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + } + + private void launchInitiating() + { + final EngineConfiguration initiatingConfig = initiatingConfig(libraryAeronPort, nanoClock) + .deleteLogFileDirOnStart(true) + .initialAcceptedSessionOwner(SOLE_LIBRARY); + initiatingEngine = FixEngine.launch(initiatingConfig); + final LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock); + if (useProxy) + { + lib.sessionProxyFactory(this::sessionProxyFactory); + } + initiatingLibrary = connect(lib); + } + + static class PendingResendRequest + { + final Session session; + final MutableAsciiBuffer message; + final int beginSeqNo; + final int endSeqNo; + + PendingResendRequest( + final Session session, final int beginSeqNo, final int endSeqNo, final MutableAsciiBuffer message + ) + { + this.session = session; + this.beginSeqNo = beginSeqNo; + this.endSeqNo = endSeqNo; + this.message = message; + } + + public void execute() + { + System.err.println("Execute resend request"); + session.executeResendRequest(beginSeqNo, endSeqNo, message, 0, message.capacity()); + } + } + + private void launchAccepting() + { + final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) + .deleteLogFileDirOnStart(true) + .initialAcceptedSessionOwner(SOLE_LIBRARY); + acceptingEngine = FixEngine.launch(acceptingConfig); + + final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); + acceptingLibrary = connect(acceptingLibraryConfig); + } + + /** + * Sanity check that we can connect Artio to a debug server with canned messages. + */ + @Test + public void testDebugServer() throws IOException + { + final DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|" + ); + srv.start(); + + mediaDriver = launchMediaDriver(); + launchInitiating(); + testSystem = new TestSystem(initiatingLibrary); + connectAndAcquire(); + } + + class Proxy extends DirectSessionProxy + { + /** + * Stores details of received ResendRequest while we wait for ours to be sent. + */ + private PendingResendRequest pendingResendRequest; + + Proxy( + final int sessionBufferSize, final GatewayPublication gatewayPublication, + final SessionIdStrategy sessionIdStrategy, final SessionCustomisationStrategy customisationStrategy, + final EpochNanoClock clock, final long connectionId, final int libraryId, + final ErrorHandler errorHandler, final EpochFractionFormat epochFractionPrecision + ) + { + super(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, clock, connectionId, + libraryId, errorHandler, epochFractionPrecision); + } + + @Override + public long onResend( + final Session session, final AbstractResendRequestDecoder resendRequest, + final int correctedEndSeqNo, final ResendRequestResponse response, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + onResendRequestReceived(session, resendRequest, correctedEndSeqNo, response, + messageBuffer, messageOffset, messageLength); + return 1; + } + + private void onResendRequestReceived( + final Session session, final AbstractResendRequestDecoder request, final int endSeqNo, + final ResendRequestResponse response, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + System.err.println("onResendRequestReceived() called"); + if (!useProxy || sleepBeforeSendResendRequest == 0) + { + response.resend(); + } + else + { + response.delay(); + final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]); + buf.putBytes(0, messageBuffer, messageOffset, messageLength); + pendingResendRequest = new PendingResendRequest(session, request.beginSeqNo(), endSeqNo, buf); + } + } + + @Override + public long sendResendRequest( + final int msgSeqNo, + final int beginSeqNo, + final int endSeqNo, + final int sequenceIndex, + final int lastMsgSeqNumProcessed) + { + System.err.println("sendResendRequest called with msgSeqNo = " + msgSeqNo); + sendResendRequestCalled = true; + if (sleepBeforeSendResendRequest > 0) + { + new Thread(() -> + { + try + { + Thread.sleep(sleepBeforeSendResendRequest); + } + catch (final InterruptedException ignored) + { + } + System.err.println("Executing super.sendResendRequest() after delay: msgSeqNo = " + msgSeqNo); + super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + if (pendingResendRequest != null) + { + pendingResendRequest.execute(); + } + else + { + System.err.println("onResend not called (async)"); + } + }).start(); + } + else + { + System.err.println("Directly executing sendResendRequest msgSeqNo = " + msgSeqNo); + super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + if (pendingResendRequest != null) + { + pendingResendRequest.execute(); + } + else + { + System.err.println("onResend not called (direct)"); + } + } + return 1; + } + + @Override + public long sendSequenceReset( + final int msgSeqNo, + final int newSeqNo, + final int sequenceIndex, + final int lastMsgSeqNumProcessed) + { + sendSequenceResetCalled = true; + return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + } + + @Override + public boolean isAsync() + { + return true; + } + } + + private SessionProxy sessionProxyFactory( + final int sessionBufferSize, + final GatewayPublication gatewayPublication, + final SessionIdStrategy sessionIdStrategy, + final SessionCustomisationStrategy customisationStrategy, + final EpochNanoClock clock, + final long connectionId, + final int libraryId, + final ErrorHandler errorHandler, + final EpochFractionFormat epochFractionPrecision) + { + return new Proxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, + clock, connectionId, libraryId, errorHandler, epochFractionPrecision); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldNotInvertResendAndResetNoProxy() throws Exception + { + useProxy = false; + reconnectTest(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSendResendBeforeResetSyncProxy() throws Exception + { + useProxy = true; + sleepBeforeSendResendRequest = 0; + reconnectTest(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSendResendBeforeResetAsyncProxy() throws Exception + { + useProxy = true; + sleepBeforeSendResendRequest = 100; + reconnectTest(); + } + + @Ignore // SequenceReset is directly sent by replayer, does not go through SessionProxy + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldCallProxySendSequenceReset() throws Exception + { + useProxy = true; + reconnectTest(); + assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled); + assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled); + } + + private void reconnectTest() throws Exception + { + launch(); + + connectAndAcquire(); + + messagesCanBeExchanged(); + + disconnectSessions(); + Exceptions.closeAll(this::closeAcceptingEngine); + + assertEquals(3, acceptingSession.lastReceivedMsgSeqNum()); + assertEquals(3, initiatingSession.lastReceivedMsgSeqNum()); + + final DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|", + "8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|" + ); + srv.start(); + autoClose.add(srv::stop); + + connectPersistentSessions(4, 4, false); + + final DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000)); + autoClose.add(exchange::close); + exchange.popAndAssert("35=A 34=4"); + exchange.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first + exchange.popAndAssert("35=4 34=4 36=6"); + } + + @Override + public void close() + { + for (final AutoCloseable autoCloseable : autoClose) + { + try + { + autoCloseable.close(); + } + catch (final Exception ignored) + { + } + } + super.close(); + } + + private void connectAndAcquire() + { + connectSessions(); + acceptingSession = acceptingHandler.lastSession(); + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java new file mode 100644 index 0000000000..635a7df540 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java @@ -0,0 +1,98 @@ +package uk.co.real_logic.artio.util; + +import org.junit.Assert; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Scanner; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Helper to pop FIX messages received on a socket. + * + * @see DebugServer + */ +public class DebugFIXClient +{ + private final DebugServer.HasIOStream io; + private final Thread thread; + + private final BlockingQueue> messages = new LinkedBlockingQueue<>(); + private volatile boolean disposed; + private String prefix = " <<< "; + + public DebugFIXClient(final DebugServer.HasIOStream io) + { + this.io = Objects.requireNonNull(io); + thread = new Thread(this::run, "DebugFIXClient"); + thread.start(); + } + + public void close() throws Exception + { + disposed = true; + io.in.close(); + io.in.close(); + thread.interrupt(); + thread.join(); + } + + private void run() + { + final StringBuilder s = new StringBuilder(128); + while (!disposed) + { + final Scanner scanner = new Scanner(io.in).useDelimiter("\u0001"); + Map msg = new HashMap<>(); + while (scanner.hasNext()) + { + final String fld = scanner.next(); + s.append(fld).append('|'); + final int eq = fld.indexOf('='); + final String tag = fld.substring(0, eq); + msg.put(tag, fld.substring(eq + 1)); + if (tag.equals("10")) + { + messages.add(msg); + msg = new HashMap<>(); + System.out.println(prefix + s); + s.setLength(0); + } + } + } + } + + public Map popMessage() throws InterruptedException + { + return messages.poll(5, TimeUnit.SECONDS); + } + + /** + * Pop a message and check that it contains some field/value pairs. + * + * @param tagValues a string of the form "35=5 58=Bye" + */ + public void popAndAssert(final String tagValues) throws InterruptedException + { + final Map map = popMessage(); + if (map == null) + { + throw new AssertionError("No message received"); + } + + for (final String rule : tagValues.split(" ")) + { + final String tag = rule.substring(0, rule.indexOf('=')); + final String value = map.get(tag); + Assert.assertEquals(rule, tag + "=" + value); + } + } + + public void setPrefix(final String prefix) + { + this.prefix = prefix; + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java new file mode 100644 index 0000000000..76630513f8 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java @@ -0,0 +1,166 @@ +package uk.co.real_logic.artio.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A server that accepts TCP connections and is able to reply automatically with canned + * data. It can be used to simulate a FIX server in order to quickly sent specific messages. + */ +public class DebugServer +{ + private final int port; + private final Queue connectResponses; + private final BlockingQueue clients; + private final ServerSocket serverSocket; + + /** + * If true, wait until some data is received before sending prepared messages. + */ + private boolean waitForData; + + /** + * Creates a debug server listening on specified port. + * + * @param port TCP listening port + */ + public DebugServer(final int port) throws IOException + { + this.port = port; + this.connectResponses = new ConcurrentLinkedQueue<>(); + this.clients = new LinkedBlockingQueue<>(); + this.serverSocket = new ServerSocket(port); + } + + /** + * Adds a message that must be directly sent to connecting clients. Messages + * are sent in the same order they were added. + * + * @param message binary message to send to new clients + */ + public void addConnectResponse(final byte[] message) + { + connectResponses.add(message); + } + + /** + * Warning: causes problems because SendingTime and checksum needs to be regenerated + * and they are not. + * + * @param message FIX message to automatically send to new clients + */ + public void addFIXResponse(final String... message) + { + for (final String msg : message) + { + addConnectResponse(FixMessageTweak.recycle(msg)); + } + } + + /** + * Starts the debug server, accepting incoming connections and sending + * prepared data. + */ + public void start() throws IOException + { + new Thread("DebugServer-" + port) + { + @Override + public void run() + { + try + { + while (!serverSocket.isClosed()) + { + final Socket socket = serverSocket.accept(); + System.out.println("Connection accepted from " + socket.getInetAddress()); + try + { + final BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + final BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream()); + + if (!connectResponses.isEmpty() && waitForData) + { + in.mark(0); + in.read(); + in.reset(); + } + + final HasIOStream client = new HasIOStream(in, out); + sendResponses(client.out); + clients.add(client); + } + catch (final IOException e) + { + e.printStackTrace(); + } + } + } + catch (final IOException e) + { + if (!serverSocket.isClosed()) + { + e.printStackTrace(); + } + } + } + }.start(); + } + + public void stop() throws IOException + { + serverSocket.close(); + } + + /** + * Sends prepared data to the client. + * + * @param outputStream output stream for client + */ + private void sendResponses(final OutputStream outputStream) throws IOException + { + for (final byte[] response : connectResponses) + { + outputStream.write(response); + outputStream.flush(); + } + } + + public HasIOStream popClient(final long timeoutMs) throws InterruptedException + { + return clients.poll(timeoutMs, TimeUnit.MILLISECONDS); + } + + public int getPort() + { + return port; + } + + public void setWaitForData(final boolean waitForData) + { + this.waitForData = waitForData; + } + + public static class HasIOStream + { + + public final InputStream in; + public final OutputStream out; + + public HasIOStream(final InputStream in, final OutputStream out) + { + this.in = in; + this.out = out; + } + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java new file mode 100644 index 0000000000..457106909d --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java @@ -0,0 +1,51 @@ +package uk.co.real_logic.artio.util; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +/** + * Easily generate valid FIX messages from captured data. + */ +public class FixMessageTweak +{ + + /** + * Replaces sending time with current time and recompute BodyLength / Checksum. + * + * @param asciiMsg a FIX message where SOH character may have been replaced with '|' + * @return a FIX message ready to send on a socket + */ + public static byte[] recycle(final String asciiMsg) + { + String msg = asciiMsg; + msg = msg.replace('|', '\001'); + + // Replace Sending Time (52) with current time + final String time = LocalDateTime + .now(ZoneOffset.UTC) + .format(DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSS")); + msg = msg.replaceAll("52=[^\u0001]*\u0001", "52=" + time + '\001'); + + // recompute body length + final int body = msg.indexOf('\001', 10) + 1; + final int trailer = msg.indexOf("10="); + msg = msg.replaceAll("9=\\d+", "9=" + (trailer - body)); + + // recompute checksum + msg = msg.replaceAll("10=[0-9]{3}", "10=" + computeChecksum(msg)); + + return msg.getBytes(StandardCharsets.UTF_8); + } + + private static String computeChecksum(final String fixMessage) + { + int checksum = 0; + for (int i = fixMessage.indexOf("10=") - 1; i >= 0; i--) + { + checksum += fixMessage.charAt(i); + } + return String.format("%03d", checksum % 256); + } +} From 3605fbfd99bcf0ab581a66ae8587a217fa1d3db3 Mon Sep 17 00:00:00 2001 From: pcdv Date: Fri, 5 Apr 2024 18:27:24 +0200 Subject: [PATCH 2/8] Stabilize test --- .../system_tests/RaceResendResetTest.java | 82 +++++++------------ .../real_logic/artio/util/DebugFIXClient.java | 21 ++++- .../co/real_logic/artio/util/DebugServer.java | 6 +- 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java index 158e0a1aa4..c6faac9826 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java @@ -5,7 +5,6 @@ import org.junit.Ignore; import org.junit.Test; import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; -import uk.co.real_logic.artio.dictionary.generation.Exceptions; import uk.co.real_logic.artio.engine.EngineConfiguration; import uk.co.real_logic.artio.engine.FixEngine; import uk.co.real_logic.artio.fields.EpochFractionFormat; @@ -29,10 +28,6 @@ import static org.junit.Assert.assertTrue; import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; @@ -56,13 +51,14 @@ public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest private long sleepBeforeSendResendRequest; private final ArrayList autoClose = new ArrayList<>(); + private DebugServer initialAcceptor; - private void launch() + private void launch() throws IOException { mediaDriver = launchMediaDriver(); - launchAccepting(); + launchInitialAcceptor(); launchInitiating(); - testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + testSystem = new TestSystem(initiatingLibrary); } private void launchInitiating() @@ -103,15 +99,15 @@ public void execute() } } - private void launchAccepting() + private void launchInitialAcceptor() throws IOException { - final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) - .deleteLogFileDirOnStart(true) - .initialAcceptedSessionOwner(SOLE_LIBRARY); - acceptingEngine = FixEngine.launch(acceptingConfig); - - final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); - acceptingLibrary = connect(acceptingLibraryConfig); + initialAcceptor = new DebugServer(port); + initialAcceptor.setWaitForData(true); + initialAcceptor.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|", + "8=FIX.4.4|9=94|35=1|49=acceptor|56=initiator|34=2|52=***|112=hello|98=0|108=10|141=N|10=024|" + ); + initialAcceptor.start(); } /** @@ -154,33 +150,19 @@ class Proxy extends DirectSessionProxy @Override public long onResend( final Session session, final AbstractResendRequestDecoder resendRequest, - final int correctedEndSeqNo, final ResendRequestResponse response, + final int endSeqNo, final ResendRequestResponse response, final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength ) { - onResendRequestReceived(session, resendRequest, correctedEndSeqNo, response, - messageBuffer, messageOffset, messageLength); - return 1; - } - - private void onResendRequestReceived( - final Session session, final AbstractResendRequestDecoder request, final int endSeqNo, - final ResendRequestResponse response, - final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength - ) - { - System.err.println("onResendRequestReceived() called"); - if (!useProxy || sleepBeforeSendResendRequest == 0) - { - response.resend(); - } - else + System.err.println("onResend() called"); + if (useProxy && sleepBeforeSendResendRequest != 0) { response.delay(); final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]); buf.putBytes(0, messageBuffer, messageOffset, messageLength); - pendingResendRequest = new PendingResendRequest(session, request.beginSeqNo(), endSeqNo, buf); + pendingResendRequest = new PendingResendRequest(session, resendRequest.beginSeqNo(), endSeqNo, buf); } + return 1; } @Override @@ -224,10 +206,6 @@ public long sendResendRequest( { pendingResendRequest.execute(); } - else - { - System.err.println("onResend not called (direct)"); - } } return 1; } @@ -303,14 +281,14 @@ private void reconnectTest() throws Exception launch(); connectAndAcquire(); + final DebugFIXClient acc1 = new DebugFIXClient(initialAcceptor.popClient(5000)); + acc1.start(); - messagesCanBeExchanged(); - - disconnectSessions(); - Exceptions.closeAll(this::closeAcceptingEngine); - - assertEquals(3, acceptingSession.lastReceivedMsgSeqNum()); - assertEquals(3, initiatingSession.lastReceivedMsgSeqNum()); + acc1.popAndAssert("35=A 34=1"); + acc1.popAndAssert("35=0 34=2 112=hello"); + acc1.close(); + initialAcceptor.stop(); + assertEquals(2, initiatingSession.lastReceivedMsgSeqNum()); final DebugServer srv = new DebugServer(port); srv.setWaitForData(true); @@ -323,11 +301,12 @@ private void reconnectTest() throws Exception connectPersistentSessions(4, 4, false); - final DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000)); - autoClose.add(exchange::close); - exchange.popAndAssert("35=A 34=4"); - exchange.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first - exchange.popAndAssert("35=4 34=4 36=6"); + final DebugFIXClient acc2 = new DebugFIXClient(srv.popClient(5000)); + acc2.start(); + autoClose.add(acc2::close); + acc2.popAndAssert("35=A 34=4"); + acc2.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first + acc2.popAndAssert("35=4 34=4 36=6"); } @Override @@ -349,6 +328,5 @@ public void close() private void connectAndAcquire() { connectSessions(); - acceptingSession = acceptingHandler.lastSession(); } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java index 635a7df540..81bca804bf 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java @@ -18,7 +18,7 @@ public class DebugFIXClient { private final DebugServer.HasIOStream io; - private final Thread thread; + private Thread thread; private final BlockingQueue> messages = new LinkedBlockingQueue<>(); private volatile boolean disposed; @@ -27,6 +27,11 @@ public class DebugFIXClient public DebugFIXClient(final DebugServer.HasIOStream io) { this.io = Objects.requireNonNull(io); + } + + public void start() + { + assert thread == null; thread = new Thread(this::run, "DebugFIXClient"); thread.start(); } @@ -36,6 +41,7 @@ public void close() throws Exception disposed = true; io.in.close(); io.in.close(); + io.socket.close(); thread.interrupt(); thread.join(); } @@ -58,7 +64,7 @@ private void run() { messages.add(msg); msg = new HashMap<>(); - System.out.println(prefix + s); + System.err.println(prefix + s); s.setLength(0); } } @@ -78,6 +84,7 @@ public Map popMessage() throws InterruptedException public void popAndAssert(final String tagValues) throws InterruptedException { final Map map = popMessage(); + System.err.println(map); if (map == null) { throw new AssertionError("No message received"); @@ -87,7 +94,15 @@ public void popAndAssert(final String tagValues) throws InterruptedException { final String tag = rule.substring(0, rule.indexOf('=')); final String value = map.get(tag); - Assert.assertEquals(rule, tag + "=" + value); + try + { + Assert.assertEquals(rule, tag + "=" + value); + } + catch (final Throwable e) + { + e.printStackTrace(); + throw e; + } } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java index 76630513f8..78dd8e856f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java @@ -96,7 +96,7 @@ public void run() in.reset(); } - final HasIOStream client = new HasIOStream(in, out); + final HasIOStream client = new HasIOStream(socket, in, out); sendResponses(client.out); clients.add(client); } @@ -154,11 +154,13 @@ public void setWaitForData(final boolean waitForData) public static class HasIOStream { + public final Socket socket; public final InputStream in; public final OutputStream out; - public HasIOStream(final InputStream in, final OutputStream out) + public HasIOStream(final Socket socket, final InputStream in, final OutputStream out) { + this.socket = socket; this.in = in; this.out = out; } From 670f895c17e9ef101b2385390d28d6b9bb81bb5a Mon Sep 17 00:00:00 2001 From: pcdv Date: Wed, 6 Jul 2022 17:51:50 +0200 Subject: [PATCH 3/8] Allow setting custom FIX tags in encoders (WIP) #462 --- .../artio/builder/CommonEncoderImpl.java | 45 +++++++++++++++++++ .../generation/EncoderGenerator.java | 14 +++++- .../artio/dictionary/ExampleDictionary.java | 7 +++ .../generation/EncoderGeneratorTest.java | 14 ++++++ 4 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java new file mode 100644 index 0000000000..12adea47f2 --- /dev/null +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java @@ -0,0 +1,45 @@ +package uk.co.real_logic.artio.builder; + +import uk.co.real_logic.artio.util.MutableAsciiBuffer; + +/** + * Class provides common implementation methods used by encoders. + */ +public class CommonEncoderImpl +{ + // TODO resizable buffer or way to set a size + protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[64]); + + protected int customTagsLength = 0; + + private int putTagHeader(final int tag) + { + int pos = customTagsLength; + pos += customTagsBuffer.putIntAscii(pos, tag); + customTagsBuffer.putByte(pos++, (byte)'='); + return pos; + } + + public CommonEncoderImpl customTag(final int tag, final int value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putIntAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTagAscii(final int tag, final CharSequence value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putStringWithoutLengthAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public void resetCustomTags() + { + customTagsLength = 0; + } +} diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java index e4cbefa608..1d88926e0c 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java @@ -20,6 +20,7 @@ import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.generation.OutputManager; +import uk.co.real_logic.artio.builder.CommonEncoderImpl; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.builder.FieldBagEncoder; import uk.co.real_logic.artio.builder.SessionHeaderEncoder; @@ -210,6 +211,7 @@ protected void generateAggregateFile(final Aggregate aggregate, final AggregateT MutableDirectBuffer.class, UnsafeBuffer.class, AsciiSequenceView.class, + CommonEncoderImpl.class, FieldBagEncoder.class); generateAggregateClass(aggregate, aggregateType, className, out); }); @@ -342,7 +344,7 @@ private String classDeclaration( } else { - extendsClause = ""; + extendsClause = " extends CommonEncoderImpl"; } return String.format( "\n" + @@ -368,6 +370,9 @@ private String completeResetMethod( case HEADER: additionalReset = " beginStringAsCopy(DEFAULT_BEGIN_STRING, 0, DEFAULT_BEGIN_STRING.length);\n"; break; + case MESSAGE: + additionalReset = " resetCustomTags();\n"; + break; default: additionalReset = ""; } @@ -994,6 +999,13 @@ private String encodeMethod(final List entries, final AggregateType aggre if (aggregateType == AggregateType.MESSAGE) { suffix = + "\n" + + " if (customTagsLength > 0)\n" + + " {\n" + + " buffer.putBytes(position, customTagsBuffer, 0, customTagsLength);\n" + + " position += customTagsLength;\n" + + " }\n" + + "\n" + " position += trailer.startTrailer(buffer, position);\n" + "\n" + " final int messageStart = header.finishHeader(buffer, bodyStart, position - bodyStart);\n" + diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java index 87d592f6c3..85a464f37d 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java @@ -423,6 +423,13 @@ public final class ExampleDictionary "8=FIX.4.4\0019=91\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\00110=176\001"; + public static final String WITH_CUSTOM_TAGS = + "8=FIX.4.4\0019=110\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20" + + "\00110100=42" + + "\00110101=foo" + + "\00110=227\001"; + public static final String NESTED_COMPONENT_MESSAGE = "8=FIX.4.4\0019=120\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\001141=180\001142=2\001143=99\001143=100\001" + diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java index 7312cac69e..0e4ea88e1b 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java @@ -23,6 +23,7 @@ import org.junit.BeforeClass; import org.junit.Test; import uk.co.real_logic.artio.EncodingException; +import uk.co.real_logic.artio.builder.CommonEncoderImpl; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.builder.FieldBagEncoder; import uk.co.real_logic.artio.fields.DecimalFloat; @@ -737,6 +738,19 @@ public void shouldValidateMissingRequiredFloatFields() throws Exception encoder.encode(buffer, 1); } + @Test + public void shouldSetCustomTags() throws Exception + { + final Encoder encoder = newHeartbeat(); + setRequiredFields(encoder); + setupComponent(encoder); + + ((CommonEncoderImpl)encoder).customTag(10100, 42); + ((CommonEncoderImpl)encoder).customTagAscii(10101, "foo"); + + assertEncodesTo(encoder, WITH_CUSTOM_TAGS); + } + @Test(expected = EncodingException.class) public void shouldValidateMissingRequiredIntFields() throws Exception { From 39befd36e3a60290154e5f18890e80840193f427 Mon Sep 17 00:00:00 2001 From: pcdv Date: Wed, 19 Jun 2024 11:21:25 +0200 Subject: [PATCH 4/8] Implement more types in CommonEncoderImpl --- .../artio/builder/CommonEncoderImpl.java | 54 ++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java index 12adea47f2..39b567e7d6 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java @@ -7,8 +7,12 @@ */ public class CommonEncoderImpl { - // TODO resizable buffer or way to set a size - protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[64]); + protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[128]); + + public void setCustomTagsBuffer(MutableAsciiBuffer customTagsBuffer) + { + this.customTagsBuffer = customTagsBuffer; + } protected int customTagsLength = 0; @@ -20,6 +24,15 @@ private int putTagHeader(final int tag) return pos; } + public CommonEncoderImpl customTag(final int tag, final boolean value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putCharAscii(pos, value ? 'Y' : 'N'); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + public CommonEncoderImpl customTag(final int tag, final int value) { int pos = putTagHeader(tag); @@ -29,6 +42,33 @@ public CommonEncoderImpl customTag(final int tag, final int value) return this; } + public CommonEncoderImpl customTag(final int tag, final char value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putCharAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final long value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putLongAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final double value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putAscii(pos, String.valueOf(value)); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + public CommonEncoderImpl customTagAscii(final int tag, final CharSequence value) { int pos = putTagHeader(tag); @@ -38,6 +78,16 @@ public CommonEncoderImpl customTagAscii(final int tag, final CharSequence value) return this; } + public CommonEncoderImpl customTagAscii(final int tag, final byte[] value) + { + int pos = putTagHeader(tag); + customTagsBuffer.putBytes(pos, value); + pos += value.length; + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + public void resetCustomTags() { customTagsLength = 0; From ad7f8263ec052ee788fb582cc1aac5bb14eefdfe Mon Sep 17 00:00:00 2001 From: pcdv Date: Wed, 19 Jun 2024 11:41:26 +0200 Subject: [PATCH 5/8] Checkstyle --- .../java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java index 39b567e7d6..e1fbf674c0 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java @@ -9,7 +9,7 @@ public class CommonEncoderImpl { protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[128]); - public void setCustomTagsBuffer(MutableAsciiBuffer customTagsBuffer) + public void setCustomTagsBuffer(final MutableAsciiBuffer customTagsBuffer) { this.customTagsBuffer = customTagsBuffer; } From ee38583b9c21ed3c1918d67c09e03b2b117f9125 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:35:26 -0500 Subject: [PATCH 6/8] [Java] Upgrade to ByteBuddy 1.15.10. --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e0cd7acdfa..709db8a543 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] aeron = "1.46.7" agrona = "1.23.1" -byteBuddy = "1.15.7" +byteBuddy = "1.15.10" checkstyle = "10.19.0" junit = "5.11.3" gradle = "8.10.2" From 16160d7f9096b496bc984d40ad497ce8cb9de6fe Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:35:41 -0500 Subject: [PATCH 7/8] [Java] Upgrade to Checkstyle 1.20.1. --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 709db8a543..8e056409bb 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ aeron = "1.46.7" agrona = "1.23.1" byteBuddy = "1.15.10" -checkstyle = "10.19.0" +checkstyle = "10.20.1" junit = "5.11.3" gradle = "8.10.2" jmh = "1.37" From 80f73cdf501093248e319aa21b718a4337cddc9f Mon Sep 17 00:00:00 2001 From: pcdv Date: Wed, 13 Nov 2024 16:40:37 +0100 Subject: [PATCH 8/8] Add setUnknownTagVisitor() method in all decoders --- .../artio/builder/CommonDecoderImpl.java | 6 +++++ .../co/real_logic/artio/builder/Decoder.java | 7 +++++ .../artio/builder/UnknownTagVisitor.java | 26 +++++++++++++++++++ .../generation/DecoderGenerator.java | 10 ++++--- .../artio/dictionary/ExampleDictionary.java | 4 +++ .../DecoderGeneratorFlyweightTest.java | 13 ++++++++++ 6 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java index aa952aba01..e3158aa3d3 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java @@ -31,6 +31,7 @@ public abstract class CommonDecoderImpl protected int invalidTagId = Decoder.NO_ERROR; protected int rejectReason = Decoder.NO_ERROR; protected AsciiBuffer buffer; + protected UnknownTagVisitor unknownTagVisitor; public int invalidTagId() { @@ -42,6 +43,11 @@ public int rejectReason() return rejectReason; } + public void setUnknownTagVisitor(final UnknownTagVisitor unknownTagVisitor) + { + this.unknownTagVisitor = unknownTagVisitor; + } + public int getInt( final AsciiBuffer buffer, final int startInclusive, final int endExclusive, final int tag, final boolean validation) diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java index fa5111661b..f822c3a38f 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java @@ -91,4 +91,11 @@ public interface Decoder extends CharAppender * @return the encoder passed as an argument. */ Encoder toEncoder(Encoder encoder); + + /** + * Sets a visitor to be called when an unknown tag is encountered while decoding a message. + * + * @param visitor the visitor to invoke + */ + void setUnknownTagVisitor(UnknownTagVisitor visitor); } diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java new file mode 100644 index 0000000000..5503d50b37 --- /dev/null +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java @@ -0,0 +1,26 @@ +package uk.co.real_logic.artio.builder; + +import uk.co.real_logic.artio.util.AsciiBuffer; + +/** + * Interface for visiting unknown tags. + * + * @see CommonDecoderImpl#setUnknownTagVisitor(UnknownTagVisitor) + */ +public interface UnknownTagVisitor +{ + /** + * Called when an unknown tag is encountered while decoding a message. + * + * @param tag The tag number of the unknown tag + * @param buffer The buffer containing the unknown tag + * @param offset The offset at which the unknown tag starts + * @param length The length of the unknown tag + */ + void onUnknownTag( + int tag, + AsciiBuffer buffer, + int offset, + int length + ); +} diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java index 54fc32a812..f88d54db61 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java @@ -1691,24 +1691,28 @@ private String decodeMethod(final List entries, final Aggregate aggregate final String suffix = " default:\n" + + " boolean isTrailer = " + unknownFieldPredicate(type) + ";\n" + " if (!" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED + ")\n" + " {\n" + (isGroup ? " seenFields.remove(tag);\n" : " alreadyVisitedFields.remove(tag);\n") + + " if (unknownTagVisitor != null && !isTrailer)\n" + + " {\n" + + " unknownTagVisitor.onUnknownTag(tag, buffer, valueOffset, valueLength);\n" + + " }\n" + " }\n" + (isGroup ? "" : " else\n" + " {\n" + - " if (!" + unknownFieldPredicate(type) + ")\n" + + " if (!isTrailer)\n" + " {\n" + " unknownFields.add(tag);\n" + " }\n" + " }\n") + - // Skip the thing if it's a completely unknown field and you aren't validating messages " if (" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED + - " || " + unknownFieldPredicate(type) + ")\n" + + " || isTrailer)\n" + " {\n" + decodeTrailerOrReturn(hasCommonCompounds, 5) + " }\n" + diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java index 85a464f37d..fb5e26c9ae 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java @@ -244,6 +244,10 @@ public final class ExampleDictionary "8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=A\001127=19700101-00:00:00.001" + "\00110=043\001"; + public static final String UNKNOWN_TAG_MESSAGE = + "8=FIX.4.4\0019=53\00135=0\001115=abc\00110100=FOO\001116=2\001117=A\00110101=BAR" + + "\001127=19700101-00:00:00.001\00110=043\001"; + public static final String OUT_OF_RANGE_FLOAT_VALUE_MESSAGE = "8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=10000000000000000000000\001" + "127=19700101-00:00:00.001\00110=043\001"; diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java index 703064071f..396a0f66c3 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java @@ -20,6 +20,8 @@ import org.junit.Test; import uk.co.real_logic.artio.builder.Decoder; +import java.util.HashMap; + import static org.junit.Assert.assertEquals; import static uk.co.real_logic.artio.dictionary.ExampleDictionary.*; import static uk.co.real_logic.artio.util.CustomMatchers.assertTargetThrows; @@ -65,4 +67,15 @@ public void shouldNotThrowWhenAccessingUnsetString() throws Exception assertTargetThrows(() -> getAsciiSequenceView(decoder, "testReqID"), IllegalArgumentException.class, "No value for optional field: TestReqID"); } + + @Test + public void shouldVisitUnknownTags() throws Exception + { + final Decoder decoder = newHeartbeat(); + final HashMap map = new HashMap<>(); + decoder.setUnknownTagVisitor((tag, buffer, offset, length) -> map.put(tag, buffer.getAscii(offset, length))); + decode(UNKNOWN_TAG_MESSAGE, decoder); + assertEquals("FOO", map.get(10100)); + assertEquals("BAR", map.get(10101)); + } }