Skip to content

Commit 9ab85c5

Browse files
authored
[testnet] Avoid using shared chain state views except for GraphQL. (#4797) (#4798)
Backport of #4797. ## Motivation With the exception of a GraphQL query, the chain state view should only be accessed by the corresponding chain worker. GraphQL is why the worker can return a shared chain state view behind a lock, but we are using that in many other places, too. After #4769, this is even more dangerous, as an actor could be dropped and restarted while shared views are still being used. ## Proposal Replace several `chain_state_view()` calls with chain info or new chain worker requests. Also, extend a comment. (See #4793 (comment).) ## Test Plan CI ## Release Plan - These changes should be released in a new SDK. ## Links - PR to main: #4797 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 39cf649 commit 9ab85c5

File tree

7 files changed

+265
-35
lines changed

7 files changed

+265
-35
lines changed

linera-chain/src/chain.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,10 @@ where
652652
}
653653

654654
/// Removes the incoming message bundles in the block from the inboxes.
655+
///
656+
/// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
657+
/// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
658+
/// and `false` if the block is already confirmed.
655659
#[instrument(target = "telemetry_only", skip_all, fields(
656660
chain_id = %self.chain_id(),
657661
))]

linera-core/src/chain_worker/actor.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,36 @@ where
157157
new_trackers: BTreeMap<ValidatorPublicKey, u64>,
158158
callback: oneshot::Sender<Result<(), WorkerError>>,
159159
},
160+
161+
/// Get preprocessed block hashes in a given height range.
162+
GetPreprocessedBlockHashes {
163+
start: BlockHeight,
164+
end: BlockHeight,
165+
#[debug(skip)]
166+
callback: oneshot::Sender<Result<Vec<CryptoHash>, WorkerError>>,
167+
},
168+
169+
/// Get the next block height to receive from an inbox.
170+
GetInboxNextHeight {
171+
origin: ChainId,
172+
#[debug(skip)]
173+
callback: oneshot::Sender<Result<BlockHeight, WorkerError>>,
174+
},
175+
176+
/// Get locking blobs for specific blob IDs.
177+
GetLockingBlobs {
178+
blob_ids: Vec<BlobId>,
179+
#[debug(skip)]
180+
callback: oneshot::Sender<Result<Option<Vec<Blob>>, WorkerError>>,
181+
},
182+
183+
/// Read a range from the confirmed log.
184+
ReadConfirmedLog {
185+
start: BlockHeight,
186+
end: BlockHeight,
187+
#[debug(skip)]
188+
callback: oneshot::Sender<Result<Vec<CryptoHash>, WorkerError>>,
189+
},
160190
}
161191

162192
/// The actor worker type.

linera-core/src/chain_worker/state.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,26 @@ where
205205
.await,
206206
)
207207
.is_ok(),
208+
ChainWorkerRequest::GetPreprocessedBlockHashes {
209+
start,
210+
end,
211+
callback,
212+
} => callback
213+
.send(self.get_preprocessed_block_hashes(start, end).await)
214+
.is_ok(),
215+
ChainWorkerRequest::GetInboxNextHeight { origin, callback } => callback
216+
.send(self.get_inbox_next_height(origin).await)
217+
.is_ok(),
218+
ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } => callback
219+
.send(self.get_locking_blobs(blob_ids).await)
220+
.is_ok(),
221+
ChainWorkerRequest::ReadConfirmedLog {
222+
start,
223+
end,
224+
callback,
225+
} => callback
226+
.send(self.read_confirmed_log(start, end).await)
227+
.is_ok(),
208228
};
209229

210230
if !responded {
@@ -1020,6 +1040,92 @@ where
10201040
Ok(())
10211041
}
10221042

