Skip to content

Commit 5a7ab8a

Browse files
authored
Avoid using shared chain state views except for GraphQL. (#4797)
## Motivation With the exception of a GraphQL query, the chain state view should only be accessed by the corresponding chain worker. GraphQL is why the worker can return a shared chain state view behind a lock, but we are using that in many other places, too. After #4769, this is even more dangerous, as an actor could be dropped and restarted while shared views are still being used. ## Proposal Replace several `chain_state_view()` calls with chain info or new chain worker requests. Also, extend a comment. (See #4793 (comment).) ## Test Plan CI ## Release Plan - These changes should be backported to `testnet_conway`, then - be released in a new SDK. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent a5c2afa commit 5a7ab8a

File tree

7 files changed

+265
-35
lines changed

7 files changed

+265
-35
lines changed

linera-chain/src/chain.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,10 @@ where
655655
}
656656

657657
/// Removes the incoming message bundles in the block from the inboxes.
658+
///
659+
/// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
660+
/// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
661+
/// and `false` if the block is already confirmed.
658662
#[instrument(target = "telemetry_only", skip_all, fields(
659663
chain_id = %self.chain_id(),
660664
))]

linera-core/src/chain_worker/actor.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,36 @@ where
157157
new_trackers: BTreeMap<ValidatorPublicKey, u64>,
158158
callback: oneshot::Sender<Result<(), WorkerError>>,
159159
},
160+
161+
/// Get preprocessed block hashes in a given height range.
162+
GetPreprocessedBlockHashes {
163+
start: BlockHeight,
164+
end: BlockHeight,
165+
#[debug(skip)]
166+
callback: oneshot::Sender<Result<Vec<CryptoHash>, WorkerError>>,
167+
},
168+
169+
/// Get the next block height to receive from an inbox.
170+
GetInboxNextHeight {
171+
origin: ChainId,
172+
#[debug(skip)]
173+
callback: oneshot::Sender<Result<BlockHeight, WorkerError>>,
174+
},
175+
176+
/// Get locking blobs for specific blob IDs.
177+
GetLockingBlobs {
178+
blob_ids: Vec<BlobId>,
179+
#[debug(skip)]
180+
callback: oneshot::Sender<Result<Option<Vec<Blob>>, WorkerError>>,
181+
},
182+
183+
/// Read a range from the confirmed log.
184+
ReadConfirmedLog {
185+
start: BlockHeight,
186+
end: BlockHeight,
187+
#[debug(skip)]
188+
callback: oneshot::Sender<Result<Vec<CryptoHash>, WorkerError>>,
189+
},
160190
}
161191

162192
/// The actor worker type.

linera-core/src/chain_worker/state.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,26 @@ where
208208
.await,
209209
)
210210
.is_ok(),
211+
ChainWorkerRequest::GetPreprocessedBlockHashes {
212+
start,
213+
end,
214+
callback,
215+
} => callback
216+
.send(self.get_preprocessed_block_hashes(start, end).await)
217+
.is_ok(),
218+
ChainWorkerRequest::GetInboxNextHeight { origin, callback } => callback
219+
.send(self.get_inbox_next_height(origin).await)
220+
.is_ok(),
221+
ChainWorkerRequest::GetLockingBlobs { blob_ids, callback } => callback
222+
.send(self.get_locking_blobs(blob_ids).await)
223+
.is_ok(),
224+
ChainWorkerRequest::ReadConfirmedLog {
225+
start,
226+
end,
227+
callback,
228+
} => callback
229+
.send(self.read_confirmed_log(start, end).await)
230+
.is_ok(),
211231
};
212232

213233
if !responded {
@@ -1030,6 +1050,92 @@ where
10301050
Ok(())
10311051
}
10321052

