25
25
#include < boost/thread/condition_variable.hpp>
26
26
27
27
#include < queue>
28
+
28
29
using eosio::chain::name;
29
30
namespace fc { class variant ; }
30
31
@@ -50,10 +51,10 @@ namespace eosio {
50
51
51
52
~kafka_plugin_impl ();
52
53
53
- std::optional<boost::signals2::scoped_connection> accepted_block_connection;
54
- std::optional<boost::signals2::scoped_connection> irreversible_block_connection;
55
- std::optional<boost::signals2::scoped_connection> accepted_transaction_connection;
56
- std::optional<boost::signals2::scoped_connection> applied_transaction_connection;
54
+ std::optional <boost::signals2::scoped_connection> accepted_block_connection;
55
+ std::optional <boost::signals2::scoped_connection> irreversible_block_connection;
56
+ std::optional <boost::signals2::scoped_connection> accepted_transaction_connection;
57
+ std::optional <boost::signals2::scoped_connection> applied_transaction_connection;
57
58
chain_plugin *chain_plug;
58
59
struct action_info {
59
60
account_name account;
@@ -65,14 +66,13 @@ namespace eosio {
65
66
struct trasaction_info_st {
66
67
uint64_t block_number;
67
68
fc::time_point block_time;
68
- std::optional<chain::chain_id_type> chain_id;
69
+ std::optional <chain::chain_id_type> chain_id;
69
70
chain::transaction_trace_ptr trace;
70
- vector<action_info> action_vec;
71
+ vector <action_info> action_vec;
71
72
72
73
};
73
74
74
75
75
-
76
76
void consume_blocks ();
77
77
78
78
void accepted_block (const chain::block_state_ptr &);
@@ -103,8 +103,9 @@ namespace eosio {
103
103
104
104
bool configured{false };
105
105
106
- void filter_traction_trace (const chain::transaction_trace_ptr trace,action_name act_name);
107
- void _process_trace (vector<chain::action_trace>::iterator action_trace_ptr,action_name act_name);
106
+ void filter_traction_trace (const chain::transaction_trace_ptr trace, action_name act_name);
107
+
108
+ void _process_trace (vector<chain::action_trace>::iterator action_trace_ptr, action_name act_name);
108
109
109
110
template <typename Queue, typename Entry>
110
111
void queue (Queue &queue, const Entry &e);
@@ -114,20 +115,20 @@ namespace eosio {
114
115
115
116
size_t max_queue_size = 10000 ;
116
117
int queue_sleep_time = 0 ;
117
- std::deque<chain::transaction_metadata_ptr> transaction_metadata_queue;
118
- std::deque<chain::transaction_metadata_ptr> transaction_metadata_process_queue;
119
- std::deque<trasaction_info_st> transaction_trace_queue;
120
- std::deque<trasaction_info_st> transaction_trace_process_queue;
121
- std::deque<chain::block_state_ptr> block_state_queue;
122
- std::deque<chain::block_state_ptr> block_state_process_queue;
123
- std::deque<chain::block_state_ptr> irreversible_block_state_queue;
124
- std::deque<chain::block_state_ptr> irreversible_block_state_process_queue;
118
+ std::deque <chain::transaction_metadata_ptr> transaction_metadata_queue;
119
+ std::deque <chain::transaction_metadata_ptr> transaction_metadata_process_queue;
120
+ std::deque <trasaction_info_st> transaction_trace_queue;
121
+ std::deque <trasaction_info_st> transaction_trace_process_queue;
122
+ std::deque <chain::block_state_ptr> block_state_queue;
123
+ std::deque <chain::block_state_ptr> block_state_process_queue;
124
+ std::deque <chain::block_state_ptr> irreversible_block_state_queue;
125
+ std::deque <chain::block_state_ptr> irreversible_block_state_process_queue;
125
126
std::mutex mtx;
126
127
std::condition_variable condition;
127
128
std::thread consume_thread;
128
129
std::atomic_bool done{false };
129
130
std::atomic_bool startup{true };
130
- std::optional<chain::chain_id_type> chain_id;
131
+ std::optional <chain::chain_id_type> chain_id;
131
132
fc::microseconds abi_serializer_max_time;
132
133
133
134
static const account_name newaccount;
@@ -154,7 +155,7 @@ namespace eosio {
154
155
155
156
template <typename Queue, typename Entry>
156
157
void kafka_plugin_impl::queue (Queue &queue, const Entry &e) {
157
- std::unique_lock<std::mutex> lock (mtx);
158
+ std::unique_lock <std::mutex> lock (mtx);
158
159
auto queue_size = queue.size ();
159
160
if (queue_size > max_queue_size) {
160
161
lock.unlock ();
@@ -193,7 +194,6 @@ namespace eosio {
193
194
trasaction_info_st transactioninfo = trasaction_info_st{
194
195
.block_number = t->block_num ,// chain.pending_block_state()->block_num,
195
196
.chain_id = this ->chain_id ,
196
- // .block_time = chain.pending_block_time(),
197
197
.trace =chain::transaction_trace_ptr (t),
198
198
};
199
199
transactioninfo.block_time = chain.pending_block_time ();
@@ -238,7 +238,7 @@ namespace eosio {
238
238
try {
239
239
240
240
while (true ) {
241
- std::unique_lock<std::mutex> lock (mtx);
241
+ std::unique_lock <std::mutex> lock (mtx);
242
242
while (transaction_metadata_queue.empty () &&
243
243
transaction_trace_queue.empty () &&
244
244
block_state_queue.empty () &&
@@ -409,8 +409,9 @@ namespace eosio {
409
409
filter_traction_trace (t.trace , name (" transfer" ));
410
410
if (t.trace ->action_traces .size () > 0 ) {
411
411
string transfer_json =
412
- " {\" block_number\" :" + std::to_string (t.block_number ) + " ,\" block_time\" :" + std::to_string (time) +
413
- " ,\" chain_id\" :" + " \" " + t.chain_id ->str () + " \" " +
412
+ " {\" block_number\" :" + std::to_string (t.block_number ) + " ,\" block_time\" :" +
413
+ std::to_string (time) +
414
+ " ,\" chain_id\" :" + " \" " + t.chain_id ->str () + " \" " +
414
415
" ,\" trace\" :" + fc::json::to_string (t.trace , fc::time_point::maximum ()).c_str () + " }" ;
415
416
producer->trx_kafka_sendmsg (KAFKA_TRX_TRANSFER, (char *) transfer_json.c_str ());
416
417
// elog("transfer_json = ${e}",("e",transfer_json));
@@ -523,8 +524,7 @@ namespace eosio {
523
524
(" kafka-block-start" , bpo::value<uint32_t >()->default_value (256 ),
524
525
" If specified then only abi data pushed to kafka until specified block is reached." )
525
526
(" kafka-compression-codec" , bpo::value<std::string>(),
526
- " Compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property compression.codec.(none, gzip, snappy, lz4)" )
527
- ;
527
+ " Compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property compression.codec.(none, gzip, snappy, lz4)" );
528
528
529
529
}
530
530
@@ -550,13 +550,14 @@ namespace eosio {
550
550
transfer_trx_topic = (char *) (options.at (" transfer_trx_topic" ).as <std::string>().c_str ());
551
551
elog (" transfer_trx_topic:${j}" , (" j" , transfer_trx_topic));
552
552
}
553
- if (options.count (" kafka-compression-codec" ) != 0 ) {
553
+ if (options.count (" kafka-compression-codec" ) != 0 ) {
554
554
compression_codec = (char *) (options.at (" kafka-compression-codec" ).as <std::string>().c_str ());
555
555
elog (" kafka-compression-codec:${j}" , (" j" , compression_codec));
556
556
}
557
557
558
- if (0 != my->producer ->trx_kafka_init (brokers_str, accept_trx_topic, compression_codec, applied_trx_topic,
559
- transfer_trx_topic)) {
558
+ if (0 !=
559
+ my->producer ->trx_kafka_init (brokers_str, accept_trx_topic, compression_codec, applied_trx_topic,
560
+ transfer_trx_topic)) {
560
561
elog (" trx_kafka_init fail" );
561
562
} else {
562
563
elog (" trx_kafka_init ok" );
@@ -600,7 +601,7 @@ namespace eosio {
600
601
}));
601
602
my->applied_transaction_connection .emplace (
602
603
chain.applied_transaction .connect (
603
- [&](std::tuple<const std::shared_ptr<chain::transaction_trace>&, const std::shared_ptr<const chain::packed_transaction>&> t) {
604
+ [&](std::tuple<const std::shared_ptr <chain::transaction_trace> &, const std::shared_ptr<const chain::packed_transaction> &> t) {
604
605
my->applied_transaction (std::get<0 >(t));
605
606
}));
606
607
my->init ();
0 commit comments