-
Notifications
You must be signed in to change notification settings - Fork 2.6k
chainHead: Add support for storage pagination and cancellation #14755
Conversation
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
7124b90
to
cab6202
Compare
client/rpc-spec-v2/Cargo.toml
Outdated
@@ -36,6 +36,7 @@ tokio = { version = "1.22.0", features = ["sync"] } | |||
array-bytes = "6.1" | |||
log = "0.4.17" | |||
futures-util = { version = "0.3.19", default-features = false } | |||
async-channel = "1.8.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why async-channel
is used here?
You already got the futures-channel and tokio sync
in the dependency tree so I don't follow why this dependency is introduced as the both tokio and futures-channel provides an API for the stuff you implemented on top of the channel
EDIT: tokio::sync::mpsc creates a buffer with 1 and futures_channel a buffer with 1 + num senders, so I guess you want exactly one here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense! I can't actually remember if I picked the async-channel
here since it doesn't require the receiver part to be mutable 🤔
I'll change a bit the API since it's worth not adding extra dependencies, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, didn't notice that :D
.requested_continue | ||
.store(true, std::sync::atomic::Ordering::Release); | ||
|
||
let _ = self.recv_continue.recv().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if the channel is closed i.e, returns TrySendError::Closed or can't that happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sender part of the channel should be around until this RegisteredOperation
is dropped, since it object is the only one that can remove the operation from tracking (the operations: Arc<Mutex<HashMap<String, OperationState>>>
field), will add a comment :D
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
This PR adds configurable pagination for the storage query types:
DescendantHashes
andDescendantValues
with the possibility to cancel the operation after theWaitingForContinue
event is generated.WaitingForContinue
event is generatedOperations
object for better ergonomicscontinue
orstop
RegisteredOperation
object is returned to the backend (chainHead event generation) via theBlockGuard
OperationState
object is returned to the frontend and has direct user access viachainHead_unstable_continue
andchainHead_unstable_stopOperation
RegisteredOperation
in RAII manner (on drop) releases the capacity permit for the operation and unregisters the operation from the subscription statecheck_continue_operation
andstop_storage_operation
)Builds on top of: #14699
Closes: #14549
Closes: #14639
// @paritytech/subxt-team