1053+
/// Returns the preprocessed block hashes in the given height range.
1054+
#[instrument(target = "telemetry_only", skip_all, fields(
1055+
chain_id = %self.chain_id(),
1056+
start = %start,
1057+
end = %end
1058+
))]
1059+
async fn get_preprocessed_block_hashes(
1060+
&self,
1061+
start: BlockHeight,
1062+
end: BlockHeight,
1063+
) -> Result<Vec<CryptoHash>, WorkerError> {
1064+
let mut hashes = Vec::new();
1065+
let mut height = start;
1066+
while height < end {
1067+
match self.chain.preprocessed_blocks.get(&height).await? {
1068+
Some(hash) => hashes.push(hash),
1069+
None => break,
1070+
}
1071+
height = height.try_add_one()?;
1072+
}
1073+
Ok(hashes)
1074+
}
1075+
1076+
/// Returns the next block height to receive from an inbox.
1077+
#[instrument(target = "telemetry_only", skip_all, fields(
1078+
chain_id = %self.chain_id(),
1079+
origin = %origin
1080+
))]
1081+
async fn get_inbox_next_height(&self, origin: ChainId) -> Result<BlockHeight, WorkerError> {
1082+
Ok(match self.chain.inboxes.try_load_entry(&origin).await? {
1083+
Some(inbox) => inbox.next_block_height_to_receive()?,
1084+
None => BlockHeight::ZERO,
1085+
})
1086+
}
1087+
1088+
/// Returns the locking blobs for the given blob IDs.
1089+
/// Returns `Ok(None)` if any of the blobs is not found.
1090+
#[instrument(target = "telemetry_only", skip_all, fields(
1091+
chain_id = %self.chain_id(),
1092+
num_blob_ids = %blob_ids.len()
1093+
))]
1094+
async fn get_locking_blobs(
1095+
&self,
1096+
blob_ids: Vec<BlobId>,
1097+
) -> Result<Option<Vec<Blob>>, WorkerError> {
1098+
let mut blobs = Vec::new();
1099+
for blob_id in blob_ids {
1100+
match self.chain.manager.locking_blobs.get(&blob_id).await? {
1101+
None => return Ok(None),
1102+
Some(blob) => blobs.push(blob),
1103+
}
1104+
}
1105+
Ok(Some(blobs))
1106+
}
1107+
1108+
/// Reads a range from the confirmed log.
1109+
#[instrument(target = "telemetry_only", skip_all, fields(
1110+
chain_id = %self.chain_id(),
1111+
start = %start,
1112+
end = %end
1113+
))]
1114+
async fn read_confirmed_log(
1115+
&self,
1116+
start: BlockHeight,
1117+
end: BlockHeight,
1118+
) -> Result<Vec<CryptoHash>, WorkerError> {
1119+
let start_usize = usize::try_from(start)?;
1120+
let end_usize = usize::try_from(end)?;
1121+
let log_heights: Vec<_> = (start_usize..end_usize).collect();
1122+
let hashes = self
1123+
.chain
1124+
.confirmed_log
1125+
.multi_get(log_heights.clone())
1126+
.await?
1127+
.into_iter()
1128+
.enumerate()
1129+
.map(|(i, maybe_hash)| {
1130+
maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
1131+
height: BlockHeight(u64::try_from(start_usize + i).unwrap_or(u64::MAX)),
1132+
chain_id: self.chain_id(),
1133+
})
1134+
})
1135+
.collect::<Result<Vec<_>, _>>()?;
1136+
Ok(hashes)
1137+
}
1138+
10331139
/// Attempts to vote for a leader timeout, if possible.
10341140
#[instrument(target = "telemetry_only", skip_all, fields(
10351141
chain_id = %self.chain_id(),

linera-core/src/client/mod.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -402,19 +402,12 @@ impl<Env: Environment> Client<Env> {
402402
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
403403
let mut last_info = None;
404404
// First load any blocks from local storage, if available.
405-
let mut hashes = Vec::new();
406-
let mut next_height = BlockHeight::ZERO;
407-
{
408-
let chain = self.local_node.chain_state_view(chain_id).await?;
409-
next_height = next_height.max(chain.tip_state.get().next_block_height);
410-
while next_height < stop {
411-
let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
412-
break;
413-
};
414-
hashes.push(hash);
415-
next_height = next_height.try_add_one()?;
416-
}
417-
}
405+
let chain_info = self.local_node.chain_info(chain_id).await?;
406+
let mut next_height = chain_info.next_block_height;
407+
let hashes = self
408+
.local_node
409+
.get_preprocessed_block_hashes(chain_id, next_height, stop)
410+
.await?;
418411
let certificates = self
419412
.storage_client()
420413
.read_certificates(hashes.clone())
@@ -3971,11 +3964,11 @@ impl<Env: Environment> ChainClient<Env> {
39713964
&self,
39723965
origin: ChainId,
39733966
) -> Result<BlockHeight, ChainClientError> {
3974-
let chain = self.chain_state_view().await?;
3975-
Ok(match chain.inboxes.try_load_entry(&origin).await? {
3976-
Some(inbox) => inbox.next_block_height_to_receive()?,
3977-
None => BlockHeight::ZERO,
3978-
})
3967+
Ok(self
3968+
.client
3969+
.local_node
3970+
.get_inbox_next_height(self.chain_id, origin)
3971+
.await?)
39793972
}
39803973

39813974
#[instrument(level = "trace", skip(remote_node, local_node, notification))]

linera-core/src/local_node.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,12 @@ where
187187
blob_ids: impl IntoIterator<Item = &BlobId>,
188188
chain_id: ChainId,
189189
) -> Result<Option<Vec<Blob>>, LocalNodeError> {
190-
let chain = self.chain_state_view(chain_id).await?;
191-
let mut blobs = Vec::new();
192-
for blob_id in blob_ids {
193-
match chain.manager.locking_blobs.get(blob_id).await? {
194-
None => return Ok(None),
195-
Some(blob) => blobs.push(blob),
196-
}
197-
}
198-
Ok(Some(blobs))
190+
let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
191+
Ok(self
192+
.node
193+
.state
194+
.get_locking_blobs(chain_id, blob_ids_vec)
195+
.await?)
199196
}
200197

