Skip to content

Commit 16a2616

Browse files
committed
Merge branch 'feature/parallel-collation-p' of https://github.com/SmaGMan/ever-node
1 parent 41b21ba commit 16a2616

File tree

10 files changed

+1153
-189
lines changed

10 files changed

+1153
-189
lines changed

src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,35 @@ impl Default for CellsGcConfig {
7979
pub struct CollatorConfig {
8080
pub cutoff_timeout_ms: u32,
8181
pub stop_timeout_ms: u32,
82+
pub finalize_parallel_percentage_points: u32,
8283
pub clean_timeout_percentage_points: u32,
8384
pub optimistic_clean_percentage_points: u32,
8485
pub max_secondary_clean_timeout_percentage_points: u32,
8586
pub max_collate_threads: u32,
87+
pub max_collate_msgs_queue_on_account: u32,
8688
pub retry_if_empty: bool,
8789
pub finalize_empty_after_ms: u32,
8890
pub empty_collation_sleep_ms: u32,
8991
pub external_messages_timeout_percentage_points: u32,
9092
#[serde(skip_serializing_if = "Option::is_none")]
9193
pub external_messages_maximum_queue_length: Option<u32>, // None - unlimited
9294
}
95+
impl CollatorConfig {
96+
pub fn get_finalize_parallel_timeout_ms(&self) -> u32 {
97+
self.stop_timeout_ms * self.finalize_parallel_percentage_points / 1000
98+
}
99+
}
93100
impl Default for CollatorConfig {
94101
fn default() -> Self {
95102
Self {
96103
cutoff_timeout_ms: 1000,
97104
stop_timeout_ms: 1500,
105+
finalize_parallel_percentage_points: 800, // 0.8 = 80% * stop_timeout_ms = 1200
98106
clean_timeout_percentage_points: 150, // 0.150 = 15% = 150ms
99107
optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms
100108
max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms
101109
max_collate_threads: 10,
110+
max_collate_msgs_queue_on_account: 3,
102111
retry_if_empty: false,
103112
finalize_empty_after_ms: 800,
104113
empty_collation_sleep_ms: 100,

src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ mod shard_blocks;
3939

4040
include!("../common/src/info.rs");
4141

42-
#[cfg(feature = "tracing")]
43-
pub mod jaeger;
44-
45-
#[cfg(not(feature = "tracing"))]
4642
pub mod jaeger {
4743
pub fn init_jaeger(){}
4844
#[cfg(feature = "external_db")]

src/main.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ mod ext_messages;
3939

4040
mod shard_blocks;
4141

42-
#[cfg(feature = "tracing")]
43-
mod jaeger;
44-
45-
#[cfg(not(feature = "tracing"))]
4642
mod jaeger {
4743
pub fn init_jaeger(){}
4844
#[cfg(feature = "external_db")]
@@ -52,7 +48,7 @@ mod jaeger {
5248

5349
use crate::{
5450
config::TonNodeConfig, engine::{Engine, Stopper, EngineFlags},
55-
jaeger::init_jaeger, internal_db::restore::set_graceful_termination,
51+
internal_db::restore::set_graceful_termination,
5652
validating_utils::supported_version
5753
};
5854
#[cfg(feature = "external_db")]
@@ -456,7 +452,7 @@ fn main() {
456452
.build()
457453
.expect("Can't create Validator tokio runtime");
458454

459-
init_jaeger();
455+
jaeger::init_jaeger();
460456

461457
#[cfg(feature = "trace_alloc_detail")]
462458
thread::spawn(

src/types/accounts.rs

Lines changed: 164 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,48 @@
1111
* limitations under the License.
1212
*/
1313

14-
use std::sync::{atomic::AtomicU64, Arc};
1514
use ever_block::{
15+
error, fail, AccountId, Cell, HashmapRemover, Result, UInt256,
1616
Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate,
1717
HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts, StateInitLib,
1818
Transaction, Transactions,
1919
};
20-
use ever_block::{fail, AccountId, Cell, HashmapRemover, Result, UInt256};
2120

2221
pub struct ShardAccountStuff {
2322
account_addr: AccountId,
2423
account_root: Cell,
2524
last_trans_hash: UInt256,
2625
last_trans_lt: u64,
27-
lt: Arc<AtomicU64>,
28-
transactions: Transactions,
26+
lt: u64,
27+
transactions: Option<Transactions>,
2928
state_update: HashUpdate,
30-
orig_libs: StateInitLib,
31-
copyleft_rewards: CopyleftRewards,
29+
orig_libs: Option<StateInitLib>,
30+
copyleft_rewards: Option<CopyleftRewards>,
31+
32+
/// * Sync key of message, which updated account state
33+
/// * It is an incremental counter set by executor
34+
update_msg_sync_key: Option<usize>,
35+
36+
// /// * Executor sets transaction that updated account to current state
37+
// /// * Initial account state contains None
38+
// last_transaction: Option<(Cell, CurrencyCollection)>,
39+
40+
/// LT of transaction, which updated account state
41+
update_trans_lt: Option<u64>,
42+
43+
/// The copyleft_reward of transaction, which updated account state (if exists)
44+
update_copyleft_reward_address: Option<AccountId>,
45+
46+
/// Executor stores prevoius account state
47+
prev_account_stuff: Option<Box<ShardAccountStuff>>,
3248
serde_opts: u8,
3349
}
3450

3551
impl ShardAccountStuff {
3652
pub fn new(
3753
account_addr: AccountId,
3854
shard_acc: ShardAccount,
39-
lt: Arc<AtomicU64>,
55+
lt: u64,
4056
serde_opts: u8,
4157
) -> Result<Self> {
4258
let account_hash = shard_acc.account_cell().repr_hash();
@@ -45,17 +61,65 @@ impl ShardAccountStuff {
4561
let last_trans_lt = shard_acc.last_trans_lt();
4662
Ok(Self{
4763
account_addr,
48-
orig_libs: shard_acc.read_account()?.libraries(),
64+
orig_libs: Some(shard_acc.read_account()?.libraries()),
4965
account_root,
5066
last_trans_hash,
5167
last_trans_lt,
5268
lt,
53-
transactions: Transactions::with_serde_opts(serde_opts),
69+
transactions: Some(Transactions::with_serde_opts(serde_opts)),
5470
state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash),
55-
copyleft_rewards: CopyleftRewards::default(),
71+
copyleft_rewards: Some(CopyleftRewards::default()),
72+
73+
update_msg_sync_key: None,
74+
//last_transaction: None,
75+
update_trans_lt: None,
76+
update_copyleft_reward_address: None,
77+
prev_account_stuff: None,
5678
serde_opts,
5779
})
5880
}
81+
/// Returns:
82+
/// * None - if no updates or no matching records in history
83+
/// * Some(particular) - record from history that matches update_msg_sync_key == on_msg_sync_key
84+
pub fn commit(mut self, on_msg_sync_key: usize) -> Result<Option<Self>> {
85+
while let Some(current_update_msg_sync_key) = self.update_msg_sync_key {
86+
if current_update_msg_sync_key == on_msg_sync_key {
87+
log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key);
88+
return Ok(Some(self));
89+
} else {
90+
if !self.revert()? {
91+
log::debug!("unable to revert account {:x} state, current state is a first in history", self.account_addr());
92+
return Ok(None);
93+
} else {
94+
log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key);
95+
}
96+
}
97+
}
98+
Ok(None)
99+
}
100+
fn revert(&mut self) -> Result<bool> {
101+
let mut taked_prev = match self.prev_account_stuff.take() {
102+
Some(prev) => prev,
103+
None => return Ok(false),
104+
};
105+
let prev = taked_prev.as_mut();
106+
107+
prev.orig_libs = self.orig_libs.take();
108+
109+
prev.transactions = self.transactions.take();
110+
if let Some(update_trans_lt) = self.update_trans_lt {
111+
prev.remove_trans(update_trans_lt)?;
112+
}
113+
114+
prev.copyleft_rewards = self.copyleft_rewards.take();
115+
if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() {
116+
prev.remove_copyleft_reward(update_copyleft_reward_address)?;
117+
}
118+
119+
std::mem::swap(self, prev);
120+
121+
Ok(true)
122+
}
59123
pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result<AccountBlock> {
60124
let account = self.read_account()?;
61125
if account.is_none() {
@@ -65,10 +129,10 @@ impl ShardAccountStuff {
65129
let value = shard_acc.write_to_new_cell()?;
66130
new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?;
67131
}
68-
AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update)
132+
AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update)
69133
}
70-
pub fn lt(&self) -> Arc<AtomicU64> {
71-
self.lt.clone()
134+
pub fn lt(&self) -> u64 {
135+
self.lt
72136
}
73137
pub fn read_account(&self) -> Result<Account> {
74138
Account::construct_from_cell(self.account_root())
@@ -82,39 +146,117 @@ impl ShardAccountStuff {
82146
pub fn account_addr(&self) -> &AccountId {
83147
&self.account_addr
84148
}
85-
pub fn copyleft_rewards(&self) -> &CopyleftRewards {
86-
&self.copyleft_rewards
149+
pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> {
150+
self.copyleft_rewards.as_ref()
151+
.ok_or_else(|| error!(
152+
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
153+
))
154+
}
155+
fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> {
156+
self.copyleft_rewards.as_mut()
157+
.ok_or_else(|| error!(
158+
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
159+
))
160+
}
161+
fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result<bool> {
162+
self.copyleft_rewards_mut()?.remove(address)
163+
}
164+
165+
fn transactions(&self) -> Result<&Transactions> {
166+
self.transactions.as_ref()
167+
.ok_or_else(|| error!(
168+
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
169+
))
170+
}
171+
fn transactions_mut(&mut self) -> Result<&mut Transactions> {
172+
self.transactions.as_mut()
173+
.ok_or_else(|| error!(
174+
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
175+
))
176+
}
177+
fn remove_trans(&mut self, trans_lt: u64) -> Result<()> {
178+
let key = trans_lt.write_to_bitstring()?;
179+
self.transactions_mut()?.remove(key)?;
180+
Ok(())
181+
}
182+
183+
fn orig_libs(&self) -> Result<&StateInitLib> {
184+
self.orig_libs.as_ref()
185+
.ok_or_else(|| error!(
186+
"`orig_libs` field is None, possibly you try access a not root record in the history, run commit() before"
187+
))
188+
}
189+
190+
pub fn apply_transaction_res(
191+
&mut self,
192+
update_msg_sync_key: usize,
193+
tx_last_lt: u64,
194+
transaction_res: &mut Result<Transaction>,
195+
account_root: Cell,
196+
) -> Result<()> {
197+
let mut res = ShardAccountStuff {
198+
account_addr: self.account_addr.clone(),
199+
account_root: self.account_root.clone(),
200+
last_trans_hash: self.last_trans_hash.clone(),
201+
last_trans_lt: self.last_trans_lt,
202+
lt: tx_last_lt, // 1014 or 1104 or 1024
203+
transactions: self.transactions.take(),
204+
state_update: self.state_update.clone(),
205+
orig_libs: self.orig_libs.take(),
206+
copyleft_rewards: Some(CopyleftRewards::default()),
207+
update_msg_sync_key: Some(update_msg_sync_key),
208+
update_trans_lt: None,
209+
update_copyleft_reward_address: None,
210+
prev_account_stuff: None,
211+
serde_opts: self.serde_opts,
212+
};
213+
214+
if let Ok(transaction) = transaction_res {
215+
res.add_transaction(transaction, account_root)?;
216+
}
217+
218+
std::mem::swap(self, &mut res);
219+
220+
self.prev_account_stuff = Some(Box::new(res));
221+
222+
Ok(())
87223
}
88224
pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> {
89225
transaction.set_prev_trans_hash(self.last_trans_hash.clone());
90-
transaction.set_prev_trans_lt(self.last_trans_lt);
226+
transaction.set_prev_trans_lt(self.last_trans_lt); // 1010
91227
// log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?);
92228

93229
self.account_root = account_root;
94230
self.state_update.new_hash = self.account_root.repr_hash();
95231

96232
let tr_root = transaction.serialize_with_opts(self.serde_opts)?;
233+
let tr_lt = transaction.logical_time(); // 1011
234+
97235
self.last_trans_hash = tr_root.repr_hash();
98-
self.last_trans_lt = transaction.logical_time();
236+
self.last_trans_lt = tr_lt;
237+
238+
self.update_trans_lt = Some(tr_lt);
99239

100-
self.transactions.setref(
101-
&transaction.logical_time(),
240+
self.transactions_mut()?.setref(
241+
&tr_lt,
102242
&tr_root,
103243
transaction.total_fees()
104244
)?;
105245

106246
if let Some(copyleft_reward) = transaction.copyleft_reward() {
107247
log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash);
108-
self.copyleft_rewards.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
248+
self.copyleft_rewards_mut()?.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
249+
self.update_copyleft_reward_address = Some(copyleft_reward.address.clone());
109250
}
110251

111252
Ok(())
112253
}
113254
pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> {
114255
let account = self.read_account()?;
115256
let new_libs = account.libraries();
116-
if new_libs.root() != self.orig_libs.root() {
117-
new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| {
257+
let orig_libs = self.orig_libs()?;
258+
if new_libs.root() != orig_libs.root() {
259+
new_libs.scan_diff(orig_libs, |key: UInt256, old, new| {
118260
let old = old.unwrap_or_default();
119261
let new = new.unwrap_or_default();
120262
if old.is_public_library() && !new.is_public_library() {

src/types/limits.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ impl BlockLimitStatus {
109109
}
110110

111111
/// Update logical time
112-
pub fn update_lt(&mut self, lt: u64) {
113-
self.lt_current = max(self.lt_current, lt);
112+
pub fn update_lt(&mut self, lt: u64, force: bool) {
113+
self.lt_current = if force { lt } else { max(self.lt_current, lt) };
114114
if self.lt_start > self.lt_current {
115115
self.lt_start = lt;
116116
}

src/types/messages.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ impl MsgEnvelopeStuff {
8383
pub fn message(&self) -> &CommonMessage { &self.msg }
8484
pub fn message_hash(&self) -> UInt256 { self.env.message_hash() }
8585
pub fn message_cell(&self) -> ChildCell<CommonMessage> { self.env.msg_cell() }
86+
pub fn out_msg_key(&self) -> OutMsgQueueKey {
87+
OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash())
88+
}
8689
#[cfg(test)]
8790
pub fn src_prefix(&self) -> &AccountIdPrefixFull { &self.src_prefix }
8891
pub fn dst_prefix(&self) -> &AccountIdPrefixFull { &self.dst_prefix }

0 commit comments

Comments
 (0)