Skip to content

Commit 43b2353

Browse files
vibhurajeevLeouarzEvolveArtlana-shanghai
authored
Improve stability with avail DA (keep-starknet-strange#1257)
Co-authored-by: Leouarz <ghalielouarzazi@hotmail.com> Co-authored-by: 0xevolve <Artevolve@yahoo.com> Co-authored-by: lanaivina <31368580+lana-shanghai@users.noreply.github.com>
1 parent ee9ae0f commit 43b2353

File tree

5 files changed

+69
-47
lines changed

5 files changed

+69
-47
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
- chore(data-availability): update avail-subxt to version 0.4.0
1313
- fix(ci): setup should fetch files from local config
1414
- chore: deprecate `madara-app` and `madara-dev-explorer` modules
15+
- chore(data-availability-avail): implement fire and forget, and add ws
16+
reconnection logic
1517

1618
## v0.5.0
1719

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client/data-availability/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ starknet_api = { workspace = true, default-features = true }
4545
ethers = "2.0.7"
4646

4747
# Avail subxt dependency
48-
avail-subxt = { git = "https://github.com/availproject/avail", version = "0.4.0", tag = "v1.8.0.0-rc1" }
48+
avail-subxt = { git = "https://github.com/availproject/avail", version = "0.4.0", tag = "v1.8.0.0" }
4949
sp-keyring = { workspace = true }
5050
subxt = "0.29"
5151

crates/client/data-availability/src/avail/mod.rs

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
pub mod config;
22

3-
use anyhow::Result;
3+
use std::sync::Arc;
4+
5+
use anyhow::{anyhow, Result};
46
use async_trait::async_trait;
57
use avail_subxt::api::runtime_types::avail_core::AppId;
68
use avail_subxt::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;
7-
use avail_subxt::api::runtime_types::da_control::pallet::Call as DaCall;
8-
use avail_subxt::avail::{AppUncheckedExtrinsic, Client as AvailSubxtClient};
9+
use avail_subxt::avail::Client as AvailSubxtClient;
910
use avail_subxt::primitives::AvailExtrinsicParams;
10-
use avail_subxt::{api as AvailApi, build_client, AvailConfig, Call};
11+
use avail_subxt::{api as AvailApi, build_client, AvailConfig};
1112
use ethers::types::{I256, U256};
12-
use sp_core::H256;
13+
use futures::lock::Mutex;
1314
use subxt::ext::sp_core::sr25519::Pair;
15+
use subxt::OnlineClient;
1416

1517
use crate::utils::get_bytes_from_state_diff;
1618
use crate::{DaClient, DaMode};
@@ -19,22 +21,55 @@ type AvailPairSigner = subxt::tx::PairSigner<AvailConfig, Pair>;
1921

2022
#[derive(Clone)]
2123
pub struct AvailClient {
22-
ws_client: AvailSubxtClient,
24+
ws_client: Arc<Mutex<SubxtClient>>,
2325
app_id: AppId,
2426
signer: AvailPairSigner,
2527
mode: DaMode,
2628
}
2729

30+
pub struct SubxtClient {
31+
client: AvailSubxtClient,
32+
config: config::AvailConfig,
33+
}
34+
35+
pub fn try_build_avail_subxt(conf: &config::AvailConfig) -> Result<OnlineClient<AvailConfig>> {
36+
let client =
37+
futures::executor::block_on(async { build_client(conf.ws_provider.as_str(), conf.validate_codegen).await })
38+
.map_err(|e| anyhow::anyhow!("DA Layer error: could not initialize ws endpoint {e}"))?;
39+
40+
Ok(client)
41+
}
42+
43+
impl SubxtClient {
44+
pub async fn restart(&mut self) -> Result<(), anyhow::Error> {
45+
self.client = match build_client(self.config.ws_provider.as_str(), self.config.validate_codegen).await {
46+
Ok(i) => i,
47+
Err(e) => return Err(anyhow!("DA Layer error: could not restart ws endpoint {e}")),
48+
};
49+
50+
Ok(())
51+
}
52+
53+
pub fn client(&self) -> &OnlineClient<AvailConfig> {
54+
&self.client
55+
}
56+
}
57+
58+
impl TryFrom<config::AvailConfig> for SubxtClient {
59+
type Error = anyhow::Error;
60+
61+
fn try_from(conf: config::AvailConfig) -> Result<Self, Self::Error> {
62+
Ok(Self { client: try_build_avail_subxt(&conf)?, config: conf })
63+
}
64+
}
65+
2866
#[async_trait]
2967
impl DaClient for AvailClient {
3068
async fn publish_state_diff(&self, state_diff: Vec<U256>) -> Result<()> {
3169
let bytes = get_bytes_from_state_diff(&state_diff);
3270
let bytes = BoundedVec(bytes);
71+
self.publish_data(&bytes).await?;
3372

34-
let submitted_block_hash = self.publish_data(&bytes).await?;
35-
36-
// This theoritically do not have to be put here since we wait for finalization before
37-
self.verify_bytes_inclusion(submitted_block_hash, &bytes).await?;
3873
Ok(())
3974
}
4075

@@ -50,38 +85,22 @@ impl DaClient for AvailClient {
5085
}
5186

5287
impl AvailClient {
53-
async fn publish_data(&self, bytes: &BoundedVec<u8>) -> Result<H256> {
88+
async fn publish_data(&self, bytes: &BoundedVec<u8>) -> Result<()> {
89+
let mut ws_client = self.ws_client.lock().await;
90+
5491
let data_transfer = AvailApi::tx().data_availability().submit_data(bytes.clone());
5592
let extrinsic_params = AvailExtrinsicParams::new_with_app_id(self.app_id);
56-
let events = self
57-
.ws_client
58-
.tx()
59-
.sign_and_submit_then_watch(&data_transfer, &self.signer, extrinsic_params)
60-
.await?
61-
.wait_for_finalized_success()
62-
.await?;
63-
64-
Ok(events.block_hash())
65-
}
6693

67-
async fn verify_bytes_inclusion(&self, block_hash: H256, bytes: &BoundedVec<u8>) -> Result<()> {
68-
let submitted_block = self
69-
.ws_client
70-
.rpc()
71-
.block(Some(block_hash))
72-
.await?
73-
.ok_or(anyhow::anyhow!("Invalid hash, block not found"))?;
74-
75-
submitted_block
76-
.block
77-
.extrinsics
78-
.into_iter()
79-
.filter_map(|chain_block_ext| AppUncheckedExtrinsic::try_from(chain_block_ext).map(|ext| ext.function).ok())
80-
.find(|call| match call {
81-
Call::DataAvailability(DaCall::submit_data { data }) => data == bytes,
82-
_ => false,
83-
})
84-
.ok_or(anyhow::anyhow!("Bytes not found in specified block"))?;
94+
match ws_client.client().tx().sign_and_submit(&data_transfer, &self.signer, extrinsic_params).await {
95+
Ok(i) => i,
96+
Err(e) => {
97+
if e.to_string().contains("restart required") {
98+
ws_client.restart().await;
99+
}
100+
101+
return Err(anyhow!("DA Layer error : failed due to closed websocket connection {e}"));
102+
}
103+
};
85104

86105
Ok(())
87106
}
@@ -95,11 +114,12 @@ impl TryFrom<config::AvailConfig> for AvailClient {
95114

96115
let app_id = AppId(conf.app_id);
97116

98-
let ws_client =
99-
futures::executor::block_on(async { build_client(conf.ws_provider.as_str(), conf.validate_codegen).await })
100-
.map_err(|e| anyhow::anyhow!("DA Layer error: could not initialize ws endpoint {e}"))?;
101-
102-
Ok(Self { ws_client, app_id, signer, mode: conf.mode })
117+
Ok(Self {
118+
ws_client: Arc::new(Mutex::new(SubxtClient::try_from(conf.clone())?)),
119+
app_id,
120+
signer,
121+
mode: conf.mode,
122+
})
103123
}
104124
}
105125

0 commit comments

Comments
 (0)