Skip to content

Commit

Permalink
Stabilize test
Browse files Browse the repository at this point in the history
  • Loading branch information
pcdv committed Apr 5, 2024
1 parent 01ed098 commit b40884c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ void awaitMessage(final int sequenceNumber, final Session session)

void disconnectSessions()
{
logoutAcceptingSession();

assertSessionsDisconnected();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.junit.Ignore;
import org.junit.Test;
import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixEngine;
import uk.co.real_logic.artio.fields.EpochFractionFormat;
Expand All @@ -29,10 +28,6 @@
import static org.junit.Assert.assertTrue;
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig;
Expand All @@ -56,13 +51,14 @@ public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest
private long sleepBeforeSendResendRequest;

private final ArrayList<AutoCloseable> autoClose = new ArrayList<>();
private DebugServer initialAcceptor;

private void launch()
private void launch() throws IOException
{
mediaDriver = launchMediaDriver();
launchAccepting();
launchInitialAcceptor();
launchInitiating();
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);
testSystem = new TestSystem(initiatingLibrary);
}

private void launchInitiating()
Expand Down Expand Up @@ -103,15 +99,15 @@ public void execute()
}
}

private void launchAccepting()
private void launchInitialAcceptor() throws IOException
{
final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock)
.deleteLogFileDirOnStart(true)
.initialAcceptedSessionOwner(SOLE_LIBRARY);
acceptingEngine = FixEngine.launch(acceptingConfig);

final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
acceptingLibrary = connect(acceptingLibraryConfig);
initialAcceptor = new DebugServer(port);
initialAcceptor.setWaitForData(true);
initialAcceptor.addFIXResponse(
"8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|",
"8=FIX.4.4|9=94|35=1|49=acceptor|56=initiator|34=2|52=***|112=hello|98=0|108=10|141=N|10=024|"
);
initialAcceptor.start();
}

/**
Expand Down Expand Up @@ -154,33 +150,19 @@ class Proxy extends DirectSessionProxy
@Override
public long onResend(
final Session session, final AbstractResendRequestDecoder resendRequest,
final int correctedEndSeqNo, final ResendRequestResponse response,
final int endSeqNo, final ResendRequestResponse response,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
onResendRequestReceived(session, resendRequest, correctedEndSeqNo, response,
messageBuffer, messageOffset, messageLength);
return 1;
}

private void onResendRequestReceived(
final Session session, final AbstractResendRequestDecoder request, final int endSeqNo,
final ResendRequestResponse response,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
System.err.println("onResendRequestReceived() called");
if (!useProxy || sleepBeforeSendResendRequest == 0)
{
response.resend();
}
else
System.err.println("onResend() called");
if (useProxy && sleepBeforeSendResendRequest != 0)
{
response.delay();
final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]);
buf.putBytes(0, messageBuffer, messageOffset, messageLength);
pendingResendRequest = new PendingResendRequest(session, request.beginSeqNo(), endSeqNo, buf);
pendingResendRequest = new PendingResendRequest(session, resendRequest.beginSeqNo(), endSeqNo, buf);
}
return 1;
}

@Override
Expand Down Expand Up @@ -224,10 +206,6 @@ public long sendResendRequest(
{
pendingResendRequest.execute();
}
else
{
System.err.println("onResend not called (direct)");
}
}
return 1;
}
Expand Down Expand Up @@ -303,14 +281,14 @@ private void reconnectTest() throws Exception
launch();

connectAndAcquire();
final DebugFIXClient acc1 = new DebugFIXClient(initialAcceptor.popClient(5000));
acc1.start();

messagesCanBeExchanged();

disconnectSessions();
Exceptions.closeAll(this::closeAcceptingEngine);

assertEquals(3, acceptingSession.lastReceivedMsgSeqNum());
assertEquals(3, initiatingSession.lastReceivedMsgSeqNum());
acc1.popAndAssert("35=A 34=1");
acc1.popAndAssert("35=0 34=2 112=hello");
acc1.close();
initialAcceptor.stop();
assertEquals(2, initiatingSession.lastReceivedMsgSeqNum());

final DebugServer srv = new DebugServer(port);
srv.setWaitForData(true);
Expand All @@ -323,11 +301,12 @@ private void reconnectTest() throws Exception

connectPersistentSessions(4, 4, false);

final DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000));
autoClose.add(exchange::close);
exchange.popAndAssert("35=A 34=4");
exchange.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first
exchange.popAndAssert("35=4 34=4 36=6");
final DebugFIXClient acc2 = new DebugFIXClient(srv.popClient(5000));
acc2.start();
autoClose.add(acc2::close);
acc2.popAndAssert("35=A 34=4");
acc2.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first
acc2.popAndAssert("35=4 34=4 36=6");
}

@Override
Expand All @@ -349,6 +328,5 @@ public void close()
private void connectAndAcquire()
{
connectSessions();
acceptingSession = acceptingHandler.lastSession();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class DebugFIXClient
{
private final DebugServer.HasIOStream io;
private final Thread thread;
private Thread thread;

private final BlockingQueue<Map<String, String>> messages = new LinkedBlockingQueue<>();
private volatile boolean disposed;
Expand All @@ -27,6 +27,11 @@ public class DebugFIXClient
public DebugFIXClient(final DebugServer.HasIOStream io)
{
this.io = Objects.requireNonNull(io);
}

public void start()
{
assert thread == null;
thread = new Thread(this::run, "DebugFIXClient");
thread.start();
}
Expand All @@ -36,6 +41,7 @@ public void close() throws Exception
disposed = true;
io.in.close();
io.in.close();
io.socket.close();
thread.interrupt();
thread.join();
}
Expand All @@ -58,7 +64,7 @@ private void run()
{
messages.add(msg);
msg = new HashMap<>();
System.out.println(prefix + s);
System.err.println(prefix + s);
s.setLength(0);
}
}
Expand All @@ -78,6 +84,7 @@ public Map<String, String> popMessage() throws InterruptedException
public void popAndAssert(final String tagValues) throws InterruptedException
{
final Map<String, String> map = popMessage();
System.err.println(map);
if (map == null)
{
throw new AssertionError("No message received");
Expand All @@ -87,7 +94,15 @@ public void popAndAssert(final String tagValues) throws InterruptedException
{
final String tag = rule.substring(0, rule.indexOf('='));
final String value = map.get(tag);
Assert.assertEquals(rule, tag + "=" + value);
try
{
Assert.assertEquals(rule, tag + "=" + value);
}
catch (final Throwable e)
{
e.printStackTrace();
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void run()
in.reset();
}

final HasIOStream client = new HasIOStream(in, out);
final HasIOStream client = new HasIOStream(socket, in, out);
sendResponses(client.out);
clients.add(client);
}
Expand Down Expand Up @@ -154,11 +154,13 @@ public void setWaitForData(final boolean waitForData)
public static class HasIOStream
{

public final Socket socket;
public final InputStream in;
public final OutputStream out;

public HasIOStream(final InputStream in, final OutputStream out)
public HasIOStream(final Socket socket, final InputStream in, final OutputStream out)
{
this.socket = socket;
this.in = in;
this.out = out;
}
Expand Down

0 comments on commit b40884c

Please sign in to comment.