Skip to content

Commit 8d4974e

Browse files
author
Petr Matousek
committed
transactional receiver first version
1 parent 647d862 commit 8d4974e

File tree

2 files changed

+139
-86
lines changed

2 files changed

+139
-86
lines changed

src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp

Lines changed: 131 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,51 @@ int TxReceiverHandler::getBatchSize() const
239239

240240
// reactor methods
241241

242+
void TxReceiverHandler::on_session_open(session &s) override {
243+
sess = s;
244+
std::cout << " [on_session_open] declare_txn started..." << std::endl;
245+
s.declare_transaction(*this);
246+
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
247+
}
248+
249+
void on_transaction_declare_failed(transaction) {}
250+
251+
void on_transaction_commit_failed(proton::transaction t) {
252+
std::cout << "Transaction Commit Failed" << std::endl;
253+
t.connection().close();
254+
exit(-1);
255+
}
256+
257+
void TxReceiverHandler::on_transaction_declared(transaction t) override {
258+
std::cout << "[on_transaction_declared] txn called " << (&t)
259+
<< std::endl;
260+
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
261+
<< "\t" << transaction.is_empty() << std::endl;
262+
recv.add_credit(batch_size);
263+
transaction = t;
264+
}
265+
266+
void TxReceiverHandler::on_message(delivery &d, message &msg) override {
267+
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
268+
transaction.accept(d);
269+
current_batch += 1;
270+
if(current_batch == batch_size) {
271+
transaction = proton::transaction(); // null
272+
}
273+
}
274+
275+
void TxReceiverHandler::on_transaction_committed(transaction t) override {
276+
committed += current_batch;
277+
current_batch = 0;
278+
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
279+
if(committed == expected) {
280+
std::cout << "All messages committed" << std::endl;
281+
t.connection().close();
282+
}
283+
else {
284+
sess.declare_transaction(*this);
285+
}
286+
}
242287

243288
void TxReceiverHandler::on_container_start(container &c)
244289
{
@@ -418,92 +463,92 @@ void TxReceiverHandler::on_container_start(container &c)
418463

419464
}
420465