1043+
/// Returns the preprocessed block hashes in the given height range.
1044+
#[instrument(target = "telemetry_only", skip_all, fields(
1045+
chain_id = %self.chain_id(),
1046+
start = %start,
1047+
end = %end
1048+
))]
1049+
async fn get_preprocessed_block_hashes(
1050+
&self,
1051+
start: BlockHeight,
1052+
end: BlockHeight,
1053+
) -> Result<Vec<CryptoHash>, WorkerError> {
1054+
let mut hashes = Vec::new();
1055+
let mut height = start;
1056+
while height < end {
1057+
match self.chain.preprocessed_blocks.get(&height).await? {
1058+
Some(hash) => hashes.push(hash),
1059+
None => break,
1060+
}
1061+
height = height.try_add_one()?;
1062+
}
1063+
Ok(hashes)
1064+
}
1065+
1066+
/// Returns the next block height to receive from an inbox.
1067+
#[instrument(target = "telemetry_only", skip_all, fields(
1068+
chain_id = %self.chain_id(),
1069+
origin = %origin
1070+
))]
1071+
async fn get_inbox_next_height(&self, origin: ChainId) -> Result<BlockHeight, WorkerError> {
1072+
Ok(match self.chain.inboxes.try_load_entry(&origin).await? {
1073+
Some(inbox) => inbox.next_block_height_to_receive()?,
1074+
None => BlockHeight::ZERO,
1075+
})
1076+
}
1077+
1078+
/// Returns the locking blobs for the given blob IDs.
1079+
/// Returns `Ok(None)` if any of the blobs is not found.
1080+
#[instrument(target = "telemetry_only", skip_all, fields(
1081+
chain_id = %self.chain_id(),
1082+
num_blob_ids = %blob_ids.len()
1083+
))]
1084+
async fn get_locking_blobs(
1085+
&self,
1086+
blob_ids: Vec<BlobId>,
1087+
) -> Result<Option<Vec<Blob>>, WorkerError> {
1088+
let mut blobs = Vec::new();
1089+
for blob_id in blob_ids {
1090+
match self.chain.manager.locking_blobs.get(&blob_id).await? {
1091+
None => return Ok(None),
1092+
Some(blob) => blobs.push(blob),
1093+
}
1094+
}
1095+
Ok(Some(blobs))
1096+
}
1097+
1098+
/// Reads a range from the confirmed log.
1099+
#[instrument(target = "telemetry_only", skip_all, fields(
1100+
chain_id = %self.chain_id(),
1101+
start = %start,
1102+
end = %end
1103+
))]
1104+
async fn read_confirmed_log(
1105+
&self,
1106+
start: BlockHeight,
1107+
end: BlockHeight,
1108+
) -> Result<Vec<CryptoHash>, WorkerError> {
1109+
let start_usize = usize::try_from(start)?;
1110+
let end_usize = usize::try_from(end)?;
1111+
let log_heights: Vec<_> = (start_usize..end_usize).collect();
1112+
let hashes = self
1113+
.chain
1114+
.confirmed_log
1115+
.multi_get(log_heights.clone())
1116+
.await?
1117+
.into_iter()
1118+
.enumerate()
1119+
.map(|(i, maybe_hash)| {
1120+
maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
1121+
height: BlockHeight(u64::try_from(start_usize + i).unwrap_or(u64::MAX)),
1122+
chain_id: self.chain_id(),
1123+
})
1124+
})
1125+
.collect::<Result<Vec<_>, _>>()?;
1126+
Ok(hashes)
1127+
}
1128+
10231129
/// Attempts to vote for a leader timeout, if possible.
10241130
#[instrument(target = "telemetry_only", skip_all, fields(
10251131
chain_id = %self.chain_id(),

linera-core/src/client/mod.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -321,19 +321,12 @@ impl<Env: Environment> Client<Env> {
321321
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
322322
let mut last_info = None;
323323
// First load any blocks from local storage, if available.
324-
let mut hashes = Vec::new();
325-
let mut next_height = BlockHeight::ZERO;
326-
{
327-
let chain = self.local_node.chain_state_view(chain_id).await?;
328-
next_height = next_height.max(chain.tip_state.get().next_block_height);
329-
while next_height < stop {
330-
let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
331-
break;
332-
};
333-
hashes.push(hash);
334-
next_height = next_height.try_add_one()?;
335-
}
336-
}
324+
let chain_info = self.local_node.chain_info(chain_id).await?;
325+
let mut next_height = chain_info.next_block_height;
326+
let hashes = self
327+
.local_node
328+
.get_preprocessed_block_hashes(chain_id, next_height, stop)
329+
.await?;
337330
let certificates = self
338331
.storage_client()
339332
.read_certificates(hashes.clone())
@@ -3847,11 +3840,11 @@ impl<Env: Environment> ChainClient<Env> {
38473840
&self,
38483841
origin: ChainId,
38493842
) -> Result<BlockHeight, ChainClientError> {
3850-
let chain = self.chain_state_view().await?;
3851-
Ok(match chain.inboxes.try_load_entry(&origin).await? {
3852-
Some(inbox) => inbox.next_block_height_to_receive()?,
3853-
None => BlockHeight::ZERO,
3854-
})
3843+
Ok(self
3844+
.client
3845+
.local_node
3846+
.get_inbox_next_height(self.chain_id, origin)
3847+
.await?)
38553848
}
38563849

38573850
#[instrument(level = "trace", skip(remote_node, local_node, notification))]

linera-core/src/local_node.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,12 @@ where
187187
blob_ids: impl IntoIterator<Item = &BlobId>,
188188
chain_id: ChainId,
189189
) -> Result<Option<Vec<Blob>>, LocalNodeError> {
190-
let chain = self.chain_state_view(chain_id).await?;
191-
let mut blobs = Vec::new();
192-
for blob_id in blob_ids {
193-
match chain.manager.locking_blobs.get(blob_id).await? {
194-
None => return Ok(None),
195-
Some(blob) => blobs.push(blob),
196-
}
197-
}
198-
Ok(Some(blobs))
190+
let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
191+
Ok(self
192+
.node
193+
.state
194+
.get_locking_blobs(chain_id, blob_ids_vec)
195+
.await?)
199196
}
200197

