Skip to content

Commit 385c822

Browse files
committed
feat: recreate pangea client
1 parent 154b2bd commit 385c822

File tree

1 file changed

+71
-52
lines changed

1 file changed

+71
-52
lines changed

forge/src/pangea/indexer.rs

+71-52
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::{
1919
const BATCH_SIZE: u64 = 100_000;
2020

2121
pub struct PangeaIndexer {
22-
pangea_client: Client<WsProvider>,
23-
fuel_provider: Provider,
22+
pangea_host: String,
23+
provider: Provider,
2424
operation_tx: Sender<OperationMessage>,
2525
chain_id: ChainId,
2626
contract_h256: H256,
@@ -36,37 +36,42 @@ impl PangeaIndexer {
3636
"FUEL" => ChainId::FUEL,
3737
_ => ChainId::FUELTESTNET,
3838
};
39-
40-
let username = env::var("PANGEA_USERNAME").unwrap();
41-
let password = env::var("PANGEA_PASSWORD").unwrap();
42-
43-
let pangea_client = ClientBuilder::default()
44-
.endpoint(&config.pangea_host)
45-
.credential(username, password)
46-
.build::<WsProvider>()
47-
.await?;
39+
let pangea_host = config.pangea_host.clone();
4840

4941
let provider_url = match chain_id {
5042
ChainId::FUEL => Ok("mainnet.fuel.network"),
5143
ChainId::FUELTESTNET => Ok("testnet.fuel.network"),
5244
_ => Err(Error::InvalidChainId),
5345
}?;
54-
let fuel_provider = Provider::connect(provider_url).await?;
46+
let provider = Provider::connect(provider_url).await?;
5547

5648
log::info!("CHAIN: {:?}, PROVIDER: {:?}", chain_id, provider_url);
5749

5850
Ok(Self {
59-
pangea_client,
60-
fuel_provider,
51+
pangea_host,
52+
provider,
6153
operation_tx,
6254
chain_id,
6355
contract_h256: H256::from_str(market_id)?,
6456
})
6557
}
6658

59+
pub async fn create_pangea_client(&self) -> Result<Client<WsProvider>, Error> {
60+
let username = env::var("PANGEA_USERNAME").unwrap();
61+
let password = env::var("PANGEA_PASSWORD").unwrap();
62+
63+
let pangea_client = ClientBuilder::default()
64+
.endpoint(&self.pangea_host)
65+
.credential(username, password)
66+
.build::<WsProvider>()
67+
.await?;
68+
69+
Ok(pangea_client)
70+
}
71+
6772
pub async fn start(&self, latest_processed_block: i64) -> Result<(), Error> {
6873
// Get latest block number from blockchain
69-
let latest_block = self.fuel_provider.latest_block_height().await.unwrap() as i64;
74+
let latest_block = self.provider.latest_block_height().await.unwrap() as i64;
7075

7176
log::info!("Prune newest orders & trades");
7277
self.prune(latest_processed_block).await?;
@@ -107,6 +112,8 @@ impl PangeaIndexer {
107112
mut latest_processed_block: i64,
108113
to_block: i64,
109114
) -> Result<i64, Error> {
115+
let client = self.create_pangea_client().await?;
116+
110117
while latest_processed_block < to_block {
111118
let to_block = (latest_processed_block + BATCH_SIZE as i64).min(to_block);
112119

@@ -118,8 +125,7 @@ impl PangeaIndexer {
118125
..Default::default()
119126
};
120127

121-
let stream = self
122-
.pangea_client
128+
let stream = client
123129
.get_fuel_spark_orders_by_format(batch_request, Format::JsonStream, false)
124130
.await
125131
.expect("Failed to get fuel spark orders batch");
@@ -168,46 +174,59 @@ impl PangeaIndexer {
168174
let max_backoff = Duration::from_secs(32);
169175

170176
loop {
171-
let deltas_request = GetSparkOrderRequest {
172-
from_block: Bound::Exact(latest_processed_block + 1),
173-
to_block: Bound::Subscribe,
174-
market_id__in: HashSet::from([self.contract_h256]),
175-
chains: HashSet::from([self.chain_id]),
176-
..Default::default()
177-
};
178-
179-
match self
180-
.pangea_client
181-
.get_fuel_spark_orders_by_format(deltas_request, Format::JsonStream, true)
182-
.await
183-
{
184-
Ok(stream) => {
185-
backoff = Duration::from_secs(1);
186-
futures::pin_mut!(stream);
187-
188-
while let Some(data) = stream.next().await {
189-
match data {
190-
Ok(data) => {
191-
let data = String::from_utf8(data)?;
192-
let event = serde_json::from_str::<PangeaEvent>(&data)?;
193-
latest_processed_block = event.block_number;
194-
195-
log::debug!("LATEST_PROCESSED_BLOCK: {}", latest_processed_block);
196-
197-
self.handle_event(&event).await;
198-
self.operation_tx
199-
.send(OperationMessage::Dispatch(latest_processed_block))
200-
.unwrap();
201-
}
202-
Err(e) => {
203-
log::error!("Error in the stream of new orders (deltas): {e}");
204-
break;
177+
match self.create_pangea_client().await {
178+
Ok(client) => {
179+
let deltas_request = GetSparkOrderRequest {
180+
from_block: Bound::Exact(latest_processed_block + 1),
181+
to_block: Bound::Subscribe,
182+
market_id__in: HashSet::from([self.contract_h256]),
183+
chains: HashSet::from([self.chain_id]),
184+
..Default::default()
185+
};
186+
187+
match client
188+
.get_fuel_spark_orders_by_format(deltas_request, Format::JsonStream, true)
189+
.await
190+
{
191+
Ok(stream) => {
192+
backoff = Duration::from_secs(1);
193+
futures::pin_mut!(stream);
194+
195+
while let Some(data) = stream.next().await {
196+
match data {
197+
Ok(data) => {
198+
let data = String::from_utf8(data)?;
199+
let event = serde_json::from_str::<PangeaEvent>(&data)?;
200+
latest_processed_block = event.block_number;
201+
202+
log::debug!(
203+
"LATEST_PROCESSED_BLOCK: {}",
204+
latest_processed_block
205+
);
206+
207+
self.handle_event(&event).await;
208+
self.operation_tx
209+
.send(OperationMessage::Dispatch(
210+
latest_processed_block,
211+
))
212+
.unwrap();
213+
}
214+
Err(e) => {
215+
log::error!(
216+
"Error in the stream of new orders (deltas): {e}"
217+
);
218+
break;
219+
}
220+
}
205221
}
206222
}
223+
Err(e) => {
224+
log::error!("Failed to get fuel spark deltas: {e}");
225+
}
207226
}
208227
}
209228
Err(e) => {
210-
log::error!("Failed to get fuel spark deltas: {e}");
229+
log::error!("Failed to create pangea client: {e}");
211230
}
212231
}
213232

0 commit comments

Comments
 (0)