421-
void TxReceiverHandler::on_message(delivery &d, message &m)
422-
{
423-
msg_received_cnt += 1;
424-
425-
logger(debug) << "Processing received message";
426-
427-
if (log_msgs == "dict") {
428-
logger(trace) << "Decoding message";
429-
ReactorDecoder decoder = ReactorDecoder(m);
430-
431-
std::ostringstream stream;
432-
DictWriter writer = DictWriter(&stream);
433-
434-
DictFormatter formatter = DictFormatter();
435-
formatter.printMessage(&decoder, &writer);
436-
437-
writer.endLine();
438-
std::cout << writer.toString();
439-
} else if (log_msgs == "interop") {
440-
DictFormatter formatter = DictFormatter();
441-
442-
formatter.printMessageInterop(m);
443-
}
444-
445-
if (duration_time > 0 && duration_mode == "after-receive") {
446-
logger(debug) << "Waiting...";
447-
sleep4next(ts, count, duration_time, msg_received_cnt);
448-
}
449-
450-
if((msg_received_cnt % msg_action_size) == 0) {
451-
do_message_action(d);
452-
}
453-
454-
if (duration_time > 0 && duration_mode == "after-receive-action") {
455-
sleep4next(ts, count, duration_time, msg_received_cnt);
456-
}
457-
458-
if (duration_time > 0 && duration_mode == "after-receive-action-tx-action") {
459-
// TODO: not implemented yet
460-
}
461-
462-
logger(debug) << "Process-reply-to: " << process_reply_to;
463-
464-
if (process_reply_to) {
465-
if (m.reply_to() != "") {
466-
logger(debug) << "Reply-to address: " << m.reply_to();
467-
468-
do_process_reply_to(m);
469-
} else {
470-
logger(debug) << "Reply-to address is not set";
471-
}
472-
}
473-
474-
if (recv_drain_after_credit_window && msg_received_cnt == recv_credit_window) {
475-
logger(debug) << "Scheduling drain";
476-
d.receiver().work_queue().add(make_work(&TxReceiverHandler::drain, this));
477-
}
478-
479-
if (!process_reply_to && msg_received_cnt == count) {
480-
if (durable_subscriber) {
481-
d.receiver().detach();
482-
} else {
483-
d.receiver().close();
484-
}
485-
d.connection().close();
486-
} else {
487-
#if defined(__REACTOR_HAS_TIMER)
488-
super::timer.reset();
489-
#endif
490-
}
491-
}
492-
493-
void TxReceiverHandler::on_receiver_drain_finish(receiver &r) {
494-
logger(debug) << "Receiver drain finished";
495-
}
496-
497-
void TxReceiverHandler::on_tracker_accept(tracker &t)
498-
{
499-
logger(debug) << "Delivery accepted";
500-
}
501-
502-
503-
void TxReceiverHandler::on_tracker_reject(tracker &t)
504-
{
505-
logger(debug) << "Delivery rejected";
506-
}
466+
//void TxReceiverHandler::on_message(delivery &d, message &m)
467+
//{
468+
// msg_received_cnt += 1;
469+
//
470+
// logger(debug) << "Processing received message";
471+
//
472+
// if (log_msgs == "dict") {
473+
// logger(trace) << "Decoding message";
474+
// ReactorDecoder decoder = ReactorDecoder(m);
475+
//
476+
// std::ostringstream stream;
477+
// DictWriter writer = DictWriter(&stream);
478+
//
479+
// DictFormatter formatter = DictFormatter();
480+
// formatter.printMessage(&decoder, &writer);
481+
//
482+
// writer.endLine();
483+
// std::cout << writer.toString();
484+
// } else if (log_msgs == "interop") {
485+
// DictFormatter formatter = DictFormatter();
486+
//
487+
// formatter.printMessageInterop(m);
488+
// }
489+
//
490+
// if (duration_time > 0 && duration_mode == "after-receive") {
491+
// logger(debug) << "Waiting...";
492+
// sleep4next(ts, count, duration_time, msg_received_cnt);
493+
// }
494+
//
495+
// if((msg_received_cnt % msg_action_size) == 0) {
496+
// do_message_action(d);
497+
// }
498+
//
499+
// if (duration_time > 0 && duration_mode == "after-receive-action") {
500+
// sleep4next(ts, count, duration_time, msg_received_cnt);
501+
// }
502+
//
503+
// if (duration_time > 0 && duration_mode == "after-receive-action-tx-action") {
504+
// // TODO: not implemented yet
505+
// }
506+
//
507+
// logger(debug) << "Process-reply-to: " << process_reply_to;
508+
//
509+
// if (process_reply_to) {
510+
// if (m.reply_to() != "") {
511+
// logger(debug) << "Reply-to address: " << m.reply_to();
512+
//
513+
// do_process_reply_to(m);
514+
// } else {
515+
// logger(debug) << "Reply-to address is not set";
516+
// }
517+
// }
518+
//
519+
// if (recv_drain_after_credit_window && msg_received_cnt == recv_credit_window) {
520+
// logger(debug) << "Scheduling drain";
521+
// d.receiver().work_queue().add(make_work(&TxReceiverHandler::drain, this));
522+
// }
523+
//
524+
// if (!process_reply_to && msg_received_cnt == count) {
525+
// if (durable_subscriber) {
526+
// d.receiver().detach();
527+
// } else {
528+
// d.receiver().close();
529+
// }
530+
// d.connection().close();
531+
// } else {
532+
//#if defined(__REACTOR_HAS_TIMER)
533+
// super::timer.reset();
534+
//#endif
535+
// }
536+
//}
537+
//
538+
//void TxReceiverHandler::on_receiver_drain_finish(receiver &r) {
539+
// logger(debug) << "Receiver drain finished";
540+
//}
541+
//
542+
//void TxReceiverHandler::on_tracker_accept(tracker &t)
543+
//{
544+
// logger(debug) << "Delivery accepted";
545+
//}
546+
//
547+
//
548+
//void TxReceiverHandler::on_tracker_reject(tracker &t)
549+
//{
550+
// logger(debug) << "Delivery rejected";
551+
//}
507552

508553
void TxReceiverHandler::on_transport_close(transport &t) {
509554
logger(debug) << "Closing the transport";

src/api/qpid-proton/reactor/handler/TxReceiverHandler.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ class TxReceiverHandler : public CommonHandler, transaction_handler {
202202
void on_connection_close(connection &conn);
203203
void on_connection_error(connection &conn);
204204

205+
// TX
206+
void on_session_open(session &s);
207+
void on_transaction_declare_failed(transaction);
208+
void on_transaction_commit_failed(transaction t);
209+
void on_transaction_declared(transaction t);
210+
void on_transaction_committed(transaction t);
211+
205212
private:
206213
typedef CommonHandler super;
207214
receiver recv;
@@ -253,6 +260,7 @@ class TxReceiverHandler : public CommonHandler, transaction_handler {
253260
string tx_endloop_action = "commit";
254261

255262
transaction *tx;
263+
session sess;
256264
};
257265

258266
} /* namespace reactor */

0 commit comments

Comments
 (0)