Skip to content

Commit 6d1f246

Browse files
authored
Merge pull request #21 from AlignmentSystems/kafka2
KafkaListener and Event engine rework
2 parents 6d4a6ba + 3535994 commit 6d1f246

23 files changed

+735
-395
lines changed

administrator/version.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
#Sun Sep 17 16:27:22 AEST 2023
2-
VERSION_BUILD=650
1+
#Mon Sep 18 00:46:07 AEST 2023
2+
VERSION_BUILD=664

exchange/src/main/java/com/alignmentsystems/matching/FIXEngineExchange.java

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.alignmentsystems.matching;
22

3+
import java.nio.ByteBuffer;
4+
35
/******************************************************************************
46
*
57
* Author : John Greenan
@@ -13,13 +15,28 @@
1315

1416
import java.util.UUID;
1517

18+
import org.apache.kafka.clients.consumer.ConsumerRecord;
19+
20+
import com.alignmentsystems.fix44.ExecutionReport;
21+
import com.alignmentsystems.fix44.field.AvgPx;
22+
import com.alignmentsystems.fix44.field.ClOrdID;
23+
import com.alignmentsystems.fix44.field.CumQty;
24+
import com.alignmentsystems.fix44.field.ExecID;
25+
import com.alignmentsystems.fix44.field.ExecType;
26+
import com.alignmentsystems.fix44.field.LeavesQty;
27+
import com.alignmentsystems.fix44.field.OrdStatus;
28+
import com.alignmentsystems.fix44.field.OrderID;
29+
import com.alignmentsystems.fix44.field.Side;
1630
import com.alignmentsystems.library.AlignmentOrder;
1731
import com.alignmentsystems.library.LibraryOrders;
1832
import com.alignmentsystems.library.LogEncapsulation;
1933
import com.alignmentsystems.library.constants.Constants;
34+
import com.alignmentsystems.library.enumerations.Encodings;
2035
import com.alignmentsystems.library.enumerations.InstanceType;
2136
import com.alignmentsystems.library.enumerations.MessageDirection;
37+
import com.alignmentsystems.library.enumerations.OperationEventType;
2238
import com.alignmentsystems.library.enumerations.OrderBookSide;
39+
import com.alignmentsystems.library.interfaces.InterfaceMatch;
2340
import com.alignmentsystems.library.interfaces.InterfaceQueueNonSequenced;
2441

2542
import quickfix.DoNotSend;
@@ -29,19 +46,27 @@
2946
import quickfix.Message;
3047
import quickfix.MessageCracker;
3148
import quickfix.RejectLogon;
49+
import quickfix.Session;
3250
import quickfix.SessionID;
51+
import quickfix.SessionNotFound;
3352
import quickfix.UnsupportedMessageType;
3453

3554
/**
3655
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
3756
*
3857
*/
39-
public class FIXEngineExchange extends MessageCracker implements quickfix.Application {
58+
public class FIXEngineExchange extends MessageCracker implements quickfix.Application , KafkaMessageHandler{
59+
protected final static String CLASSNAME = FIXEngineExchange.class.getSimpleName();
4060
private InterfaceQueueNonSequenced queueNonSequenced = null;
61+
private final static Encodings encoding = Encodings.FIXSBELITTLEENDIAN;
4162
private LogEncapsulation log = null;
42-
private final static String CLASSNAME = FIXEngineExchange.class.getSimpleName();
4363
private InstanceType instanceType = null;
4464

65+
66+
67+
68+
69+
4570
public FIXEngineExchange(LogEncapsulation log, InterfaceQueueNonSequenced queueNonSequenced,
4671
InstanceType instanceType) {
4772
this.log = log;
@@ -276,4 +301,109 @@ public void onMessage(com.alignmentsystems.fix44.ExecutionReport message, Sessio
276301

277302
}
278303

304+
@Override
305+
public void processMessage(String topicName, ConsumerRecord<String, byte[]> message) throws Exception {
306+
// TODO Auto-generated method stub
307+
//Here we receive a Kafka Message...
308+
309+
log.info(CLASSNAME + " received " + topicName);
310+
//topicName
311+
312+
313+
ByteBuffer bb = ByteBuffer.wrap(message.value()).order(encoding.getByteOrder());
314+
final short msgType = bb.getShort(); // buf.putShort(messageType);
315+
//When we get to here we know the message type that was used
316+
317+
318+
319+
}
320+
321+
private void sendExecutionReportsForMatch(InterfaceMatch match) {
322+
final String methodName ="matchHappened";
323+
324+
log.infoMatchingEvent(OperationEventType.MATCHEVENT, match);
325+
Long executionQuantity = match.getMatchQuantity();
326+
327+
328+
OrderID b_orderId = new OrderID(match.getBuyOrderId().toString());
329+
ClOrdID b_ClOrdId = new ClOrdID(match.getBuyClOrdId().toString());
330+
ExecID b_execID = new ExecID(UUID.randomUUID().toString());
331+
CumQty b_cumQty = new CumQty(match.getMatchQuantity());
332+
AvgPx b_avgPx = new AvgPx(match.getMatchPrice());
333+
Side b_side = new Side(Side.BUY);
334+
335+
ExecType b_execType = null;
336+
OrdStatus b_ordStatus = null;
337+
Long b_orderQty = match.getBuyOrderQty();
338+
LeavesQty b_leavesQty = new LeavesQty( match.getBuyOrderQty()-match.getBuyCumQty());
339+
340+
//int java.lang.Double.compareTo(Double anotherDouble)
341+
//the value 0 if anotherDouble is numerically equal to this Double;
342+
//a value less than 0 if this Double is numerically less than anotherDouble;
343+
//and a value greater than 0 if this Double is numerically greater than anotherDouble.
344+
//Therefore, if this is equal to zero then the execution quantity is equal to the order quantity
345+
346+
if (executionQuantity.compareTo(b_orderQty)==0) {
347+
b_execType = new ExecType(ExecType.FILL);
348+
b_ordStatus = new OrdStatus(OrdStatus.FILLED);
349+
}
350+
351+
ExecutionReport buyExecRpt = new ExecutionReport(
352+
b_orderId
353+
, b_execID
354+
, b_execType
355+
, b_ordStatus
356+
, b_side
357+
, b_leavesQty
358+
, b_cumQty
359+
, b_avgPx)
360+
;
361+
buyExecRpt.set(b_ClOrdId);
362+
363+
364+
365+
//TODO - clean up the above
366+
//Repeat the same code with s_ instead of b_
367+
OrderID s_orderId = new OrderID(match.getSellOrderId().toString());
368+
ClOrdID s_ClOrdId = new ClOrdID(match.getBuyClOrdId().toString());
369+
ExecID s_execID = new ExecID(UUID.randomUUID().toString());
370+
CumQty s_cumQty = new CumQty(match.getMatchQuantity());
371+
AvgPx s_avgPx = new AvgPx(match.getMatchPrice());
372+
Side s_side = new Side(Side.SELL);
373+
374+
ExecType s_execType = null;
375+
OrdStatus s_ordStatus = null;
376+
Long s_orderQty = match.getSellOrderQty();
377+
LeavesQty s_leavesQty = new LeavesQty(match.getSellOrderQty() - match.getSellCumQty() );
378+
379+
//int java.lang.Double.compareTo(Double anotherDouble)
380+
//the value 0 if anotherDouble is numerically equal to this Double;
381+
//a value less than 0 if this Double is numerically less than anotherDouble;
382+
//and a value greater than 0 if this Double is numerically greater than anotherDouble.
383+
//Therefore, if this is equal to zero then the execution quantity is equal to the order quantity
384+
385+
if (executionQuantity == s_orderQty) {
386+
s_execType = new ExecType(ExecType.FILL);
387+
s_ordStatus = new OrdStatus(OrdStatus.FILLED);
388+
}
389+
390+
ExecutionReport sellExecRpt = new ExecutionReport(
391+
s_orderId
392+
, s_execID
393+
, s_execType
394+
, s_ordStatus
395+
, s_side
396+
, s_leavesQty
397+
, s_cumQty
398+
, s_avgPx)
399+
;
400+
buyExecRpt.set(s_ClOrdId);
401+
402+
try {
403+
Session.sendToTarget(sellExecRpt, match.getSellSenderId(), match.getSellTargetId());
404+
Session.sendToTarget(buyExecRpt, match.getBuySenderId() , match.getBuyTargetId());
405+
} catch (SessionNotFound e) {
406+
log.error(e.getMessage(), e);
407+
};
408+
}
279409
}
Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,125 @@
11
package com.alignmentsystems.matching;
2+
/******************************************************************************
3+
*
4+
* Author : John Greenan
5+
* Contact : sales@alignment-systems.com
6+
* Date : 24th August 2023
7+
* Copyright : Alignment Systems Ltd 2023
8+
* Project : Alignment Matching Toy
9+
* Artefact : FIXEngineKafkaListener
10+
* Description :
11+
*****************************************************************************/
212

3-
public class FIXEngineKafkaListener {
13+
import java.io.FileNotFoundException;
14+
import java.time.Duration;
15+
import java.util.List;
16+
import java.util.Properties;
17+
import java.util.concurrent.atomic.AtomicBoolean;
418

5-
public FIXEngineKafkaListener() {
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.common.errors.WakeupException;
23+
24+
import com.alignmentsystems.library.LibraryFunctions;
25+
import com.alignmentsystems.library.LogEncapsulation;
26+
import com.alignmentsystems.library.enumerations.InstanceType;
27+
28+
/**
29+
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
30+
*
31+
*/
32+
public class FIXEngineKafkaListener extends KafkaAbstractSimple implements Runnable {
33+
public final static String CLASSNAME = OrderBookKafkaConsumer.class.getSimpleName();
34+
private final int TIME_OUT_MS = 5000;
35+
private KafkaConsumer<String, byte[]> kafkaConsumer = null;
36+
private final AtomicBoolean closed = new AtomicBoolean(false);
37+
private LogEncapsulation log = null;
38+
private Properties props = null;
39+
40+
public FIXEngineKafkaListener() throws Exception {
41+
super();
642
// TODO Auto-generated constructor stub
743
}
844

9-
}
45+
public Boolean initialise(LogEncapsulation log) throws FileNotFoundException , NullPointerException{
46+
this.log = log;
47+
48+
try {
49+
this.props = LibraryFunctions.getProperties(FIXEngineKafkaListener.class.getClassLoader(), InstanceType.KAFKA.getProperties());
50+
} catch (FileNotFoundException |NullPointerException e) {
51+
throw e;
52+
}
53+
54+
return Boolean.TRUE;
55+
}
56+
57+
public void setKafkaConsumer(KafkaConsumer<String, byte[]> kafkaConsumer) {
58+
this.kafkaConsumer = kafkaConsumer;
59+
}
60+
61+
public KafkaConsumer<String, byte[]> getKafkaConsumer() {
62+
return kafkaConsumer;
63+
}
64+
65+
@Override
66+
public void run() {
67+
AtomicBoolean run = new AtomicBoolean(true);
68+
while (run.get()) {
69+
try {
70+
wait(2000);
71+
} catch (InterruptedException e) {
72+
log.error(e.getMessage() , e );
73+
}
74+
}
75+
}
76+
77+
@Override
78+
public void shutdown() throws Exception {
79+
closed.set(true);
80+
log.info("Shutting down consumer");
81+
getKafkaConsumer().wakeup();
82+
}
83+
84+
@Override
85+
public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
86+
87+
List<String> topicNames = List.of(topicName);
88+
runAlways(topicNames, callback);
89+
90+
}
91+
92+
@Override
93+
public void runAlways(List<String> topicNames , KafkaMessageHandler callback) throws Exception {
94+
// TODO Auto-generated method stub
95+
Properties props;
96+
try {
97+
props = LibraryFunctions.getProperties(OrderBookKafkaConsumer.class.getClassLoader() , InstanceType.KAFKA.getProperties());
98+
} catch (FileNotFoundException | NullPointerException e) {
99+
//log.error(e.getMessage() , e);
100+
throw e;
101+
}
102+
// make the consumer available for graceful shutdown
103+
setKafkaConsumer(new KafkaConsumer<>(props));
104+
105+
// keep running forever or until shutdown() is called from another thread.
106+
try {
107+
getKafkaConsumer().subscribe(topicNames);
108+
while (!closed.get()) {
109+
ConsumerRecords<String, byte[]> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
110+
if (records.count() == 0) {
111+
log.info("No records retrieved");
112+
}else {
113+
114+
for (ConsumerRecord<String, byte[]> record : records) {
115+
callback.processMessage(record.topic(), record);
116+
}
117+
}
118+
}
119+
} catch (WakeupException e) {
120+
// Ignore exception if closing
121+
if (!closed.get())
122+
throw e;
123+
}
124+
}
125+
}

exchange/src/main/java/com/alignmentsystems/matching/FIXToBinaryProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@
2929
import com.alignmentsystems.library.interfaces.InterfaceFIXToBinaryProcessor;
3030
import com.alignmentsystems.library.interfaces.InterfaceOrder;
3131

32+
33+
34+
/**
35+
* @author <a href="mailto:sales@alignment-systems.com">John Greenan</a>
36+
*
37+
*/
3238
public class FIXToBinaryProcessor implements Runnable, InterfaceFIXToBinaryProcessor {
3339
protected final static String CLASSNAME = FIXToBinaryProcessor.class.getSimpleName().toString();
3440
private LogEncapsulation log = null;

0 commit comments

Comments
 (0)