201198
/// Writes the given blobs to storage if there is an appropriate blob state.
@@ -304,6 +301,31 @@ where
304301
.await?;
305302
Ok(())
306303
}
304+
305+
pub async fn get_preprocessed_block_hashes(
306+
&self,
307+
chain_id: ChainId,
308+
start: BlockHeight,
309+
end: BlockHeight,
310+
) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
311+
Ok(self
312+
.node
313+
.state
314+
.get_preprocessed_block_hashes(chain_id, start, end)
315+
.await?)
316+
}
317+
318+
pub async fn get_inbox_next_height(
319+
&self,
320+
chain_id: ChainId,
321+
origin: ChainId,
322+
) -> Result<BlockHeight, LocalNodeError> {
323+
Ok(self
324+
.node
325+
.state
326+
.get_inbox_next_height(chain_id, origin)
327+
.await?)
328+
}
307329
}
308330

309331
/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and

linera-core/src/worker.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,87 @@ where
11951195
})
11961196
.await
11971197
}
1198+
1199+
/// Gets preprocessed block hashes in a given height range.
1200+
#[instrument(target = "telemetry_only", skip_all, fields(
1201+
nickname = %self.nickname,
1202+
chain_id = %chain_id,
1203+
start = %start,
1204+
end = %end
1205+
))]
1206+
pub async fn get_preprocessed_block_hashes(
1207+
&self,
1208+
chain_id: ChainId,
1209+
start: BlockHeight,
1210+
end: BlockHeight,
1211+
) -> Result<Vec<CryptoHash>, WorkerError> {
1212+
self.query_chain_worker(chain_id, move |callback| {
1213+
ChainWorkerRequest::GetPreprocessedBlockHashes {
1214+
start,
1215+
end,
1216+
callback,
1217+
}
1218+
})
1219+
.await
1220+
}
1221+
1222+
/// Gets the next block height to receive from an inbox.
1223+
#[instrument(target = "telemetry_only", skip_all, fields(
1224+
nickname = %self.nickname,
1225+
chain_id = %chain_id,
1226+
origin = %origin
1227+
))]
1228+
pub async fn get_inbox_next_height(
1229+
&self,
1230+
chain_id: ChainId,
1231+
origin: ChainId,
1232+
) -> Result<BlockHeight, WorkerError> {
1233+
self.query_chain_worker(chain_id, move |callback| {
1234+
ChainWorkerRequest::GetInboxNextHeight { origin, callback }
1235+
})
1236+
.await
1237+
}
1238+
1239+
/// Gets locking blobs for specific blob IDs.
1240+
/// Returns `Ok(None)` if any of the blobs is not found.
1241+
#[instrument(target = "telemetry_only", skip_all, fields(
1242+
nickname = %self.nickname,
1243+
chain_id = %chain_id,
1244+
num_blob_ids = %blob_ids.len()
1245+
))]
1246+
pub async fn get_locking_blobs(
1247+
&self,
1248+
chain_id: ChainId,
1249+
blob_ids: Vec<BlobId>,
1250+
) -> Result<Option<Vec<Blob>>, WorkerError> {
1251+
self.query_chain_worker(chain_id, move |callback| {
1252+
ChainWorkerRequest::GetLockingBlobs { blob_ids, callback }
1253+
})
1254+
.await
1255+
}
1256+
1257+
/// Reads a range from the confirmed log.
1258+
#[instrument(target = "telemetry_only", skip_all, fields(
1259+
nickname = %self.nickname,
1260+
chain_id = %chain_id,
1261+
start = %start,
1262+
end = %end
1263+
))]
1264+
pub async fn read_confirmed_log(
1265+
&self,
1266+
chain_id: ChainId,
1267+
start: BlockHeight,
1268+
end: BlockHeight,
1269+
) -> Result<Vec<CryptoHash>, WorkerError> {
1270+
self.query_chain_worker(chain_id, move |callback| {
1271+
ChainWorkerRequest::ReadConfirmedLog {
1272+
start,
1273+
end,
1274+
callback,
1275+
}
1276+
})
1277+
.await
1278+
}
11981279
}
11991280

12001281
#[cfg(with_testing)]

linera-service/src/node_service.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -661,10 +661,7 @@ where
661661
let client = self.context.lock().await.make_chain_client(chain_id);
662662
let hash = match hash {
663663
Some(hash) => Some(hash),
664-
None => {
665-
let view = client.chain_state_view().await?;
666-
view.tip_state.get().block_hash
667-
}
664+
None => client.chain_info().await?.block_hash,
668665
};
669666
if let Some(hash) = hash {
670667
let block = client.read_confirmed_block(hash).await?;
@@ -699,10 +696,7 @@ where
699696
let limit = limit.unwrap_or(10);
700697
let from = match from {
701698
Some(from) => Some(from),
702-
None => {
703-
let view = client.chain_state_view().await?;
704-
view.tip_state.get().block_hash
705-
}
699+
None => client.chain_info().await?.block_hash,
706700
};
707701
let Some(from) = from else {
708702
return Ok(vec![]);

0 commit comments

Comments
 (0)