Skip to content

Commit 1c36ee3

Browse files
committed
Move the invocation of apply_init_tip into spawn_poll
1 parent a1657b1 commit 1c36ee3

File tree

2 files changed

+53
-139
lines changed

2 files changed

+53
-139
lines changed

rpc/src/service_builder.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use crate::IoHandler;
1010
use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig};
1111
use ckb_chain::chain::ChainController;
1212
use ckb_indexer::IndexerService;
13-
use ckb_logger::warn;
1413
use ckb_network::NetworkController;
1514
use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier as AlertVerifier};
1615
use ckb_pow::Pow;
@@ -202,20 +201,14 @@ impl<'a> ServiceBuilder<'a> {
202201
db_config: &DBConfig,
203202
indexer_config: &IndexerConfig,
204203
) -> Self {
205-
match IndexerService::new(db_config, indexer_config, shared.async_handle().clone()) {
206-
Ok(indexer) => {
207-
let indexer_handle = indexer.handle();
208-
let rpc_methods = IndexerRpcImpl::new(indexer_handle).to_delegate();
209-
if self.config.indexer_enable() {
210-
start_indexer(&shared, indexer, indexer_config.index_tx_pool);
211-
self.add_methods(rpc_methods);
212-
} else {
213-
self.update_disabled_methods("Indexer", rpc_methods);
214-
}
215-
}
216-
Err(e) => {
217-
warn!("Failed to enable indexer: {}", e);
218-
}
204+
let indexer = IndexerService::new(db_config, indexer_config, shared.async_handle().clone());
205+
let indexer_handle = indexer.handle();
206+
let rpc_methods = IndexerRpcImpl::new(indexer_handle).to_delegate();
207+
if self.config.indexer_enable() {
208+
start_indexer(&shared, indexer, indexer_config.index_tx_pool);
209+
self.add_methods(rpc_methods);
210+
} else {
211+
self.update_disabled_methods("Indexer", rpc_methods);
219212
}
220213
self
221214
}

util/indexer/src/service.rs

Lines changed: 45 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
//!The indexer service.
22

3+
use crate::error::Error;
34
use crate::indexer::{self, extract_raw_data, CustomFilters, Indexer, Key, KeyPrefix, Value};
45
use crate::pool::Pool;
56
use crate::store::{Batch, IteratorDirection, RocksdbStore, SecondaryDB, Store};
67

7-
use crate::error::Error;
88
use ckb_app_config::{DBConfig, IndexerConfig};
99
use ckb_async_runtime::{
1010
tokio::{self, time},
@@ -25,6 +25,7 @@ use rocksdb::{prelude::*, Direction, IteratorMode};
2525
use std::convert::TryInto;
2626
use std::num::NonZeroUsize;
2727
use std::sync::{Arc, RwLock};
28+
use std::thread::sleep;
2829
use std::time::Duration;
2930

3031
const SUBSCRIBER_NAME: &str = "Indexer";
@@ -41,15 +42,12 @@ pub struct IndexerService {
4142
async_handle: Handle,
4243
block_filter: Option<String>,
4344
cell_filter: Option<String>,
45+
init_tip_hash: Option<H256>,
4446
}
4547

4648
impl IndexerService {
4749
/// Construct new Indexer service instance from DBConfig and IndexerConfig
48-
pub fn new(
49-
ckb_db_config: &DBConfig,
50-
config: &IndexerConfig,
51-
async_handle: Handle,
52-
) -> Result<Self, Error> {
50+
pub fn new(ckb_db_config: &DBConfig, config: &IndexerConfig, async_handle: Handle) -> Self {
5351
let store_opts = Self::indexer_store_options(config);
5452
let store = RocksdbStore::new(&store_opts, &config.store);
5553
let pool = if config.index_tx_pool {
@@ -72,17 +70,16 @@ impl IndexerService {
7270
config.secondary_path.to_string_lossy().to_string(),
7371
);
7472

75-
Self::apply_init_tip(&config.init_tip_hash, &store, &secondary_db)?;
76-
77-
Ok(Self {
73+
Self {
7874
store,
7975
secondary_db,
8076
pool,
8177
async_handle,
8278
poll_interval: Duration::from_secs(config.poll_interval),
8379
block_filter: config.block_filter.clone(),
8480
cell_filter: config.cell_filter.clone(),
85-
})
81+
init_tip_hash: config.init_tip_hash.clone(),
82+
}
8683
}
8784

8885
/// Returns a handle to the indexer.
@@ -133,6 +130,38 @@ impl IndexerService {
133130
});
134131
}
135132

133+
fn apply_init_tip(&self) {
134+
if let Some(init_tip_hash) = &self.init_tip_hash {
135+
if self
136+
.store
137+
.iter([KeyPrefix::Header as u8 + 1], IteratorDirection::Reverse)
138+
.expect("iter Header should be OK")
139+
.next()
140+
.is_none()
141+
{
142+
loop {
143+
if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
144+
error!("secondary_db try_catch_up_with_primary error {}", e);
145+
}
146+
if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.pack())
147+
{
148+
let init_tip_number = header.number();
149+
let mut batch = self.store.batch().expect("create batch should be OK");
150+
batch
151+
.put_kv(
152+
Key::Header(init_tip_number, &init_tip_hash.pack(), true),
153+
vec![],
154+
)
155+
.expect("insert init tip header should be OK");
156+
batch.commit().expect("commit batch should be OK");
157+
break;
158+
}
159+
sleep(Duration::from_secs(1));
160+
}
161+
}
162+
}
163+
}
164+
136165
fn try_loop_sync(&self) {
137166
// assume that long fork will not happen >= 100 blocks.
138167
let keep_num = 100;
@@ -177,9 +206,11 @@ impl IndexerService {
177206
/// Processes that handle block cell and expect to be spawned to run in tokio runtime
178207
pub fn spawn_poll(&self, notify_controller: NotifyController) {
179208
let initial_service = self.clone();
180-
let initial_syncing = self
181-
.async_handle
182-
.spawn_blocking(move || initial_service.try_loop_sync());
209+
let initial_syncing = self.async_handle.spawn_blocking(move || {
210+
initial_service.apply_init_tip();
211+
initial_service.try_loop_sync()
212+
});
213+
183214
let stop: CancellationToken = new_tokio_exit_rx();
184215
let async_handle = self.async_handle.clone();
185216
let poll_service = self.clone();
@@ -253,35 +284,6 @@ impl IndexerService {
253284
);
254285
opts
255286
}
256-
257-
pub(crate) fn apply_init_tip<T: ChainStore>(
258-
init_tip_hash: &Option<H256>,
259-
store: &RocksdbStore,
260-
secondary_db: &T,
261-
) -> Result<(), Error> {
262-
if let Some(init_tip_hash) = init_tip_hash {
263-
if store
264-
.iter([KeyPrefix::Header as u8 + 1], IteratorDirection::Reverse)
265-
.expect("iter Header should be OK")
266-
.next()
267-
.is_none()
268-
{
269-
let init_tip_number = secondary_db
270-
.get_block_header(&init_tip_hash.pack())
271-
.ok_or_else(|| Error::Params("setting the initial tip failed: could not find the block corresponding to the init tip hash.".to_string()))?
272-
.number();
273-
let mut batch = store.batch().expect("create batch should be OK");
274-
batch
275-
.put_kv(
276-
Key::Header(init_tip_number, &init_tip_hash.pack(), true),
277-
vec![],
278-
)
279-
.expect("insert init tip header should be OK");
280-
batch.commit().expect("commit batch should be OK");
281-
}
282-
}
283-
Ok(())
284-
}
285287
}
286288

287289
/// Handle to the indexer.
@@ -981,20 +983,13 @@ impl TryInto<FilterOptions> for IndexerSearchKey {
981983
mod tests {
982984
use super::*;
983985
use crate::store::RocksdbStore;
984-
use ckb_db::{
985-
iter::{DBIter, IteratorMode},
986-
DBPinnableSlice,
987-
};
988-
use ckb_db_schema::Col;
989986
use ckb_jsonrpc_types::{IndexerRange, IndexerSearchKeyFilter};
990-
use ckb_store::{Freezer, StoreCache};
991987
use ckb_types::{
992988
bytes::Bytes,
993989
core::{
994990
capacity_bytes, BlockBuilder, Capacity, EpochNumberWithFraction, HeaderBuilder,
995-
HeaderView, ScriptHashType, TransactionBuilder,
991+
ScriptHashType, TransactionBuilder,
996992
},
997-
h256,
998993
packed::{CellInput, CellOutputBuilder, OutPoint, Script, ScriptBuilder},
999994
H256,
1000995
};
@@ -1008,49 +1003,6 @@ mod tests {
10081003
// Indexer::new(store, 10, 1)
10091004
}
10101005

1011-
fn new_chain_store(prefix: &str) -> impl ChainStore {
1012-
struct DummyChainStore {
1013-
inner: RocksdbStore,
1014-
}
1015-
impl ChainStore for DummyChainStore {
1016-
fn cache(&self) -> Option<&StoreCache> {
1017-
None
1018-
}
1019-
fn freezer(&self) -> Option<&Freezer> {
1020-
None
1021-
}
1022-
fn get(&self, _col: Col, _key: &[u8]) -> Option<DBPinnableSlice> {
1023-
None
1024-
}
1025-
fn get_iter(&self, _col: Col, mode: IteratorMode) -> DBIter {
1026-
let opts = ReadOptions::default();
1027-
self.inner.inner().get_iter(&opts, mode)
1028-
}
1029-
fn get_block_header(&self, hash: &packed::Byte32) -> Option<HeaderView> {
1030-
match hash {
1031-
_ if hash
1032-
== &h256!(
1033-
"0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44"
1034-
)
1035-
.pack() =>
1036-
{
1037-
let epoch = EpochNumberWithFraction::new(0, 0, 10);
1038-
Some(
1039-
HeaderView::new_advanced_builder()
1040-
.number(100000.pack())
1041-
.epoch(epoch.pack())
1042-
.build(),
1043-
)
1044-
}
1045-
_ => None,
1046-
}
1047-
}
1048-
}
1049-
DummyChainStore {
1050-
inner: new_store(prefix),
1051-
}
1052-
}
1053-
10541006
#[test]
10551007
fn rpc() {
10561008
let store = new_store("rpc");
@@ -1639,37 +1591,6 @@ mod tests {
16391591
);
16401592
}
16411593

1642-
#[test]
1643-
fn rpc_get_indexer_tip_with_set_init_tip() {
1644-
let store = new_store("rpc_get_indexer_tip_with_set_init_tip");
1645-
let ckb_db = new_chain_store("rpc_get_indexer_tip_with_set_init_tip_ckb");
1646-
1647-
// test setting the initial tip failed
1648-
let ret = IndexerService::apply_init_tip(&Some(H256::default()), &store, &ckb_db);
1649-
assert_eq!(ret.unwrap_err().to_string(), "Invalid params setting the initial tip failed: could not find the block corresponding to the init tip hash.");
1650-
1651-
// test get_tip rpc
1652-
IndexerService::apply_init_tip(
1653-
&Some(h256!(
1654-
"0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44"
1655-
)),
1656-
&store,
1657-
&ckb_db,
1658-
)
1659-
.unwrap();
1660-
let pool = Arc::new(RwLock::new(Pool::default()));
1661-
let rpc = IndexerHandle {
1662-
store,
1663-
pool: Some(Arc::clone(&pool)),
1664-
};
1665-
let tip = rpc.get_indexer_tip().unwrap().unwrap();
1666-
assert_eq!(
1667-
h256!("0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44"),
1668-
tip.block_hash
1669-
);
1670-
assert_eq!(100000, tip.block_number.value());
1671-
}
1672-
16731594
#[test]
16741595
fn script_search_mode_rpc() {
16751596
let store = new_store("script_search_mode_rpc");

0 commit comments

Comments
 (0)