201198
/// Writes the given blobs to storage if there is an appropriate blob state.
@@ -304,6 +301,31 @@ where
304301
.await?;
305302
Ok(())
306303
}
304+
305+
pub async fn get_preprocessed_block_hashes(
306+
&self,
307+
chain_id: ChainId,
308+
start: BlockHeight,
309+
end: BlockHeight,
310+
) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
311+
Ok(self
312+
.node
313+
.state
314+
.get_preprocessed_block_hashes(chain_id, start, end)
315+
.await?)
316+
}
317+
318+
pub async fn get_inbox_next_height(
319+
&self,
320+
chain_id: ChainId,
321+
origin: ChainId,
322+
) -> Result<BlockHeight, LocalNodeError> {
323+
Ok(self
324+
.node
325+
.state
326+
.get_inbox_next_height(chain_id, origin)
327+
.await?)
328+
}
307329
}
308330

309331
/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and

linera-core/src/worker.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,87 @@ where
11901190
})
11911191
.await
11921192
}
1193+
1194+
/// Gets preprocessed block hashes in a given height range.
1195+
#[instrument(target = "telemetry_only", skip_all, fields(
1196+
nickname = %self.nickname,
1197+
chain_id = %chain_id,
1198+
start = %start,
1199+
end = %end
1200+
))]
1201+
pub async fn get_preprocessed_block_hashes(
1202+
&self,
1203+
chain_id: ChainId,
1204+
start: BlockHeight,
1205+
end: BlockHeight,
1206+
) -> Result<Vec<CryptoHash>, WorkerError> {
1207+
self.query_chain_worker(chain_id, move |callback| {
1208+
ChainWorkerRequest::GetPreprocessedBlockHashes {
1209+
start,
1210+
end,
1211+
callback,
1212+
}
1213+
})
1214+
.await
1215+
}
1216+
1217+
/// Gets the next block height to receive from an inbox.
1218+
#[instrument(target = "telemetry_only", skip_all, fields(
1219+
nickname = %self.nickname,
1220+
chain_id = %chain_id,
1221+
origin = %origin
1222+
))]
1223+
pub async fn get_inbox_next_height(
1224+
&self,
1225+
chain_id: ChainId,
1226+
origin: ChainId,
1227+
) -> Result<BlockHeight, WorkerError> {
1228+
self.query_chain_worker(chain_id, move |callback| {
1229+
ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1230+
})
1231+
.await
1232+
}
1233+
1234+
/// Gets locking blobs for specific blob IDs.
1235+
/// Returns `Ok(None)` if any of the blobs is not found.
1236+
#[instrument(target = "telemetry_only", skip_all, fields(
1237+
nickname = %self.nickname,
1238+
chain_id = %chain_id,
1239+
num_blob_ids = %blob_ids.len()
1240+
))]
1241+
pub async fn get_locking_blobs(
1242+
&self,
1243+
chain_id: ChainId,
1244+
blob_ids: Vec<BlobId>,
1245+
) -> Result<Option<Vec<Blob>>, WorkerError> {
1246+
self.query_chain_worker(chain_id, move |callback| {
1247+
ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1248+
})
1249+
.await
1250+
}
1251+
1252+
/// Reads a range from the confirmed log.
1253+
#[instrument(target = "telemetry_only", skip_all, fields(
1254+
nickname = %self.nickname,
1255+
chain_id = %chain_id,
1256+
start = %start,
1257+
end = %end
1258+
))]
1259+
pub async fn read_confirmed_log(
1260+
&self,
1261+
chain_id: ChainId,
1262+
start: BlockHeight,
1263+
end: BlockHeight,
1264+
) -> Result<Vec<CryptoHash>, WorkerError> {
1265+
self.query_chain_worker(chain_id, move |callback| {
1266+
ChainWorkerRequest::ReadConfirmedLog {
1267+
start,
1268+
end,
1269+
callback,
1270+
}
1271+
})
1272+
.await
1273+
}
11931274
}
11941275

11951276
#[cfg(with_testing)]

linera-service/src/node_service.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -627,10 +627,7 @@ where
627627
let client = self.context.lock().await.make_chain_client(chain_id);
628628
let hash = match hash {
629629
Some(hash) => Some(hash),
630-
None => {
631-
let view = client.chain_state_view().await?;
632-
view.tip_state.get().block_hash
633-
}
630+
None => client.chain_info().await?.block_hash,
634631
};
635632
if let Some(hash) = hash {
636633
let block = client.read_confirmed_block(hash).await?;
@@ -665,10 +662,7 @@ where
665662
let limit = limit.unwrap_or(10);
666663
let from = match from {
667664
Some(from) => Some(from),
668-
None => {
669-
let view = client.chain_state_view().await?;
670-
view.tip_state.get().block_hash
671-
}
665+
None => client.chain_info().await?.block_hash,
672666
};
673667
let Some(from) = from else {
674668
return Ok(vec![]);

0 commit comments

Comments
 (0)