diff --git a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html index 48a2f51800..2eef371616 100644 --- a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html +++ b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html @@ -76,6 +76,7 @@

QuickFIX Settings

  • Storage
  • Logging
  • Miscellaneous
  • +
  • Invalid vs Garbled Messages
  • Sample Settings File
  • @@ -420,9 +421,21 @@

    QuickFIX Settings

    + + + + + + - + @@ -1251,6 +1264,82 @@

    QuickFIX Settings

    120
    RejectGarbledMessage If RejectGarbledMessage is set to Y, garbled messages will be rejected (with a generic error message in 58/Text field) instead of ignored.
    + This is only working for messages that pass the FIX decoder and reach the engine.
    + Messages that cannot be considered a real FIX message (i.e. not starting with 8=FIX or not ending with 10=xxx) will be ignored in any case.
    + See Invalid vs Garbled Messages for further explanation. +
    Y
    N
    N
    RejectInvalidMessage If RejectInvalidMessage is set to N, only a warning will be logged on reception of message that fails data dictionary validation. If RejectInvalidMessage is set to N, only a warning will be logged on reception of message that fails data dictionary validation.
    + See Invalid vs Garbled Messages for further explanation. +
    Y
    N
    Y
    +

    Rejecting Invalid vs Garbled Messages

    + +

    + There are mainly two settings that influence QFJ's rejection behaviour: +

    + + +

    + While the first applies to messages that fail data dictionary validation, + the latter applies to messages that fail basic validity checks on the FIX protocol level. +

    + +

    Setting RejectInvalidMessage

    + +

    + If RejectInvalidMessage is set to +

    + + +

    Setting RejectGarbledMessage

    + +

    + If RejectGarbledMessage is set to +

    + + +
    Information on garbled messages
    +

    + In FIX it is legal to ignore a message under certain circumstances. Since FIX is an optimistic protocol + it expects that some errors are transient and will correct themselves with the next message transmission. + Therefore the sequence number is not incremented and a resend request is issued on the next received + message that has a higher sequence number than expected. +

    + +

    + In the case that the error is not transient, the default behaviour is not optimal because not consuming a + message sequence number can lead to follow-up problems since QFJ will wait for the message to be resent + and queue all subsequent messages until the resend request has been satisfied (i.e. infinite resend loop). +

    + + What constitutes a garbled message (taken from the FIX protocol specification): +
    +
  • BeginString (tag #8) is not the first tag in a message or is not of the format 8=FIXT.n.m.
  • +
  • BodyLength (tag #9) is not the second tag in a message or does not contain the correct byte count.
  • +
  • MsgType (tag #35) is not the third tag in a message.
  • +
  • Checksum (tag #10) is not the last tag or contains an incorrect value.
  • + + If the MsgSeqNum(tag #34) is missing a logout message should be sent terminating the FIX Connection, as this + indicates a serious application error that is likely only circumvented by software modification. +
    + +

    + You have the possibility to adapt QFJ's behaviour for some of the cases mentioned above.
    +

    +
  • If an incoming message does neither start with the BeginString tag nor does it end with the Checksum tag, the message + cannot be passed to the session and will be discarded by the FIX decoder right away.
  • +
  • Examples where the message will be rejected instead of ignored when RejectGarbledMessage=Y: + +
  • +

    Sample Settings File

    diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index c7c7ba3a10..71c3b11f61 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -81,6 +81,9 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf try { String connectionType = null; + final boolean rejectGarbledMessage = getSetting(settings, sessionID, + Session.SETTING_REJECT_GARBLED_MESSAGE, false); + final boolean rejectInvalidMessage = getSetting(settings, sessionID, Session.SETTING_REJECT_INVALID_MESSAGE, true); @@ -209,7 +212,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf resetOnLogon, resetOnLogout, resetOnDisconnect, refreshAtLogon, checkCompID, redundantResentRequestAllowed, persistMessages, useClosedIntervalForResend, testRequestDelayMultiplier, senderDefaultApplVerID, validateSequenceNumbers, - logonIntervals, resetOnError, disconnectOnError, disableHeartBeatCheck, + logonIntervals, resetOnError, disconnectOnError, disableHeartBeatCheck, rejectGarbledMessage, rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed); diff --git a/quickfixj-core/src/main/java/quickfix/InvalidMessage.java b/quickfixj-core/src/main/java/quickfix/InvalidMessage.java index 4b2b4cc3b8..c15ee2440a 100644 --- a/quickfixj-core/src/main/java/quickfix/InvalidMessage.java +++ b/quickfixj-core/src/main/java/quickfix/InvalidMessage.java @@ -25,16 +25,42 @@ */ public class InvalidMessage extends Exception { + Message fixMessage; + public InvalidMessage() { super(); } + public InvalidMessage(Message fixMessage) { + super(); + setGarbledFixMessage(fixMessage); + } + public InvalidMessage(String message) { super(message); } + + public InvalidMessage(String message, Message fixMessage) { + super(message); + setGarbledFixMessage(fixMessage); + } public InvalidMessage(String message, Throwable cause) { super(message, cause); } + public InvalidMessage(String message, Throwable cause, Message fixMessage) { + super(message, cause); + setGarbledFixMessage(fixMessage); + } + + public Message getFixMessage() { + return fixMessage; + } + + private void setGarbledFixMessage(Message fixMessage) { + this.fixMessage = fixMessage; + this.fixMessage.setGarbled(true); + } + } diff --git a/quickfixj-core/src/main/java/quickfix/Message.java b/quickfixj-core/src/main/java/quickfix/Message.java index 3b64ef53fc..b3e2d21733 100644 --- a/quickfixj-core/src/main/java/quickfix/Message.java +++ b/quickfixj-core/src/main/java/quickfix/Message.java @@ -563,11 +563,11 @@ private void validateCheckSum(String messageData) throws InvalidMessage { final int checksum = trailer.getInt(CheckSum.FIELD); if (checksum != MessageUtils.checksum(messageData)) { // message will be ignored if checksum is wrong or missing - throw new InvalidMessage("Expected CheckSum=" + MessageUtils.checksum(messageData) - + ", Received CheckSum=" + checksum + " in " + messageData); + throw MessageUtils.newInvalidMessageException("Expected CheckSum=" + MessageUtils.checksum(messageData) + + ", Received CheckSum=" + checksum + " in " + messageData, this); } } catch (final FieldNotFound e) { - throw new InvalidMessage("Field not found: " + e.field + " in " + messageData); + throw MessageUtils.newInvalidMessageException("Field not found: " + e.field + " in " + messageData, this); } } @@ -579,7 +579,7 @@ && isNextField(dd, header, BodyLength.FIELD) if (!validHeaderFieldOrder) { // Invalid message preamble (first three fields) is a serious // condition and is handled differently from other message parsing errors. - throw new InvalidMessage("Header fields out of order in " + messageData); + throw MessageUtils.newInvalidMessageException("Header fields out of order in " + messageData, MessageUtils.getMinimalMessage(messageData)); } } @@ -609,7 +609,7 @@ private String getMsgType() throws InvalidMessage { try { return header.getString(MsgType.FIELD); } catch (final FieldNotFound e) { - throw new InvalidMessage(e.getMessage() + " in " + messageData); + throw MessageUtils.newInvalidMessageException(e.getMessage() + " in " + messageData, this); } } @@ -663,7 +663,7 @@ private void parseGroup(String msgType, StringField field, DataDictionary dd, Da try { declaredGroupCount = Integer.parseInt(field.getValue()); } catch (final NumberFormatException e) { - throw new InvalidMessage("Repeating group count requires an Integer but found: " + field.getValue(), e); + throw MessageUtils.newInvalidMessageException("Repeating group count requires an Integer but found '" + field.getValue() + "' in " + messageData, this); } parent.setField(groupCountTag, field); final int firstField = rg.getDelimiterField(); @@ -828,10 +828,9 @@ static boolean isTrailerField(int field) { // Extract field // private String messageData; - private int position; - private StringField pushedBackField; + private boolean isGarbled = false; public void pushBack(StringField field) { pushedBackField = field; @@ -851,7 +850,7 @@ private StringField extractField(DataDictionary dataDictionary, FieldMap fields) final int equalsOffset = messageData.indexOf('=', position); if (equalsOffset == -1) { - throw new InvalidMessage("Equal sign not found in field" + " in " + messageData); + throw MessageUtils.newInvalidMessageException("Equal sign not found in field in " + messageData, this); } int tag; @@ -859,12 +858,12 @@ private StringField extractField(DataDictionary dataDictionary, FieldMap fields) tag = Integer.parseInt(messageData.substring(position, equalsOffset)); } catch (final NumberFormatException e) { position = messageData.indexOf('\001', position + 1) + 1; - throw new InvalidMessage("Bad tag format: " + e.getMessage() + " in " + messageData); + throw MessageUtils.newInvalidMessageException("Bad tag format: " + e.getMessage() + " in " + messageData, this); } int sohOffset = messageData.indexOf('\001', equalsOffset + 1); if (sohOffset == -1) { - throw new InvalidMessage("SOH not found at end of field: " + tag + " in " + messageData); + throw MessageUtils.newInvalidMessageException("SOH not found at end of field: " + tag + " in " + messageData, this); } if (dataDictionary != null && dataDictionary.isDataField(tag)) { @@ -878,7 +877,7 @@ private StringField extractField(DataDictionary dataDictionary, FieldMap fields) try { fieldLength = fields.getInt(lengthField); } catch (final FieldNotFound e) { - throw new InvalidMessage("Did not find length field " + e.field + " required to parse data field " + tag + " in " + messageData); + throw MessageUtils.newInvalidMessageException("Did not find length field " + e.field + " required to parse data field " + tag + " in " + messageData, this); } // since length is in bytes but data is a string, and it may also contain an SOH, @@ -889,7 +888,7 @@ private StringField extractField(DataDictionary dataDictionary, FieldMap fields) && messageData.substring(equalsOffset + 1, sohOffset).getBytes(CharsetSupport.getCharsetInstance()).length < fieldLength) { sohOffset = messageData.indexOf('\001', sohOffset + 1); if (sohOffset == -1) { - throw new InvalidMessage("SOH not found at end of field: " + tag + " in " + messageData); + throw MessageUtils.newInvalidMessageException("SOH not found at end of field: " + tag + " in " + messageData, this); } } } @@ -936,5 +935,12 @@ public static MsgType identifyType(String message) throws MessageParseError { } } + boolean isGarbled() { + return isGarbled; + } + void setGarbled(boolean isGarbled) { + this.isGarbled = isGarbled; + } + } diff --git a/quickfixj-core/src/main/java/quickfix/MessageUtils.java b/quickfixj-core/src/main/java/quickfix/MessageUtils.java index 01ade452ae..8bfc25fffb 100644 --- a/quickfixj-core/src/main/java/quickfix/MessageUtils.java +++ b/quickfixj-core/src/main/java/quickfix/MessageUtils.java @@ -92,7 +92,7 @@ private static String getFieldOrDefault(FieldMap fields, int tag, String default } /** - * Utility method for parsing a mesasge. This should only be used for parsing messages from + * Utility method for parsing a message. This should only be used for parsing messages from * FIX versions 4.4 or earlier. * * @param messageFactory @@ -175,7 +175,7 @@ private static ApplVerID getApplVerID(Session session, String messageString) } if (applVerID == null) { - throw new InvalidMessage("Can't determine ApplVerID for message"); + throw newInvalidMessageException("Can't determine ApplVerID from message " + messageString, getMinimalMessage(messageString)); } return applVerID; @@ -204,11 +204,32 @@ private static boolean isMessageType(String message, String msgType) { public static String getMessageType(String messageString) throws InvalidMessage { final String value = getStringField(messageString, 35); if (value == null) { - throw new InvalidMessage("Missing or garbled message type in " + messageString); + throw newInvalidMessageException("Missing or garbled message type in " + messageString, getMinimalMessage(messageString)); } return value; } + /** + * Tries to set MsgSeqNum and MsgType from a FIX string to a new Message. + * These fields are referenced on the outgoing Reject message. + * + * @param messageString FIX message as String + * @return New quickfix.Message with optionally set header fields MsgSeqNum + * and MsgType. + */ + static Message getMinimalMessage(String messageString) { + final Message tempMessage = new Message(); + final String seqNum = getStringField(messageString, 34); + if (seqNum != null) { + tempMessage.getHeader().setString(34, seqNum); + } + final String msgType = getStringField(messageString, 35); + if (msgType != null) { + tempMessage.getHeader().setString(35, msgType); + } + return tempMessage; + } + public static String getStringField(String messageString, int tag) { String value = null; final String tagString = Integer.toString(tag); @@ -362,4 +383,18 @@ public static int checksum(String message) { public static int length(Charset charset, String data) { return CharsetSupport.isStringEquivalent(charset) ? data.length() : data.getBytes(charset).length; } + + /** + * Returns an InvalidMessage Exception with optionally attached FIX message. + * + * @param errorMessage error description + * @param fixMessage problematic FIX message + * @return InvalidMessage Exception + */ + static InvalidMessage newInvalidMessageException(String errorMessage, Message fixMessage) { + if (fixMessage != null) { + return new InvalidMessage(errorMessage, fixMessage); + } + return new InvalidMessage(errorMessage); + } } diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 7cd25a7441..e279432a38 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -333,6 +333,15 @@ public class Session implements Closeable { */ public static final String SETTING_ENABLE_NEXT_EXPECTED_MSG_SEQ_NUM = "EnableNextExpectedMsgSeqNum"; + /** + * Reject garbled messages instead of ignoring them. + * This is only working for messages that pass the FIX decoder and reach the engine. + * Messages that cannot be considered a real FIX message (i.e. not starting with + * 8=FIX or not ending with 10=xxx) will be ignored in any case. + * Default is "N". + */ + public static final String SETTING_REJECT_GARBLED_MESSAGE = "RejectGarbledMessage"; + public static final String SETTING_REJECT_INVALID_MESSAGE = "RejectInvalidMessage"; public static final String SETTING_REJECT_MESSAGE_ON_UNHANDLED_EXCEPTION = "RejectMessageOnUnhandledException"; @@ -389,6 +398,7 @@ public class Session implements Closeable { private final boolean checkCompID; private final boolean useClosedRangeForResend; private boolean disableHeartBeatCheck = false; + private boolean rejectGarbledMessage = false; private boolean rejectInvalidMessage = false; private boolean rejectMessageOnUnhandledException = false; private boolean requiresOrigSendingTime = false; @@ -434,7 +444,7 @@ public class Session implements Closeable { logFactory, messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] { 5 }, false, false, - false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, false); + false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, false); } Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, @@ -447,7 +457,7 @@ public class Session implements Closeable { boolean useClosedRangeForResend, double testRequestDelayMultiplier, DefaultApplVerID senderDefaultApplVerID, boolean validateSequenceNumbers, int[] logonIntervals, boolean resetOnError, boolean disconnectOnError, - boolean disableHeartBeatCheck, boolean rejectInvalidMessage, + boolean disableHeartBeatCheck, boolean rejectGarbledMessage, boolean rejectInvalidMessage, boolean rejectMessageOnUnhandledException, boolean requiresOrigSendingTime, boolean forceResendWhenCorruptedStore, Set allowedRemoteAddresses, boolean validateIncomingMessage, int resendRequestChunkSize, @@ -473,6 +483,7 @@ public class Session implements Closeable { this.resetOnError = resetOnError; this.disconnectOnError = disconnectOnError; this.disableHeartBeatCheck = disableHeartBeatCheck; + this.rejectGarbledMessage = rejectGarbledMessage; this.rejectInvalidMessage = rejectInvalidMessage; this.rejectMessageOnUnhandledException = rejectMessageOnUnhandledException; this.requiresOrigSendingTime = requiresOrigSendingTime; @@ -947,7 +958,7 @@ private void next(Message message, boolean isProcessingQueuedMessages) throws Fi + "' does not match the session version '" + sessionBeginString + "'"); } - if (msgType.equals(MsgType.LOGON)) { + if (MsgType.LOGON.equals(msgType)) { if (sessionID.isFIXT()) { targetDefaultApplVerID.set(new ApplVerID(message .getString(DefaultApplVerID.FIELD))); @@ -1053,7 +1064,7 @@ private void next(Message message, boolean isProcessingQueuedMessages) throws Fi generateBusinessReject(message, BusinessRejectReason.CONDITIONALLY_REQUIRED_FIELD_MISSING, e.field); } else { - if (msgType.equals(MsgType.LOGON)) { + if (MsgType.LOGON.equals(msgType)) { getLog().onErrorEvent("Required field missing from logon"); disconnect("Required field missing from logon", true); } else { @@ -1064,10 +1075,17 @@ private void next(Message message, boolean isProcessingQueuedMessages) throws Fi /* InvalidMessage means a low-level error (e.g. checksum problem) and we should ignore the message and let the problem correct itself (optimistic approach). Target sequence number is not incremented, so it will trigger a ResendRequest - on the next message that is received. */ - getLog().onErrorEvent("Skipping invalid message: " + e + ": " + getMessageToLog(message)); - if (resetOrDisconnectIfRequired(message)) { - return; + on the next message that is received. + If the message should get rejected and the seqnum get incremented, + then setting RejectGarbledMessage=Y needs to be used. */ + if (rejectGarbledMessage) { + getLog().onErrorEvent("Processing garbled message: " + e.getMessage()); + generateReject(message, "Message failed basic validity check"); + } else { + getLog().onErrorEvent("Skipping invalid message: " + e + ": " + getMessageToLog(message)); + if (resetOrDisconnectIfRequired(message)) { + return; + } } } catch (final RejectLogon e) { final String rejectMessage = e.getMessage() != null ? (": " + e) : ""; @@ -1097,7 +1115,7 @@ ignore the message and let the problem correct itself (optimistic approach). if (logErrorAndDisconnectIfRequired(e, message)) { return; } - if (msgType.equals(MsgType.LOGOUT)) { + if (MsgType.LOGOUT.equals(msgType)) { nextLogout(message); } else { generateLogout("Incorrect BeginString: " + e.getMessage()); @@ -1125,7 +1143,7 @@ ignore the message and let the problem correct itself (optimistic approach). generateBusinessReject(message, BusinessRejectReason.APPLICATION_NOT_AVAILABLE, 0); } else { - if (msgType.equals(MsgType.LOGON)) { + if (MsgType.LOGON.equals(msgType)) { disconnect("Problem processing Logon message", true); } else { generateReject(message, SessionRejectReason.OTHER, 0); @@ -1179,6 +1197,10 @@ private boolean logErrorAndDisconnectIfRequired(final Exception e, Message messa public void next(Message message) throws FieldNotFound, RejectLogon, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType, IOException, InvalidMessage { + if (rejectGarbledMessage && message.isGarbled()) { + generateReject(message, "Message failed basic validity check"); + return; + } next(message, false); } @@ -1214,7 +1236,7 @@ private boolean resetOrDisconnectIfRequired(Message msg) { } private boolean isStateRefreshNeeded(String msgType) { - return refreshMessageStoreAtLogon && !state.isInitiator() && msgType.equals(MsgType.LOGON); + return refreshMessageStoreAtLogon && !state.isInitiator() && MsgType.LOGON.equals(msgType); } private void nextReject(Message reject) throws FieldNotFound, RejectLogon, IncorrectDataFormat, @@ -1498,15 +1520,15 @@ private void generateReject(Message message, String str) throws FieldNotFound, I reject.reverseRoute(header); initializeHeader(reject.getHeader()); - final String msgType = header.getString(MsgType.FIELD); - final String msgSeqNum = header.getString(MsgSeqNum.FIELD); - if (beginString.compareTo(FixVersions.BEGINSTRING_FIX42) >= 0) { + final String msgType = (header.isSetField(MsgType.FIELD) ? header.getString(MsgType.FIELD) : null); + final String msgSeqNum = (header.isSetField(MsgSeqNum.FIELD) ? header.getString(MsgSeqNum.FIELD) : NumbersCache.get(0)); + if (beginString.compareTo(FixVersions.BEGINSTRING_FIX42) >= 0 && msgType != null) { reject.setString(RefMsgType.FIELD, msgType); } reject.setString(RefSeqNum.FIELD, msgSeqNum); // QFJ-557: Only advance the sequence number if we are at the expected number. - if (!msgType.equals(MsgType.LOGON) && !msgType.equals(MsgType.SEQUENCE_RESET) + if (!MsgType.LOGON.equals(msgType) && !MsgType.SEQUENCE_RESET.equals(msgType) && Integer.parseInt(msgSeqNum) == getExpectedTargetNum()) { state.incrNextTargetMsgSeqNum(); } @@ -1559,7 +1581,7 @@ private void generateReject(Message message, String text, int err, int field) th } if (beginString.compareTo(FixVersions.BEGINSTRING_FIX42) >= 0) { - if (!msgType.equals("")) { + if (!"".equals(msgType)) { reject.setString(RefMsgType.FIELD, msgType); } if (beginString.compareTo(FixVersions.BEGINSTRING_FIX44) > 0) { @@ -1586,7 +1608,7 @@ private void generateReject(Message message, String text, int err, int field) th state.lockTargetMsgSeqNum(); try { // QFJ-557: Only advance the sequence number if we are at the expected number. - if (!msgType.equals(MsgType.LOGON) && !msgType.equals(MsgType.SEQUENCE_RESET) + if (!MsgType.LOGON.equals(msgType) && !MsgType.SEQUENCE_RESET.equals(msgType) && msgSeqNum == getExpectedTargetNum()) { state.incrNextTargetMsgSeqNum(); } @@ -1827,12 +1849,12 @@ private void fromCallback(String msgType, Message msg, SessionID sessionID2) } private synchronized boolean validLogonState(String msgType) { - return msgType.equals(MsgType.LOGON) && state.isResetSent() || state.isResetReceived() || - msgType.equals(MsgType.LOGON) && !state.isLogonReceived() || - !msgType.equals(MsgType.LOGON) && state.isLogonReceived() || - msgType.equals(MsgType.LOGOUT) && state.isLogonSent() || - !msgType.equals(MsgType.LOGOUT) && state.isLogoutSent() || - msgType.equals(MsgType.SEQUENCE_RESET) || msgType.equals(MsgType.REJECT); + return MsgType.LOGON.equals(msgType) && state.isResetSent() || state.isResetReceived() || + MsgType.LOGON.equals(msgType) && !state.isLogonReceived() || + !MsgType.LOGON.equals(msgType) && state.isLogonReceived() || + MsgType.LOGOUT.equals(msgType) && state.isLogonSent() || + !MsgType.LOGOUT.equals(msgType) && state.isLogoutSent() || + MsgType.SEQUENCE_RESET.equals(msgType) || MsgType.REJECT.equals(msgType); } private boolean verify(Message message) throws RejectLogon, FieldNotFound, IncorrectDataFormat, @@ -2347,7 +2369,7 @@ private boolean nextQueued(int num) throws FieldNotFound, RejectLogon, Incorrect getLog().onEvent("Processing queued message: " + num); final String msgType = msg.getHeader().getString(MsgType.FIELD); - if (msgType.equals(MsgType.LOGON) || msgType.equals(MsgType.RESEND_REQUEST)) { + if (MsgType.LOGON.equals(msgType) || MsgType.RESEND_REQUEST.equals(msgType)) { // Logon and ResendRequest processing has already been done, so we just need to increment the target seqnum. state.incrNextTargetMsgSeqNum(); } else { @@ -2437,7 +2459,7 @@ private boolean validatePossDup(Message msg) throws FieldNotFound, IOException { final Message.Header header = msg.getHeader(); final String msgType = header.getString(MsgType.FIELD); - if (!msgType.equals(MsgType.SEQUENCE_RESET)) { + if (!MsgType.SEQUENCE_RESET.equals(msgType)) { if (header.isSetField(OrigSendingTime.FIELD)) { final LocalDateTime origSendingTime = header.getUtcTimeStamp(OrigSendingTime.FIELD); final LocalDateTime sendingTime = header.getUtcTimeStamp(SendingTime.FIELD); @@ -2535,7 +2557,7 @@ private boolean sendRaw(Message message, int num) { logApplicationException("toAdmin()", t); } - if (msgType.equals(MsgType.LOGON)) { + if (MsgType.LOGON.equals(msgType)) { if (!state.isResetReceived()) { boolean resetSeqNumFlag = false; if (message.isSetField(ResetSeqNumFlag.FIELD)) { @@ -2550,9 +2572,9 @@ private boolean sendRaw(Message message, int num) { } messageString = message.toString(); - if (msgType.equals(MsgType.LOGON) || msgType.equals(MsgType.LOGOUT) - || msgType.equals(MsgType.RESEND_REQUEST) - || msgType.equals(MsgType.SEQUENCE_RESET) || isLoggedOn()) { + if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType) + || MsgType.RESEND_REQUEST.equals(msgType) + || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) { result = send(messageString); } } else { @@ -2783,6 +2805,10 @@ public boolean isLogoutTimedOut() { return state.isLogoutTimedOut(); } + public boolean isRejectGarbledMessage() { + return rejectGarbledMessage; + } + public boolean isUsingDataDictionary() { return dataDictionaryProvider != null; } @@ -2892,6 +2918,10 @@ public void setIgnoreHeartBeatFailure(boolean ignoreHeartBeatFailure) { disableHeartBeatCheck = ignoreHeartBeatFailure; } + public void setRejectGarbledMessage(boolean rejectGarbledMessage) { + this.rejectGarbledMessage = rejectGarbledMessage; + } + public void setRejectInvalidMessage(boolean rejectInvalidMessage) { this.rejectInvalidMessage = rejectInvalidMessage; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/AbstractIoHandler.java b/quickfixj-core/src/main/java/quickfix/mina/AbstractIoHandler.java index 8a27199b7d..727850970a 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/AbstractIoHandler.java +++ b/quickfixj-core/src/main/java/quickfix/mina/AbstractIoHandler.java @@ -19,18 +19,21 @@ package quickfix.mina; +import java.io.IOException; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolDecoderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import quickfix.*; -import quickfix.field.MsgType; - -import java.io.IOException; - +import quickfix.InvalidMessage; +import quickfix.Log; +import quickfix.LogUtil; +import quickfix.Message; +import quickfix.MessageUtils; import static quickfix.MessageUtils.parse; +import quickfix.Session; +import quickfix.SessionID; /** * Abstract class used for acceptor and initiator IO handlers. @@ -125,13 +128,22 @@ public void messageReceived(IoSession ioSession, Object message) throws Exceptio SessionID remoteSessionID = MessageUtils.getReverseSessionID(messageString); Session quickFixSession = findQFSession(ioSession, remoteSessionID); if (quickFixSession != null) { + final boolean rejectGarbledMessage = quickFixSession.isRejectGarbledMessage(); final Log sessionLog = quickFixSession.getLog(); sessionLog.onIncoming(messageString); try { Message fixMessage = parse(quickFixSession, messageString); processMessage(ioSession, fixMessage); } catch (InvalidMessage e) { - if (MsgType.LOGON.equals(MessageUtils.getMessageType(messageString))) { + if (rejectGarbledMessage) { + final Message fixMessage = e.getFixMessage(); + if ( fixMessage != null ) { + sessionLog.onErrorEvent("Processing garbled message: " + e.getMessage()); + processMessage(ioSession, fixMessage); + return; + } + } + if (MessageUtils.isLogon(messageString)) { sessionLog.onErrorEvent("Invalid LOGON message, disconnecting: " + e.getMessage()); ioSession.closeNow(); } else { diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index 7c677f43e0..722173e071 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -115,7 +115,7 @@ public Session build() { timestampPrecision, resetOnLogon, resetOnLogout, resetOnDisconnect, refreshMessageStoreAtLogon, checkCompID, redundantResentRequestsAllowed, persistMessages, useClosedRangeForResend, testRequestDelayMultiplier, senderDefaultApplVerID, validateSequenceNumbers, logonIntervals, - resetOnError, disconnectOnError, disableHeartBeatCheck, rejectInvalidMessage, + resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed); diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index 1e00cde3ee..3adbe01589 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -85,7 +85,7 @@ public void testDisposalOfFileResources() throws Exception { mockMessageStoreFactory, sessionID, null, null, mockLogFactory, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, - new int[] { 5 }, false, false, false, true, false, true, false, + new int[] { 5 }, false, false, false, false, true, false, true, false, null, true, 0, false, false)) { // Simulate socket disconnect session.setResponder(null); @@ -126,7 +126,7 @@ public void testNondisposableFileResources() throws Exception { mockMessageStoreFactory, sessionID, null, null, mockLogFactory, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, - new int[] { 5 }, false, false, false, true, false, true, false, + new int[] { 5 }, false, false, false, false, true, false, true, false, null, true, 0, false, false)) { // Simulate socket disconnect session.setResponder(null); @@ -1980,7 +1980,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, - false, false, false, true, false, true, false, null, true, + false, false, false, false, true, false, true, false, null, true, chunkSize, false, false)) { UnitTestResponder responder = new UnitTestResponder(); @@ -2042,7 +2042,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu sessionID, null, null, null, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, - new int[]{5}, false, false, false, true, false, true, false, null, true, 0, + new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, false, false); Responder mockResponder = mock(Responder.class); @@ -2090,7 +2090,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu sessionID, null, null, null, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, - new int[]{5}, false, false, false, true, false, true, false, null, true, 0, + new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, enableNextExpectedMsgSeqNum, false); Responder mockResponder = mock(Responder.class); @@ -2139,7 +2139,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception { new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, - false, disconnectOnError, false, true, false, true, false, + false, disconnectOnError, false, false, true, false, true, false, null, true, 0, false, false)) { UnitTestResponder responder = new UnitTestResponder(); @@ -2175,7 +2175,7 @@ public void testTimestampPrecision() throws Exception { new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, - false, disconnectOnError, false, true, false, true, false, + false, disconnectOnError, false, false, true, false, true, false, null, true, 0, false, false)) { UnitTestResponder responder = new UnitTestResponder(); @@ -2227,7 +2227,7 @@ private void testLargeQueue(int N) throws Exception { sessionID, null, null, null, new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, - new int[]{5}, false, false, false, true, false, true, false, null, true, 0, + new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, false, false); UnitTestResponder responder = new UnitTestResponder(); @@ -2343,7 +2343,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound sessionID, null, null, null, new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, - new int[]{5}, false, false, false, true, false, true, false, null, true, 0, + new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, enableNextExpectedMsgSeqNum, false); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); diff --git a/quickfixj-core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java b/quickfixj-core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java index cbc444a19a..82290f97f2 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java @@ -20,6 +20,7 @@ package quickfix.mina.acceptor; import java.time.LocalDateTime; +import java.time.ZoneOffset; import org.apache.mina.core.session.IoSession; import org.junit.Test; import quickfix.FixVersions; @@ -49,6 +50,16 @@ import static org.mockito.Mockito.stub; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import quickfix.ConfigError; +import quickfix.Message; +import quickfix.Responder; +import quickfix.RuntimeError; +import quickfix.SessionSettings; +import quickfix.SessionSettingsTest; +import quickfix.field.MsgType; +import quickfix.field.Text; +import quickfix.mina.SessionConnector; +import quickfix.mina.SingleThreadedEventHandlingStrategy; public class AcceptorIoHandlerTest { @@ -202,4 +213,133 @@ public void testLogonWithoutHeartBtInt() throws Exception { } } + // QFJ-950 + @Test + public void testRejectGarbledMessage() throws Exception { + SessionSettings settings = SessionSettingsTest.setUpSession(null); + SessionConnector connector = new SessionConnector(settings, null) { + @Override + public void start() throws ConfigError, RuntimeError {} + + @Override + public void stop(boolean force) {} + }; + SingleThreadedEventHandlingStrategy eventHandlingStrategy = new SingleThreadedEventHandlingStrategy(connector, 1000); + IoSession mockIoSession = mock(IoSession.class); + + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIXT11, "SENDER", + "TARGET"); + final UnitTestApplication unitTestApplication = new UnitTestApplication(); + try (Session session = SessionFactoryTestSupport.createSession(sessionID, unitTestApplication, false, true, true, true, new DefaultApplVerID(ApplVerID.FIX50SP2))) { + session.setRejectGarbledMessage(true); + eventHandlingStrategy.blockInThread(); + Responder responder = new UnitTestResponder(); + stub(mockIoSession.getAttribute("QF_SESSION")).toReturn(null); // to create a new Session + + final HashMap acceptorSessions = new HashMap<>(); + acceptorSessions.put(sessionID, session); + final StaticAcceptorSessionProvider sessionProvider = createSessionProvider(acceptorSessions); + + final AcceptorIoHandler handler = new AcceptorIoHandler(sessionProvider, + new NetworkingOptions(new Properties()), eventHandlingStrategy); + + final DefaultApplVerID defaultApplVerID = new DefaultApplVerID(ApplVerID.FIX50SP2); + final Logon message = new Logon(new EncryptMethod(EncryptMethod.NONE_OTHER), + new HeartBtInt(30), defaultApplVerID); + message.getHeader().setString(TargetCompID.FIELD, sessionID.getSenderCompID()); + message.getHeader().setString(SenderCompID.FIELD, sessionID.getTargetCompID()); + message.getHeader().setField(new SendingTime(LocalDateTime.now(ZoneOffset.UTC))); + message.getHeader().setInt(MsgSeqNum.FIELD, 1); + + handler.messageReceived(mockIoSession, message.toString()); + session.setResponder(responder); + // wait some time for EventHandlingStrategy to poll the message + Thread.sleep(EventHandlingStrategy.THREAD_WAIT_FOR_MESSAGE_MS * 2); + + assertEquals(2, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(2, session.getStore().getNextSenderMsgSeqNum()); + stub(mockIoSession.getAttribute("QF_SESSION")).toReturn(session); + + // garbled: character as group count + String fixString = "8=FIXT.1.19=6835=B34=249=TARGET52=20180623-22:06:28.97756=SENDER148=foo33=a10=248"; + handler.messageReceived(mockIoSession, fixString); + // wait some time for EventHandlingStrategy to poll the message + Thread.sleep(EventHandlingStrategy.THREAD_WAIT_FOR_MESSAGE_MS * 2); + + // ensure that seqnums are incremented (i.e. message is not ignored) + assertEquals(3, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(3, session.getStore().getNextSenderMsgSeqNum()); + + Message lastToAdminMessage = unitTestApplication.lastToAdminMessage(); + assertEquals(MsgType.REJECT, lastToAdminMessage.getHeader().getString(MsgType.FIELD)); + assertEquals("Message failed basic validity check", lastToAdminMessage.getString(Text.FIELD)); + + // garbled: missing msgtype + fixString = "8=FIXT.1.19=6834=349=TARGET52=20180623-22:06:28.97756=SENDER148=foo33=a10=248"; + handler.messageReceived(mockIoSession, fixString); + // wait some time for EventHandlingStrategy to poll the message + Thread.sleep(EventHandlingStrategy.THREAD_WAIT_FOR_MESSAGE_MS * 2); + + // ensure that seqnums are incremented (i.e. message is not ignored) + assertEquals(4, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(4, session.getStore().getNextSenderMsgSeqNum()); + + lastToAdminMessage = unitTestApplication.lastToAdminMessage(); + assertEquals(MsgType.REJECT, lastToAdminMessage.getHeader().getString(MsgType.FIELD)); + assertEquals("Message failed basic validity check", lastToAdminMessage.getString(Text.FIELD)); + + // garbled: wrong checksum + fixString = "8=FIXT.1.19=6835=B34=449=TARGET52=20180623-22:06:28.97756=SENDER148=foo33=110=256"; + handler.messageReceived(mockIoSession, fixString); + // wait some time for EventHandlingStrategy to poll the message + Thread.sleep(EventHandlingStrategy.THREAD_WAIT_FOR_MESSAGE_MS * 2); + + // ensure that seqnums are incremented (i.e. message is not ignored) + assertEquals(5, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(5, session.getStore().getNextSenderMsgSeqNum()); + + lastToAdminMessage = unitTestApplication.lastToAdminMessage(); + assertEquals(MsgType.REJECT, lastToAdminMessage.getHeader().getString(MsgType.FIELD)); + assertEquals("Message failed basic validity check", lastToAdminMessage.getString(Text.FIELD)); + + // garbled: invalid tag 49garbled + fixString = "8=FIXT.1.19=6835=B34=549garbled=TARGET52=20180623-22:06:28.97756=SENDER148=foo33=110=256"; + handler.messageReceived(mockIoSession, fixString); + // wait some time for EventHandlingStrategy to poll the message + Thread.sleep(EventHandlingStrategy.THREAD_WAIT_FOR_MESSAGE_MS * 2); + + // ensure that seqnums are incremented (i.e. message is not ignored) + assertEquals(6, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(6, session.getStore().getNextSenderMsgSeqNum()); + + lastToAdminMessage = unitTestApplication.lastToAdminMessage(); + assertEquals(MsgType.REJECT, lastToAdminMessage.getHeader().getString(MsgType.FIELD)); + assertEquals("Message failed basic validity check", lastToAdminMessage.getString(Text.FIELD)); + + } finally { + eventHandlingStrategy.stopHandlingMessages(true); + } + } + + private class UnitTestResponder implements Responder { + + public String sentMessageData; + public boolean disconnectCalled; + + @Override + public boolean send(String data) { + sentMessageData = data; + return true; + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public void disconnect() { + disconnectCalled = true; + } + } } diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java index 77f620dee5..843f4e91e1 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java @@ -273,6 +273,11 @@ public static Test suite() { acceptanceTests.addTest(new AcceptanceTestServerSetUp(new AcceptanceTestSuite("timestamps", true, timestampProperties))); acceptanceTests.addTest(new AcceptanceTestServerSetUp(new AcceptanceTestSuite("timestamps", false, timestampProperties))); + Map rejectGarbledMessagesProperties = new HashMap<>(); + rejectGarbledMessagesProperties.put(Session.SETTING_REJECT_GARBLED_MESSAGE, "Y"); + acceptanceTests.addTest(new AcceptanceTestServerSetUp(new AcceptanceTestSuite("rejectGarbledMessages", true, rejectGarbledMessagesProperties))); + acceptanceTests.addTest(new AcceptanceTestServerSetUp(new AcceptanceTestSuite("rejectGarbledMessages", false, rejectGarbledMessagesProperties))); + return acceptanceTests; } diff --git a/quickfixj-core/src/test/resources/quickfix/test/acceptance/definitions/rejectGarbledMessages/fix50/QFJ950-RejectGarbledMessages.def b/quickfixj-core/src/test/resources/quickfix/test/acceptance/definitions/rejectGarbledMessages/fix50/QFJ950-RejectGarbledMessages.def new file mode 100644 index 0000000000..dd0ac9292e --- /dev/null +++ b/quickfixj-core/src/test/resources/quickfix/test/acceptance/definitions/rejectGarbledMessages/fix50/QFJ950-RejectGarbledMessages.def @@ -0,0 +1,49 @@ +# If message is garbled and setting RejectGarbledMessage=Y, then msg should be rejected + +iCONNECT +I8=FIXT.1.135=A34=149=TW52=